@@ -196,7 +196,11 @@ class MultiHostValidatorOperatorIntegrationTest
splitwellWalletClient.balance().unlockedQty should beWithin(47, 48)
// Alice's wallet is stopped, so we confirm the transaction via scan
sv1ScanBackend
- .listTransactions(None, TransactionHistoryRequest.SortOrder.Desc, Limit.MaxPageSize)
+ .listTransactions(
+   None,
+   TransactionHistoryRequest.SortOrder.Desc,
+   Limit.DefaultMaxPageSize,
+ )
.flatMap(_.transfer)
.filter(tf => tf.description == transferDescription) should not be empty
},

@@ -47,7 +47,7 @@ import scala.util.{Success, Try}
// this test sets fees to zero, and that only works from 0.1.14 onwards
@org.lfdecentralizedtrust.splice.util.scalatesttags.SpliceAmulet_0_1_14
class ScanIntegrationTest extends IntegrationTest with WalletTestUtil with TimeTestUtil {
- private val defaultPageSize = Limit.MaxPageSize
+ private val defaultPageSize = Limit.DefaultMaxPageSize
override def environmentDefinition: SpliceEnvironmentDefinition =
EnvironmentDefinition
.simpleTopology1Sv(this.getClass.getSimpleName)

@@ -225,7 +225,7 @@ class SvTimeBasedRewardCouponIntegrationTest
.listTransactions(
None,
TransactionHistoryRequest.SortOrder.Desc,
- Limit.MaxPageSize,
+ Limit.DefaultMaxPageSize,
)
.flatMap(_.transfer)
.filter(tf =>
@@ -256,7 +256,7 @@ class SvTimeBasedRewardCouponIntegrationTest
sv1WalletClient
.listTransactions(
None,
- Limit.MaxPageSize,
+ Limit.DefaultMaxPageSize,
)
)
.collect {

@@ -1253,7 +1253,7 @@ class WalletTxLogIntegrationTest
onboardWalletUser(aliceWalletClient, aliceValidatorBackend)
val bobUserParty = onboardWalletUser(bobWalletClient, bobValidatorBackend)
val validatorTxLogBefore =
- aliceValidatorWalletClient.listTransactions(None, Limit.MaxPageSize)
+ aliceValidatorWalletClient.listTransactions(None, Limit.DefaultMaxPageSize)

val (offerCid, _) =
actAndCheck(
@@ -1284,7 +1284,8 @@
)

// Only Alice should see notification (note that aliceValidator is shared between tests)
- val validatorTxLogAfter = aliceValidatorWalletClient.listTransactions(None, Limit.MaxPageSize)
+ val validatorTxLogAfter =
+   aliceValidatorWalletClient.listTransactions(None, Limit.DefaultMaxPageSize)

withoutDevNetTopups(validatorTxLogBefore) should be(
withoutDevNetTopups(validatorTxLogAfter)
@@ -1413,7 +1414,7 @@ class WalletTxLogIntegrationTest
val aliceUserId = aliceWalletClient.config.ledgerApiUser
val charlieUserId = charlieWalletClient.config.ledgerApiUser
val validatorTxLogBefore =
- aliceValidatorWalletClient.listTransactions(None, Limit.MaxPageSize)
+ aliceValidatorWalletClient.listTransactions(None, Limit.DefaultMaxPageSize)

// Note: using Alice and Charlie because manually creating subscriptions requires both
// the sender and the receiver to be hosted on the same participant.
@@ -1534,7 +1535,7 @@ class WalletTxLogIntegrationTest

// Validator should not see any notification (note that aliceValidator is shared between tests)
val validatorTxLogAfter =
- aliceValidatorWalletClient.listTransactions(None, Limit.MaxPageSize)
+ aliceValidatorWalletClient.listTransactions(None, Limit.DefaultMaxPageSize)
withoutDevNetTopups(validatorTxLogBefore) should be(
withoutDevNetTopups(validatorTxLogAfter)
)
@@ -1569,7 +1570,7 @@ class WalletTxLogIntegrationTest
// Note: SV1 is reused between tests, ignore TxLog entries created by previous tests
val previousEventId = withoutDevNetTopups(
sv1WalletClient
- .listTransactions(None, Limit.MaxPageSize)
+ .listTransactions(None, Limit.DefaultMaxPageSize)
).headOption
.map(_.eventId)

@@ -1655,7 +1656,7 @@ class WalletTxLogIntegrationTest
)
)

- aliceWalletClient.listTransactions(None, Limit.MaxPageSize) shouldBe empty
+ aliceWalletClient.listTransactions(None, Limit.DefaultMaxPageSize) shouldBe empty
}

}

@@ -44,7 +44,7 @@ trait WalletTxLogTestUtil extends TestCommon with WalletTestUtil with TimeTestUtil
): Unit = {

eventually() {
- val actual = wallet.listTransactions(None, pageSize = Limit.MaxPageSize)
+ val actual = wallet.listTransactions(None, pageSize = Limit.DefaultMaxPageSize)
val withoutIgnored = actual
.takeWhile(e => !previousEventId.contains(e.eventId))
.filterNot(ignore)

@@ -6,7 +6,7 @@ package org.lfdecentralizedtrust.splice.http
import org.lfdecentralizedtrust.splice.codegen.java.splice.validatorlicense as vl
import org.lfdecentralizedtrust.splice.http.v0.definitions
import org.lfdecentralizedtrust.splice.http.v0.definitions.ListValidatorLicensesResponse
- import org.lfdecentralizedtrust.splice.store.{AppStore, PageLimit, SortOrder}
+ import org.lfdecentralizedtrust.splice.store.{AppStore, Limit, PageLimit, SortOrder}
import com.digitalasset.canton.logging.NamedLogging
import com.digitalasset.canton.tracing.{Spanning, TraceContext}
import io.opentelemetry.api.trace.Tracer
@@ -29,7 +29,9 @@ trait HttpValidatorLicensesHandler extends Spanning with NamedLogging {
.listContractsPaginated(
vl.ValidatorLicense.COMPANION,
after,
- limit.fold(PageLimit.Max)(PageLimit.tryCreate),
+ limit.fold(PageLimit.Max)(limit =>
+   PageLimit.tryCreate(limit, Limit.DefaultMaxPageSize)
+ ),
SortOrder.Descending,
)
.map(_.mapResultsInPage(_.contract))
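
With the new `Limit` API, this handler caps client-supplied page sizes at `Limit.DefaultMaxPageSize` and falls back to a full page when the client sends no limit. A minimal sketch of the resulting behaviour, assuming the `Limit` definitions in the diff below (the values are hypothetical):

```scala
import org.lfdecentralizedtrust.splice.store.{Limit, PageLimit}

val absent: Option[Int] = None
val modest: Option[Int] = Some(50)

absent.fold(PageLimit.Max)(l => PageLimit.tryCreate(l, Limit.DefaultMaxPageSize))
// -> PageLimit(1000, 1000): no client limit, serve a full page
modest.fold(PageLimit.Max)(l => PageLimit.tryCreate(l, Limit.DefaultMaxPageSize))
// -> PageLimit(50, 1000): client limit accepted as-is
// Some(5000) would make tryCreate throw INVALID_ARGUMENT (5000 > 1000)
```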

@@ -10,16 +10,17 @@ import io.grpc.Status

sealed trait Limit {
val limit: Int
+ // TODO (#767): make configurable (it's currently an argument, but not user-configurable e.g. via config values)
+ val maxPageSize: Int
}

object Limit {

- // TODO (#767): make configurable
- val MaxPageSize: Int = 1000
- val DefaultLimit: Limit = HardLimit.tryCreate(MaxPageSize)
+ val DefaultMaxPageSize: Int = 1000
+ val DefaultLimit: Limit = HardLimit.tryCreate(DefaultMaxPageSize)

- private[store] def validateLimit(limit: Int): Either[String, Int] = {
-   if (limit > MaxPageSize) Left(s"Exceeded limit maximum ($limit > $MaxPageSize).")
+ private[store] def validateLimit(limit: Int, maxPageSize: Int): Either[String, Int] = {
+   if (limit > maxPageSize) Left(s"Exceeded limit maximum ($limit > $maxPageSize).")
else if (limit < 1) Left("Limit must be at least 1.")
else Right(limit)
}
@@ -29,13 +30,13 @@ object Limit {
/** Limit for when the result is expected to not exceed the limit.
* If exceeded, the store should log a WARN.
*/
- case class HardLimit private (limit: Int) extends Limit
+ case class HardLimit private (limit: Int, maxPageSize: Int) extends Limit
object HardLimit {
- def apply(limit: Int): Either[String, HardLimit] = {
-   Limit.validateLimit(limit).map(new HardLimit(_))
+ def apply(limit: Int, maxPageSize: Int = Limit.DefaultMaxPageSize): Either[String, HardLimit] = {
+   Limit.validateLimit(limit, maxPageSize).map(new HardLimit(_, maxPageSize))
}
- def tryCreate(limit: Int): HardLimit =
-   HardLimit(limit).valueOr(err =>
+ def tryCreate(limit: Int, maxPageSize: Int = Limit.DefaultMaxPageSize): HardLimit =
+   HardLimit(limit, maxPageSize).valueOr(err =>
throw Status.INVALID_ARGUMENT
.withDescription(err)
.asRuntimeException()
@@ -47,14 +48,14 @@
* To be used with pagination. A limit of ''n'' implies that if there are
* ''k > n'' result elements, ''z'' elements where ''0≤z≤n'' may be returned.
*/
- case class PageLimit private (limit: Int) extends Limit
+ case class PageLimit private (limit: Int, maxPageSize: Int) extends Limit
object PageLimit {
- val Max: PageLimit = new PageLimit(Limit.MaxPageSize)
- def apply(limit: Int): Either[String, PageLimit] = {
-   Limit.validateLimit(limit).map(new PageLimit(_))
+ val Max: PageLimit = new PageLimit(Limit.DefaultMaxPageSize, Limit.DefaultMaxPageSize)
+ def apply(limit: Int, maxPageSize: Int = Limit.DefaultMaxPageSize): Either[String, PageLimit] = {
+   Limit.validateLimit(limit, maxPageSize).map(new PageLimit(_, maxPageSize))
}
- def tryCreate(limit: Int): PageLimit =
-   PageLimit(limit).valueOr(err =>
+ def tryCreate(limit: Int, maxPageSize: Int = Limit.DefaultMaxPageSize): PageLimit =
+   PageLimit(limit, maxPageSize).valueOr(err =>
throw Status.INVALID_ARGUMENT
.withDescription(err)
.asRuntimeException()
@@ -71,9 +72,9 @@ trait LimitHelpers { _: NamedLogging =>
traceContext: TraceContext
): C = {
limit match {
- case PageLimit(limit) =>
+ case PageLimit(limit, _) =>
result.take(limit.intValue())
- case HardLimit(limit) =>
+ case HardLimit(limit, _) =>
val resultSize = result.size
if (resultSize > limit) {
logger.warn(
@@ -95,9 +96,9 @@ trait LimitHelpers { _: NamedLogging =>
result: C & scala.collection.IterableOps[?, CC, C],
): C = {
limit match {
- case PageLimit(limit) =>
+ case PageLimit(limit, _) =>
result.take(limit.intValue())
- case HardLimit(limit) =>
+ case HardLimit(limit, _) =>
val resultSize = result.size
if (resultSize > limit) {
throw io.grpc.Status.FAILED_PRECONDITION
@@ -113,8 +114,8 @@ trait LimitHelpers { _: NamedLogging =>

protected def sqlLimit(limit: Limit): Int = {
limit match {
- case HardLimit(limit) => limit + 1
- case PageLimit(limit) => limit
+ case PageLimit(limit, _) => limit
+ case HardLimit(limit, _) => limit + 1
}
}
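
Taken together, the `Limit` changes thread the page-size ceiling through every `Limit` value instead of reading a global constant; existing call sites keep compiling because `maxPageSize` defaults to `Limit.DefaultMaxPageSize`. A short sketch of the resulting behaviour (hypothetical values, assuming the definitions above):

```scala
import org.lfdecentralizedtrust.splice.store.{HardLimit, Limit, PageLimit}

// Default ceiling (1000) applies when maxPageSize is omitted:
val page = PageLimit.tryCreate(100) // PageLimit(100, 1000)

// Callers can now raise the ceiling per call:
val big = HardLimit.tryCreate(5000, maxPageSize = 10000) // HardLimit(5000, 10000)

// Validation still rejects limits above the (per-call) ceiling:
HardLimit(2000) // Left("Exceeded limit maximum (2000 > 1000).")
```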


@@ -139,6 +139,10 @@ abstract class StoreTest extends AsyncWordSpec with BaseTest {
LfContractId.assertFromString("00" + f"$cIdCounter%064x").coid
}

+ protected def resetCIdCounter() = {
+   cIdCounter = 0
+ }

protected def time(n: Long): CantonTimestamp = CantonTimestamp.ofEpochSecond(n)

private def schedule(

@@ -362,7 +362,7 @@ class AcsSnapshotStore(
recordTime,
None,
// if the limit is exceeded by the results from the DB, an exception will be thrown
- HardLimit.tryCreate(Limit.MaxPageSize),
+ HardLimit.tryCreate(Limit.DefaultMaxPageSize),
partyIds,
)
.map { result =>

@@ -0,0 +1,155 @@
// Copyright (c) 2024 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

package org.lfdecentralizedtrust.splice.scan.store.bulk

import scala.concurrent.ExecutionContext
import com.digitalasset.canton.data.CantonTimestamp
import com.digitalasset.canton.logging.{NamedLoggerFactory, NamedLogging}
import com.digitalasset.canton.tracing.TraceContext
import com.digitalasset.canton.util.{ErrorUtil, PekkoUtil}
import com.digitalasset.canton.util.PekkoUtil.RetrySourcePolicy
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.stream.{KillSwitch, KillSwitches, OverflowStrategy}
import org.apache.pekko.stream.scaladsl.{Keep, Sink, Source}
import org.apache.pekko.util.ByteString
import org.lfdecentralizedtrust.splice.scan.admin.http.CompactJsonScanHttpEncodings
import org.lfdecentralizedtrust.splice.scan.store.AcsSnapshotStore
import org.lfdecentralizedtrust.splice.store.HardLimit

import scala.concurrent.Future
import io.circe.syntax.*

import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.duration.FiniteDuration

import Position.*

case class BulkStorageConfig(
dbReadChunkSize: Int,
maxFileSize: Long,
)

object BulkStorageConfigs {
val bulkStorageConfigV1 = BulkStorageConfig(
1000,
64L * 1024 * 1024,
)
}

object Position {
sealed trait Position

case object Start extends Position

case object End extends Position

final case class Index(value: Long) extends Position
}

class AcsSnapshotBulkStorage(
val config: BulkStorageConfig,
val acsSnapshotStore: AcsSnapshotStore,
val s3Connection: S3BucketConnection,
override val loggerFactory: NamedLoggerFactory,
)(implicit actorSystem: ActorSystem, tc: TraceContext, ec: ExecutionContext)
extends NamedLogging {

private def getAcsSnapshotChunk(
migrationId: Long,
timestamp: CantonTimestamp,
after: Option[Long],
): Future[(Position, ByteString)] = {
for {
snapshot <- acsSnapshotStore.queryAcsSnapshot(
migrationId,
snapshot = timestamp,
after,
HardLimit.tryCreate(config.dbReadChunkSize),
Seq.empty,
// Review thread — Contributor: nit, since you do `limit = `, maybe nice to do
// `partyIds = Seq.empty, templates = Seq.empty` for clarity.
// Author: Fixed (removed the `limit = `).
// Contributor: I think it makes sense to default `partyIds: Seq[PartyId] = Seq.empty,
// templates: Seq[PackageQualifiedName] = Seq.empty` on `def queryAcsSnapshot(`, which
// is nice and clean, but not necessary in this PR. (A sketch of that suggestion
// follows this method.)
Seq.empty,
)
} yield {
val encoded = snapshot.createdEventsInPage.map(event =>
CompactJsonScanHttpEncodings.javaToHttpCreatedEvent(event.eventId, event.event)
)
val contractsStr = encoded.map(_.asJson.noSpacesSortKeys).mkString("\n") + "\n"
val contractsBytes = ByteString(contractsStr.getBytes(StandardCharsets.UTF_8))
logger.debug(
s"Read ${encoded.length} contracts from ACS, to a bytestring of size ${contractsBytes.length} bytes"
)
(snapshot.afterToken.fold(End: Position)(Index(_)), contractsBytes)
}

}
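
// The reviewer's suggested defaulting, sketched as comments. The actual
// queryAcsSnapshot lives in AcsSnapshotStore; this signature is reconstructed
// from the call above, and the return type is an assumption, not the merged code:
//   def queryAcsSnapshot(
//       migrationId: Long,
//       snapshot: CantonTimestamp,
//       after: Option[Long],
//       limit: HardLimit,
//       partyIds: Seq[PartyId] = Seq.empty,
//       templates: Seq[PackageQualifiedName] = Seq.empty,
//   ): Future[QueryAcsSnapshotResult]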

def dumpAcsSnapshot(migrationId: Long, timestamp: CantonTimestamp): Future[Unit] = {

// TODO(#3429): currently, if this crashes half-way through, there is no indication in the S3 objects that
// Review thread — Contributor: Curious, why not do that right now? (...or is this
// just a next PR that you'll work on, fine by me of course)
// Author: Yeah, just followup work in coming PRs, this one's already quite large.
// the snapshot is incomplete. We probably want to label the last object with `last` or something like that
// so that we can detect incomplete snapshots and recreate them.

def mksrc = {
val idx = new AtomicInteger(0)
val base = Source
.unfoldAsync(Start: Position) {
case Start => getAcsSnapshotChunk(migrationId, timestamp, None).map(Some(_))
case Index(i) => getAcsSnapshotChunk(migrationId, timestamp, Some(i)).map(Some(_))
case End => Future.successful(None)
}
.via(ZstdGroupedWeight(config.maxFileSize))
// Add a buffer so that the next object continues accumulating while we write the previous one
.buffer(
1,
OverflowStrategy.backpressure,
)
.mapAsync(1) { zstdObj =>
{
val objectKey = s"snapshot_$idx.zstd"
// TODO(#3429): For now, we accumulate the full object in memory, then write it as a whole.
// Consider streaming it to S3 instead. Need to make sure that it then handles crashes correctly,
// i.e. that until we tell S3 that we're done writing, if we stop, then S3 throws away the
// partially written object.
// TODO(#3429): Error handling
for {
_ <- s3Connection.writeFullObject(objectKey, ByteBuffer.wrap(zstdObj.toArrayUnsafe()))
} yield {
idx.addAndGet(1)
}
}
}
val withKs = base.viaMat(KillSwitches.single)(Keep.right)
withKs.watchTermination() { case (ks, done) => (ks: KillSwitch, done) }
}

// TODO(#3429): tweak the retry parameters here
val delay = FiniteDuration(5, "seconds")
val policy = new RetrySourcePolicy[Unit, Int] {
// Review thread — Contributor: Is there a test that retry works, that
// writeFullObject can overwrite?
// Author: I tested it manually for now, added a TODO to add a unit test for it.
// TODO(#3429): add a unit test for this retry logic
override def shouldRetry(
lastState: Unit,
lastEmittedElement: Option[Int],
lastFailure: Option[Throwable],
): Option[(scala.concurrent.duration.FiniteDuration, Unit)] = {
lastFailure.map { t =>
logger.warn(s"Writing ACS snapshot to bulk storage failed with : ${ErrorUtil
.messageWithStacktrace(t)}, will retry after delay of ${delay}")
// Always retry (TODO(#3429): consider a max number of retries?)
delay -> ()
}
}
}

PekkoUtil
.restartSource(
name = "acs-snapshot-dump",
initial = (),
mkSource = (_: Unit) => mksrc,
policy = policy,
)
.runWith(Sink.ignore)

}.map(_ => ())
}
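
The core of `dumpAcsSnapshot` is the `unfoldAsync` pagination driven by the `Position` ADT: `Start` issues the first query with no token, each returned `afterToken` becomes an `Index`, and a missing token maps to `End`, which terminates the stream on the next pull. A stripped-down, runnable sketch of that pattern, with a hypothetical `fetchChunk` standing in for `queryAcsSnapshot`:

```scala
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.stream.scaladsl.{Sink, Source}
import scala.concurrent.Future

object UnfoldPaginationSketch extends App {
  implicit val system: ActorSystem = ActorSystem("sketch")
  import system.dispatcher

  sealed trait Position
  case object Start extends Position
  case object End extends Position
  final case class Index(value: Long) extends Position

  // Stand-in for acsSnapshotStore.queryAcsSnapshot: returns one chunk plus the
  // next token, or no token once the data is exhausted (10 fake contracts here).
  def fetchChunk(after: Option[Long]): Future[(Option[Long], Seq[String])] = {
    val from  = after.getOrElse(-1L) + 1
    val chunk = (from until math.min(from + 3, 10L)).map(i => s"contract-$i")
    Future.successful((if (from + 3 < 10) Some(from + 2) else None, chunk))
  }

  Source
    .unfoldAsync(Start: Position) {
      case Start    => fetchChunk(None).map { case (tok, c) => Some((tok.fold(End: Position)(Index(_)), c)) }
      case Index(i) => fetchChunk(Some(i)).map { case (tok, c) => Some((tok.fold(End: Position)(Index(_)), c)) }
      case End      => Future.successful(None) // no more chunks: complete the stream
    }
    .runWith(Sink.foreach(chunk => println(chunk.mkString(", "))))
    .onComplete(_ => system.terminate())
}
```

The last chunk is still emitted together with the `End` state, matching the production code, where `afterToken.fold(End: Position)(Index(_))` pairs the final page of contracts with the terminal position.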