Skip to content

Commit 9c58bcc

Browse files
authored
[ci] bulk-storage support for acs snapshots (#3423)
1 parent c4fcb55 commit 9c58bcc

File tree

17 files changed

+635
-46
lines changed

17 files changed

+635
-46
lines changed

apps/app/src/test/scala/org/lfdecentralizedtrust/splice/integration/tests/MultiHostValidatorOperatorIntegrationTest.scala

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -196,7 +196,11 @@ class MultiHostValidatorOperatorIntegrationTest
196196
splitwellWalletClient.balance().unlockedQty should beWithin(47, 48)
197197
// Alice's wallet is stopped, so we confirm the transaction via scan
198198
sv1ScanBackend
199-
.listTransactions(None, TransactionHistoryRequest.SortOrder.Desc, Limit.MaxPageSize)
199+
.listTransactions(
200+
None,
201+
TransactionHistoryRequest.SortOrder.Desc,
202+
Limit.DefaultMaxPageSize,
203+
)
200204
.flatMap(_.transfer)
201205
.filter(tf => tf.description == transferDescription) should not be empty
202206
},

apps/app/src/test/scala/org/lfdecentralizedtrust/splice/integration/tests/ScanIntegrationTest.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ import scala.util.{Success, Try}
4747
// this test sets fees to zero, and that only works from 0.1.14 onwards
4848
@org.lfdecentralizedtrust.splice.util.scalatesttags.SpliceAmulet_0_1_14
4949
class ScanIntegrationTest extends IntegrationTest with WalletTestUtil with TimeTestUtil {
50-
private val defaultPageSize = Limit.MaxPageSize
50+
private val defaultPageSize = Limit.DefaultMaxPageSize
5151
override def environmentDefinition: SpliceEnvironmentDefinition =
5252
EnvironmentDefinition
5353
.simpleTopology1Sv(this.getClass.getSimpleName)

apps/app/src/test/scala/org/lfdecentralizedtrust/splice/integration/tests/SvTimeBasedRewardCouponIntegrationTest.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -225,7 +225,7 @@ class SvTimeBasedRewardCouponIntegrationTest
225225
.listTransactions(
226226
None,
227227
TransactionHistoryRequest.SortOrder.Desc,
228-
Limit.MaxPageSize,
228+
Limit.DefaultMaxPageSize,
229229
)
230230
.flatMap(_.transfer)
231231
.filter(tf =>
@@ -256,7 +256,7 @@ class SvTimeBasedRewardCouponIntegrationTest
256256
sv1WalletClient
257257
.listTransactions(
258258
None,
259-
Limit.MaxPageSize,
259+
Limit.DefaultMaxPageSize,
260260
)
261261
)
262262
.collect {

apps/app/src/test/scala/org/lfdecentralizedtrust/splice/integration/tests/WalletTxLogIntegrationTest.scala

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1253,7 +1253,7 @@ class WalletTxLogIntegrationTest
12531253
onboardWalletUser(aliceWalletClient, aliceValidatorBackend)
12541254
val bobUserParty = onboardWalletUser(bobWalletClient, bobValidatorBackend)
12551255
val validatorTxLogBefore =
1256-
aliceValidatorWalletClient.listTransactions(None, Limit.MaxPageSize)
1256+
aliceValidatorWalletClient.listTransactions(None, Limit.DefaultMaxPageSize)
12571257

12581258
val (offerCid, _) =
12591259
actAndCheck(
@@ -1284,7 +1284,8 @@ class WalletTxLogIntegrationTest
12841284
)
12851285

12861286
// Only Alice should see notification (note that aliceValidator is shared between tests)
1287-
val validatorTxLogAfter = aliceValidatorWalletClient.listTransactions(None, Limit.MaxPageSize)
1287+
val validatorTxLogAfter =
1288+
aliceValidatorWalletClient.listTransactions(None, Limit.DefaultMaxPageSize)
12881289

12891290
withoutDevNetTopups(validatorTxLogBefore) should be(
12901291
withoutDevNetTopups(validatorTxLogAfter)
@@ -1413,7 +1414,7 @@ class WalletTxLogIntegrationTest
14131414
val aliceUserId = aliceWalletClient.config.ledgerApiUser
14141415
val charlieUserId = charlieWalletClient.config.ledgerApiUser
14151416
val validatorTxLogBefore =
1416-
aliceValidatorWalletClient.listTransactions(None, Limit.MaxPageSize)
1417+
aliceValidatorWalletClient.listTransactions(None, Limit.DefaultMaxPageSize)
14171418

14181419
// Note: using Alice and Charlie because manually creating subscriptions requires both
14191420
// the sender and the receiver to be hosted on the same participant.
@@ -1534,7 +1535,7 @@ class WalletTxLogIntegrationTest
15341535

15351536
// Validator should not see any notification (note that aliceValidator is shared between tests)
15361537
val validatorTxLogAfter =
1537-
aliceValidatorWalletClient.listTransactions(None, Limit.MaxPageSize)
1538+
aliceValidatorWalletClient.listTransactions(None, Limit.DefaultMaxPageSize)
15381539
withoutDevNetTopups(validatorTxLogBefore) should be(
15391540
withoutDevNetTopups(validatorTxLogAfter)
15401541
)
@@ -1569,7 +1570,7 @@ class WalletTxLogIntegrationTest
15691570
// Note: SV1 is reused between tests, ignore TxLog entries created by previous tests
15701571
val previousEventId = withoutDevNetTopups(
15711572
sv1WalletClient
1572-
.listTransactions(None, Limit.MaxPageSize)
1573+
.listTransactions(None, Limit.DefaultMaxPageSize)
15731574
).headOption
15741575
.map(_.eventId)
15751576

@@ -1655,7 +1656,7 @@ class WalletTxLogIntegrationTest
16551656
)
16561657
)
16571658

1658-
aliceWalletClient.listTransactions(None, Limit.MaxPageSize) shouldBe empty
1659+
aliceWalletClient.listTransactions(None, Limit.DefaultMaxPageSize) shouldBe empty
16591660
}
16601661

16611662
}

apps/app/src/test/scala/org/lfdecentralizedtrust/splice/integration/tests/WalletTxLogTestUtil.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ trait WalletTxLogTestUtil extends TestCommon with WalletTestUtil with TimeTestUt
4444
): Unit = {
4545

4646
eventually() {
47-
val actual = wallet.listTransactions(None, pageSize = Limit.MaxPageSize)
47+
val actual = wallet.listTransactions(None, pageSize = Limit.DefaultMaxPageSize)
4848
val withoutIgnored = actual
4949
.takeWhile(e => !previousEventId.contains(e.eventId))
5050
.filterNot(ignore)

apps/common/src/main/scala/org/lfdecentralizedtrust/splice/http/HttpValidatorLicensesHandler.scala

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ package org.lfdecentralizedtrust.splice.http
66
import org.lfdecentralizedtrust.splice.codegen.java.splice.validatorlicense as vl
77
import org.lfdecentralizedtrust.splice.http.v0.definitions
88
import org.lfdecentralizedtrust.splice.http.v0.definitions.ListValidatorLicensesResponse
9-
import org.lfdecentralizedtrust.splice.store.{AppStore, PageLimit, SortOrder}
9+
import org.lfdecentralizedtrust.splice.store.{AppStore, Limit, PageLimit, SortOrder}
1010
import com.digitalasset.canton.logging.NamedLogging
1111
import com.digitalasset.canton.tracing.{Spanning, TraceContext}
1212
import io.opentelemetry.api.trace.Tracer
@@ -29,7 +29,9 @@ trait HttpValidatorLicensesHandler extends Spanning with NamedLogging {
2929
.listContractsPaginated(
3030
vl.ValidatorLicense.COMPANION,
3131
after,
32-
limit.fold(PageLimit.Max)(PageLimit.tryCreate),
32+
limit.fold(PageLimit.Max)(limit =>
33+
PageLimit.tryCreate(limit, Limit.DefaultMaxPageSize)
34+
),
3335
SortOrder.Descending,
3436
)
3537
.map(_.mapResultsInPage(_.contract))

apps/common/src/main/scala/org/lfdecentralizedtrust/splice/store/Limit.scala

Lines changed: 23 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -10,16 +10,17 @@ import io.grpc.Status
1010

1111
sealed trait Limit {
1212
val limit: Int
13+
// TODO (#767): make configurable (it's currently an argument, but not user-configurable e.g. via config values)
14+
val maxPageSize: Int
1315
}
1416

1517
object Limit {
1618

17-
// TODO (#767): make configurable
18-
val MaxPageSize: Int = 1000
19-
val DefaultLimit: Limit = HardLimit.tryCreate(MaxPageSize)
19+
val DefaultMaxPageSize: Int = 1000
20+
val DefaultLimit: Limit = HardLimit.tryCreate(DefaultMaxPageSize)
2021

21-
private[store] def validateLimit(limit: Int): Either[String, Int] = {
22-
if (limit > MaxPageSize) Left(s"Exceeded limit maximum ($limit > $MaxPageSize).")
22+
private[store] def validateLimit(limit: Int, maxPageSize: Int): Either[String, Int] = {
23+
if (limit > maxPageSize) Left(s"Exceeded limit maximum ($limit > $maxPageSize).")
2324
else if (limit < 1) Left("Limit must be at least 1.")
2425
else Right(limit)
2526
}
@@ -29,13 +30,13 @@ object Limit {
2930
/** Limit for when the result is expected to not exceed the limit.
3031
* If exceeded, the store should log a WARN.
3132
*/
32-
case class HardLimit private (limit: Int) extends Limit
33+
case class HardLimit private (limit: Int, maxPageSize: Int) extends Limit
3334
object HardLimit {
34-
def apply(limit: Int): Either[String, HardLimit] = {
35-
Limit.validateLimit(limit).map(new HardLimit(_))
35+
def apply(limit: Int, maxPageSize: Int = Limit.DefaultMaxPageSize): Either[String, HardLimit] = {
36+
Limit.validateLimit(limit, maxPageSize).map(new HardLimit(_, maxPageSize))
3637
}
37-
def tryCreate(limit: Int): HardLimit =
38-
HardLimit(limit).valueOr(err =>
38+
def tryCreate(limit: Int, maxPageSize: Int = Limit.DefaultMaxPageSize): HardLimit =
39+
HardLimit(limit, maxPageSize).valueOr(err =>
3940
throw Status.INVALID_ARGUMENT
4041
.withDescription(err)
4142
.asRuntimeException()
@@ -47,14 +48,14 @@ object HardLimit {
4748
* To be used with pagination. A limit of ''n'' implies that if there are
4849
* ''k > n'' result elements, ''z'' elements where ''0≤z≤n'' may be returned.
4950
*/
50-
case class PageLimit private (limit: Int) extends Limit
51+
case class PageLimit private (limit: Int, maxPageSize: Int) extends Limit
5152
object PageLimit {
52-
val Max: PageLimit = new PageLimit(Limit.MaxPageSize)
53-
def apply(limit: Int): Either[String, PageLimit] = {
54-
Limit.validateLimit(limit).map(new PageLimit(_))
53+
val Max: PageLimit = new PageLimit(Limit.DefaultMaxPageSize, Limit.DefaultMaxPageSize)
54+
def apply(limit: Int, maxPageSize: Int = Limit.DefaultMaxPageSize): Either[String, PageLimit] = {
55+
Limit.validateLimit(limit, maxPageSize).map(new PageLimit(_, maxPageSize))
5556
}
56-
def tryCreate(limit: Int): PageLimit =
57-
PageLimit(limit).valueOr(err =>
57+
def tryCreate(limit: Int, maxPageSize: Int = Limit.DefaultMaxPageSize): PageLimit =
58+
PageLimit(limit, maxPageSize).valueOr(err =>
5859
throw Status.INVALID_ARGUMENT
5960
.withDescription(err)
6061
.asRuntimeException()
@@ -71,9 +72,9 @@ trait LimitHelpers { _: NamedLogging =>
7172
traceContext: TraceContext
7273
): C = {
7374
limit match {
74-
case PageLimit(limit) =>
75+
case PageLimit(limit, _) =>
7576
result.take(limit.intValue())
76-
case HardLimit(limit) =>
77+
case HardLimit(limit, _) =>
7778
val resultSize = result.size
7879
if (resultSize > limit) {
7980
logger.warn(
@@ -95,9 +96,9 @@ trait LimitHelpers { _: NamedLogging =>
9596
result: C & scala.collection.IterableOps[?, CC, C],
9697
): C = {
9798
limit match {
98-
case PageLimit(limit) =>
99+
case PageLimit(limit, _) =>
99100
result.take(limit.intValue())
100-
case HardLimit(limit) =>
101+
case HardLimit(limit, _) =>
101102
val resultSize = result.size
102103
if (resultSize > limit) {
103104
throw io.grpc.Status.FAILED_PRECONDITION
@@ -113,8 +114,8 @@ trait LimitHelpers { _: NamedLogging =>
113114

114115
protected def sqlLimit(limit: Limit): Int = {
115116
limit match {
116-
case HardLimit(limit) => limit + 1
117-
case PageLimit(limit) => limit
117+
case PageLimit(limit, _) => limit
118+
case HardLimit(limit, _) => limit + 1
118119
}
119120
}
120121

apps/common/src/test/scala/org/lfdecentralizedtrust/splice/store/StoreTest.scala

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,10 @@ abstract class StoreTest extends AsyncWordSpec with BaseTest {
139139
LfContractId.assertFromString("00" + f"$cIdCounter%064x").coid
140140
}
141141

142+
protected def resetCIdCounter() = {
143+
cIdCounter = 0
144+
}
145+
142146
protected def time(n: Long): CantonTimestamp = CantonTimestamp.ofEpochSecond(n)
143147

144148
private def schedule(

apps/scan/src/main/scala/org/lfdecentralizedtrust/splice/scan/store/AcsSnapshotStore.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -362,7 +362,7 @@ class AcsSnapshotStore(
362362
recordTime,
363363
None,
364364
// if the limit is exceeded by the results from the DB, an exception will be thrown
365-
HardLimit.tryCreate(Limit.MaxPageSize),
365+
HardLimit.tryCreate(Limit.DefaultMaxPageSize),
366366
partyIds,
367367
)
368368
.map { result =>
Lines changed: 155 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,155 @@
1+
// Copyright (c) 2024 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package org.lfdecentralizedtrust.splice.scan.store.bulk
5+
6+
import scala.concurrent.ExecutionContext
7+
import com.digitalasset.canton.data.CantonTimestamp
8+
import com.digitalasset.canton.logging.{NamedLoggerFactory, NamedLogging}
9+
import com.digitalasset.canton.tracing.TraceContext
10+
import com.digitalasset.canton.util.{ErrorUtil, PekkoUtil}
11+
import com.digitalasset.canton.util.PekkoUtil.RetrySourcePolicy
12+
import org.apache.pekko.actor.ActorSystem
13+
import org.apache.pekko.stream.{KillSwitch, KillSwitches, OverflowStrategy}
14+
import org.apache.pekko.stream.scaladsl.{Keep, Sink, Source}
15+
import org.apache.pekko.util.ByteString
16+
import org.lfdecentralizedtrust.splice.scan.admin.http.CompactJsonScanHttpEncodings
17+
import org.lfdecentralizedtrust.splice.scan.store.AcsSnapshotStore
18+
import org.lfdecentralizedtrust.splice.store.HardLimit
19+
20+
import scala.concurrent.Future
21+
import io.circe.syntax.*
22+
23+
import java.nio.ByteBuffer
24+
import java.nio.charset.StandardCharsets
25+
import java.util.concurrent.atomic.AtomicInteger
26+
import scala.concurrent.duration.FiniteDuration
27+
28+
import Position.*
29+
30+
case class BulkStorageConfig(
31+
dbReadChunkSize: Int,
32+
maxFileSize: Long,
33+
)
34+
35+
object BulkStorageConfigs {
36+
val bulkStorageConfigV1 = BulkStorageConfig(
37+
1000,
38+
64L * 1024 * 1024,
39+
)
40+
}
41+
42+
object Position {
43+
sealed trait Position
44+
45+
case object Start extends Position
46+
47+
case object End extends Position
48+
49+
final case class Index(value: Long) extends Position
50+
}
51+
52+
class AcsSnapshotBulkStorage(
53+
val config: BulkStorageConfig,
54+
val acsSnapshotStore: AcsSnapshotStore,
55+
val s3Connection: S3BucketConnection,
56+
override val loggerFactory: NamedLoggerFactory,
57+
)(implicit actorSystem: ActorSystem, tc: TraceContext, ec: ExecutionContext)
58+
extends NamedLogging {
59+
60+
private def getAcsSnapshotChunk(
61+
migrationId: Long,
62+
timestamp: CantonTimestamp,
63+
after: Option[Long],
64+
): Future[(Position, ByteString)] = {
65+
for {
66+
snapshot <- acsSnapshotStore.queryAcsSnapshot(
67+
migrationId,
68+
snapshot = timestamp,
69+
after,
70+
HardLimit.tryCreate(config.dbReadChunkSize),
71+
Seq.empty,
72+
Seq.empty,
73+
)
74+
} yield {
75+
val encoded = snapshot.createdEventsInPage.map(event =>
76+
CompactJsonScanHttpEncodings.javaToHttpCreatedEvent(event.eventId, event.event)
77+
)
78+
val contractsStr = encoded.map(_.asJson.noSpacesSortKeys).mkString("\n") + "\n"
79+
val contractsBytes = ByteString(contractsStr.getBytes(StandardCharsets.UTF_8))
80+
logger.debug(
81+
s"Read ${encoded.length} contracts from ACS, to a bytestring of size ${contractsBytes.length} bytes"
82+
)
83+
(snapshot.afterToken.fold(End: Position)(Index(_)), contractsBytes)
84+
}
85+
86+
}
87+
88+
def dumpAcsSnapshot(migrationId: Long, timestamp: CantonTimestamp): Future[Unit] = {
89+
90+
// TODO(#3429): currently, if this crashes half-way through, there is no indication in the S3 objects that
91+
// the snapshot is incomplete. We probably want to label the last object with `last` or something like that
92+
// so that we can detect incomplete snapshots and recreate them.
93+
94+
def mksrc = {
95+
val idx = new AtomicInteger(0)
96+
val base = Source
97+
.unfoldAsync(Start: Position) {
98+
case Start => getAcsSnapshotChunk(migrationId, timestamp, None).map(Some(_))
99+
case Index(i) => getAcsSnapshotChunk(migrationId, timestamp, Some(i)).map(Some(_))
100+
case End => Future.successful(None)
101+
}
102+
.via(ZstdGroupedWeight(config.maxFileSize))
103+
// Add a buffer so that the next object continues accumulating while we write the previous one
104+
.buffer(
105+
1,
106+
OverflowStrategy.backpressure,
107+
)
108+
.mapAsync(1) { zstdObj =>
109+
{
110+
val objectKey = s"snapshot_$idx.zstd"
111+
// TODO(#3429): For now, we accumulate the full object in memory, then write it as a whole.
112+
// Consider streaming it to S3 instead. Need to make sure that it then handles crashes correctly,
113+
// i.e. that until we tell S3 that we're done writing, if we stop, then S3 throws away the
114+
// partially written object.
115+
// TODO(#3429): Error handling
116+
for {
117+
_ <- s3Connection.writeFullObject(objectKey, ByteBuffer.wrap(zstdObj.toArrayUnsafe()))
118+
} yield {
119+
idx.addAndGet(1)
120+
}
121+
}
122+
}
123+
val withKs = base.viaMat(KillSwitches.single)(Keep.right)
124+
withKs.watchTermination() { case (ks, done) => (ks: KillSwitch, done) }
125+
}
126+
127+
// TODO(#3429): tweak the retry parameters here
128+
val delay = FiniteDuration(5, "seconds")
129+
val policy = new RetrySourcePolicy[Unit, Int] {
130+
// TODO(#3429): add a unit test for this retry logic
131+
override def shouldRetry(
132+
lastState: Unit,
133+
lastEmittedElement: Option[Int],
134+
lastFailure: Option[Throwable],
135+
): Option[(scala.concurrent.duration.FiniteDuration, Unit)] = {
136+
lastFailure.map { t =>
137+
logger.warn(s"Writing ACS snapshot to bulk storage failed with : ${ErrorUtil
138+
.messageWithStacktrace(t)}, will retry after delay of ${delay}")
139+
// Always retry (TODO(#3429): consider a max number of retries?)
140+
delay -> ()
141+
}
142+
}
143+
}
144+
145+
PekkoUtil
146+
.restartSource(
147+
name = "acs-snapshot-dump",
148+
initial = (),
149+
mkSource = (_: Unit) => mksrc,
150+
policy = policy,
151+
)
152+
.runWith(Sink.ignore)
153+
154+
}.map(_ => ())
155+
}

0 commit comments

Comments
 (0)