Merged (changes from 13 commits)
@@ -6,7 +6,7 @@ package org.lfdecentralizedtrust.splice.http
import org.lfdecentralizedtrust.splice.codegen.java.splice.validatorlicense as vl
import org.lfdecentralizedtrust.splice.http.v0.definitions
import org.lfdecentralizedtrust.splice.http.v0.definitions.ListValidatorLicensesResponse
import org.lfdecentralizedtrust.splice.store.{AppStore, PageLimit, SortOrder}
import org.lfdecentralizedtrust.splice.store.{AppStore, Limit, PageLimit, SortOrder}
import com.digitalasset.canton.logging.NamedLogging
import com.digitalasset.canton.tracing.{Spanning, TraceContext}
import io.opentelemetry.api.trace.Tracer
@@ -29,7 +29,9 @@ trait HttpValidatorLicensesHandler extends Spanning with NamedLogging {
.listContractsPaginated(
vl.ValidatorLicense.COMPANION,
after,
limit.fold(PageLimit.Max)(PageLimit.tryCreate),
limit.fold(PageLimit.Max)(limit =>
PageLimit.tryCreate(limit, Limit.DefaultMaxPageSize)
),
SortOrder.Descending,
)
.map(_.mapResultsInPage(_.contract))
@@ -10,16 +10,17 @@ import io.grpc.Status

sealed trait Limit {
val limit: Int
// TODO (#767): make configurable (it's currently an argument, but not user-configurable e.g. via config values)
val maxPageSize: Int
}

object Limit {

// TODO (#767): make configurable
val MaxPageSize: Int = 1000
val DefaultLimit: Limit = HardLimit.tryCreate(MaxPageSize)
val DefaultMaxPageSize: Int = 1000
val DefaultLimit: Limit = HardLimit.tryCreate(DefaultMaxPageSize)

private[store] def validateLimit(limit: Int): Either[String, Int] = {
if (limit > MaxPageSize) Left(s"Exceeded limit maximum ($limit > $MaxPageSize).")
private[store] def validateLimit(limit: Int, maxPageSize: Int): Either[String, Int] = {
if (limit > maxPageSize) Left(s"Exceeded limit maximum ($limit > $maxPageSize).")
else if (limit < 1) Left("Limit must be at least 1.")
else Right(limit)
}
@@ -29,13 +30,13 @@ object Limit {
/** Limit for when the result is expected to not exceed the limit.
* If exceeded, the store should log a WARN.
*/
case class HardLimit private (limit: Int) extends Limit
case class HardLimit private (limit: Int, maxPageSize: Int) extends Limit
object HardLimit {
def apply(limit: Int): Either[String, HardLimit] = {
Limit.validateLimit(limit).map(new HardLimit(_))
def apply(limit: Int, maxPageSize: Int = Limit.DefaultMaxPageSize): Either[String, HardLimit] = {
Limit.validateLimit(limit, maxPageSize).map(new HardLimit(_, maxPageSize))
}
def tryCreate(limit: Int): HardLimit =
HardLimit(limit).valueOr(err =>
def tryCreate(limit: Int, maxPageSize: Int = Limit.DefaultMaxPageSize): HardLimit =
HardLimit(limit, maxPageSize).valueOr(err =>
throw Status.INVALID_ARGUMENT
.withDescription(err)
.asRuntimeException()
@@ -47,14 +48,14 @@
* To be used with pagination. A limit of ''n'' implies that if there are
* ''k > n'' result elements, ''z'' elements where ''0≤z≤n'' may be returned.
*/
case class PageLimit private (limit: Int) extends Limit
case class PageLimit private (limit: Int, maxPageSize: Int) extends Limit
object PageLimit {
val Max: PageLimit = new PageLimit(Limit.MaxPageSize)
def apply(limit: Int): Either[String, PageLimit] = {
Limit.validateLimit(limit).map(new PageLimit(_))
val Max: PageLimit = new PageLimit(Limit.DefaultMaxPageSize, Limit.DefaultMaxPageSize)
def apply(limit: Int, maxPageSize: Int = Limit.DefaultMaxPageSize): Either[String, PageLimit] = {
Limit.validateLimit(limit, maxPageSize).map(new PageLimit(_, maxPageSize))
}
def tryCreate(limit: Int): PageLimit =
PageLimit(limit).valueOr(err =>
def tryCreate(limit: Int, maxPageSize: Int = Limit.DefaultMaxPageSize): PageLimit =
PageLimit(limit, maxPageSize).valueOr(err =>
throw Status.INVALID_ARGUMENT
.withDescription(err)
.asRuntimeException()
@@ -71,9 +72,9 @@ trait LimitHelpers { _: NamedLogging =>
traceContext: TraceContext
): C = {
limit match {
case PageLimit(limit) =>
case PageLimit(limit, _) =>
result.take(limit.intValue())
case HardLimit(limit) =>
case HardLimit(limit, _) =>
val resultSize = result.size
if (resultSize > limit) {
logger.warn(
@@ -95,9 +96,9 @@ trait LimitHelpers { _: NamedLogging =>
result: C & scala.collection.IterableOps[?, CC, C],
): C = {
limit match {
case PageLimit(limit) =>
case PageLimit(limit, _) =>
result.take(limit.intValue())
case HardLimit(limit) =>
case HardLimit(limit, _) =>
val resultSize = result.size
if (resultSize > limit) {
throw io.grpc.Status.FAILED_PRECONDITION
@@ -113,8 +114,8 @@ trait LimitHelpers { _: NamedLogging =>

protected def sqlLimit(limit: Limit): Int = {
limit match {
case HardLimit(limit) => limit + 1
case PageLimit(limit) => limit
case PageLimit(limit, _) => limit
case HardLimit(limit, _) => limit + 1
}
}
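
A minimal usage sketch of the new maxPageSize parameter (values here are illustrative, not part of this diff):

import org.lfdecentralizedtrust.splice.store.{HardLimit, Limit, PageLimit}

// Validated against the default cap of 1000 (Limit.DefaultMaxPageSize)
val defaultCapped = HardLimit.tryCreate(500)
// A caller-supplied cap replaces the default
val customCapped = PageLimit.tryCreate(5000, maxPageSize = 10000)
// Returns Left("Exceeded limit maximum (2000 > 1000).") instead of throwing
val rejected = HardLimit(2000)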

@@ -139,6 +139,10 @@ abstract class StoreTest extends AsyncWordSpec with BaseTest {
LfContractId.assertFromString("00" + f"$cIdCounter%064x").coid
}

protected def resetCIdCounter() = {
cIdCounter = 0
}

protected def time(n: Long): CantonTimestamp = CantonTimestamp.ofEpochSecond(n)

private def schedule(
@@ -362,7 +362,7 @@ class AcsSnapshotStore(
recordTime,
None,
// if the limit is exceeded by the results from the DB, an exception will be thrown
HardLimit.tryCreate(Limit.MaxPageSize),
HardLimit.tryCreate(Limit.DefaultMaxPageSize),
partyIds,
)
.map { result =>
@@ -0,0 +1,149 @@
// Copyright (c) 2024 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

package org.lfdecentralizedtrust.splice.scan.store.bulk

import scala.concurrent.ExecutionContext
import com.digitalasset.canton.data.CantonTimestamp
import com.digitalasset.canton.logging.{NamedLoggerFactory, NamedLogging}
import com.digitalasset.canton.tracing.TraceContext
import com.digitalasset.canton.util.{ErrorUtil, PekkoUtil}
import com.digitalasset.canton.util.PekkoUtil.RetrySourcePolicy
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.stream.{KillSwitch, KillSwitches, OverflowStrategy}
import org.apache.pekko.stream.scaladsl.{Keep, Sink, Source}
import org.apache.pekko.util.ByteString
import org.lfdecentralizedtrust.splice.scan.admin.http.CompactJsonScanHttpEncodings
import org.lfdecentralizedtrust.splice.scan.store.AcsSnapshotStore
import org.lfdecentralizedtrust.splice.store.HardLimit

import scala.concurrent.Future
import io.circe.syntax.*

import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.duration.FiniteDuration

case class BulkStorageConfig(
dbReadChunkSize: Int,
maxFileSize: Long,
)

object BulkStorageConfigs {
val bulkStorageConfigV1 = BulkStorageConfig(
1000,
(64 * 1024 * 1024).toLong,

Contributor: nitpick but you can also do 64L * 1024 * 1024

Contributor Author: done

)
}
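
For concreteness, the config after the nit above would presumably read as follows (illustrative only; the thread just records "done"):

val bulkStorageConfigV1 = BulkStorageConfig(
  dbReadChunkSize = 1000,
  maxFileSize = 64L * 1024 * 1024, // 64 MiB as a Long literal, no .toLong cast needed
)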

sealed trait Position

Contributor: nitpick: Maybe nice to put these in an object

Contributor Author: done

case object Start extends Position
case object End extends Position
final case class Index(value: Long) extends Position
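
Following the "put these in an object" nit above, one idiomatic shape is a companion object holding the cases (a sketch only; the final code may differ):

sealed trait Position
object Position {
  case object Start extends Position
  case object End extends Position
  final case class Index(value: Long) extends Position
}

Call sites would then reference Position.Start, Position.End, and Position.Index(i).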

class AcsSnapshotBulkStorage(
val config: BulkStorageConfig,
val acsSnapshotStore: AcsSnapshotStore,
val s3Connection: S3BucketConnection,
override val loggerFactory: NamedLoggerFactory,
)(implicit actorSystem: ActorSystem, tc: TraceContext, ec: ExecutionContext)
extends NamedLogging {

private def getAcsSnapshotChunk(
migrationId: Long,
timestamp: CantonTimestamp,
after: Option[Long],
): Future[(Position, ByteString)] = {
for {
snapshot <- acsSnapshotStore.queryAcsSnapshot(
migrationId,
snapshot = timestamp,
after,
limit = HardLimit.tryCreate(config.dbReadChunkSize),
Seq.empty,

Contributor: nit, since you do limit = , maybe nice to do partyIds = Seq.empty, templates = Seq.empty for clarity

Contributor Author: Fixed (removed the limit =)

Contributor: I think it makes sense to default partyIds: Seq[PartyId] = Seq.empty, templates: Seq[PackageQualifiedName] = Seq.empty, on def queryAcsSnapshot( which is nice and clean, but not necessary in this PR

Seq.empty,
)
} yield {
val encoded = snapshot.createdEventsInPage.map(event =>
CompactJsonScanHttpEncodings.javaToHttpCreatedEvent(event.eventId, event.event)
)
val contractsStr = encoded.map(_.asJson.noSpacesSortKeys).mkString("\n") + "\n"
val contractsBytes = ByteString(contractsStr.getBytes(StandardCharsets.UTF_8))
logger.debug(
s"Read ${encoded.length} contracts from ACS, to a bytestring of size ${contractsBytes.length} bytes"
)
(snapshot.afterToken.fold(End: Position)(Index(_)), contractsBytes)
}

}
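
The defaulting suggested in the thread above would look roughly like this on the store side (a hypothetical signature sketch; parameter names follow the call site, types are taken from the comment, and the return type is elided since it is not part of this PR):

def queryAcsSnapshot(
    migrationId: Long,
    snapshot: CantonTimestamp,
    after: Option[Long],
    limit: Limit,
    partyIds: Seq[PartyId] = Seq.empty,
    templates: Seq[PackageQualifiedName] = Seq.empty,
) = ???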

def dumpAcsSnapshot(migrationId: Long, timestamp: CantonTimestamp): Future[Unit] = {

// TODO(#3429): currently, if this crashes half-way through, there is no indication in the S3 objects that

Contributor: Curious, why not do that right now?

Contributor: (or is this just a next PR that you'll work on, fine by me of course)

Contributor Author: Yeah, just followup work in coming PRs, this one's already quite large.

// the snapshot is incomplete. We probably want to label the last object with `last` or something like that
// so that we can detect incomplete snapshots and recreate them.

def mksrc = {
val idx = new AtomicInteger(0)
val base = Source
.unfoldAsync(Start: Position) {
case Start => getAcsSnapshotChunk(migrationId, timestamp, None).map(Some(_))
case Index(i) => getAcsSnapshotChunk(migrationId, timestamp, Some(i)).map(Some(_))
case End => Future.successful(None)
}
.via(ZstdGroupedWeight(config.maxFileSize))
// Add a buffer so that the next object continues accumulating while we write the previous one
.buffer(
1,
OverflowStrategy.backpressure,
)
.mapAsync(1) { zstdObj =>
val objectKey = s"snapshot_$idx.zstd"
Future {

Contributor: You should use a blocking threadpool for this future, since it blocks, now you're likely using the default dispatcher. sadly we can't use connector which does use proper non-blocking I/O. So you need to do Future{}(ec) and get the ec which is used for blocking I/O in canton or get one from pekko configured as a dispatcher.

Contributor: of course it depends on the ec implicit on this class but it's not visible that way that this class needs a separate threadpool for blocking I/O. maybe better just too create one threadpool fixed for this

Contributor (@ray-roestenburg-da, Dec 18, 2025): Probably should wrap it in blocking { } anyway (the writeFullObject). And then if it uses canton's executorservice stuff it might know what to do..

Contributor: If this was a separate app/service just give it a fixed threadpool to work with.

Contributor: (but later on I noticed that there is a aws S3AsyncClient you should just use instead and not worry about the blocking, if this thing works well)

Contributor Author: Fixed with the asyncClient that you pointed to.

// TODO(#3429): For now, we accumulate the full object in memory, then write it as a whole.
// Consider streaming it to S3 instead. Need to make sure that it then handles crashes correctly,
// i.e. that until we tell S3 that we're done writing, if we stop, then S3 throws away the
// partially written object.
// TODO(#3429): Error handling
val _ =
s3Connection.writeFullObject(objectKey, ByteBuffer.wrap(zstdObj.toArrayUnsafe()))
idx.addAndGet(1)
}
}
val withKs = base.viaMat(KillSwitches.single)(Keep.right)
withKs.watchTermination() { case (ks, done) => (ks: KillSwitch, done) }
}
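
For reference on the dispatcher discussion above (ultimately resolved by switching to the async S3 client), the blocking-pool alternative the reviewers describe would look roughly like the following inside this class; blockingEc and writeBlocking are assumed names, not code from this PR:

import java.util.concurrent.Executors
import scala.concurrent.blocking

// Dedicated pool so blocking S3 writes cannot starve the default dispatcher
private val blockingEc: ExecutionContext =
  ExecutionContext.fromExecutor(Executors.newFixedThreadPool(4))

private def writeBlocking(objectKey: String, bytes: ByteBuffer): Future[Unit] =
  Future {
    // blocking {} flags the section as thread-parking I/O for pools that honour it
    blocking {
      s3Connection.writeFullObject(objectKey, bytes)
    }
  }(blockingEc)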

// TODO(#3429): tweak the retry parameters here
val delay = FiniteDuration(5, "seconds")
val policy = new RetrySourcePolicy[Unit, Int] {

Contributor: Is there a test that retry works, that writeFullObject can overwrite?

Contributor Author: I tested it manually for now, added a TODO to add a unit test for it.

override def shouldRetry(
lastState: Unit,
lastEmittedElement: Option[Int],
lastFailure: Option[Throwable],
): Option[(scala.concurrent.duration.FiniteDuration, Unit)] = {
val prefixMsg =
s"RetrySourcePolicy for restart of AcsSnapshotBulkStorage with ${delay} delay:"
lastFailure match {
case Some(t) =>
logger.info(s"$prefixMsg Last failure: ${ErrorUtil.messageWithStacktrace(t)}")
case None =>
logger.debug(s"$prefixMsg No failure, normal restart.")
}
// Always retry (TODO(#3429): consider a max number of retries?)

Contributor: This is one of the reasons why I would think it is nice for this thing to be separate from the scan app pod. Let it crash when it can't write, recover on startup where you were, and let k8s restart the pod, this then also far easier shows up on failures in monitoring, and you don't have to crash the scan app. And the reboot effect sometimes works far better than staying in process. My two cents.

Contributor Author: yes, but as discussed, breaking scan up into multiple microservices has its own downsides.

Contributor: ok fair, also maybe not the best comment I made on this particular PR 😄

Some(delay -> ())
}
}

PekkoUtil
.restartSource(
name = "acs-snapshot-dump",
initial = (),
mkSource = (_: Unit) => mksrc,
policy = policy,
)
.runWith(Sink.ignore)

}.map(_ => ())
}
@@ -0,0 +1,73 @@
// Copyright (c) 2024 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

package org.lfdecentralizedtrust.splice.scan.store.bulk

import com.digitalasset.canton.logging.{NamedLoggerFactory, NamedLogging}
import com.digitalasset.canton.tracing.TraceContext
import software.amazon.awssdk.auth.credentials.{AwsBasicCredentials, StaticCredentialsProvider}
import software.amazon.awssdk.core.sync.RequestBody
import software.amazon.awssdk.regions.Region
import software.amazon.awssdk.services.s3.model.{GetObjectRequest, PutObjectRequest}
import software.amazon.awssdk.services.s3.{S3Client, S3Configuration}

import java.net.URI
import java.nio.ByteBuffer

case class S3Config(
endpoint: URI,
bucketName: String,
region: Region,
credentials: AwsBasicCredentials,
)

class S3BucketConnection(
val s3Client: S3Client,
val bucketName: String,
val loggerFactory: NamedLoggerFactory,
) extends NamedLogging {
// Reads the full content of an s3 object into a ByteBuffer.
// Use only for testing, when the object size is known to be small
def readFullObject(key: String): ByteBuffer = {
val obj = s3Client.getObject(GetObjectRequest.builder().bucket(bucketName).key(key).build())
val bytes = obj.readAllBytes()
val ret = ByteBuffer.allocateDirect(bytes.length)

Contributor (@ray-roestenburg-da, Dec 18, 2025): not necessary to be a direct buffer right? (I mean as part of readFullObject)

Contributor Author: Fixed (as part of moving to the asyncClient)

ret.put(bytes)
}
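
Per the direct-buffer question above, a heap buffer is sufficient here; a minimal sketch of the simplification (readFullObjectHeap is a hypothetical name, and this is not the code that eventually landed, which moved to the async client):

def readFullObjectHeap(key: String): ByteBuffer = {
  val obj = s3Client.getObject(GetObjectRequest.builder().bucket(bucketName).key(key).build())
  // ByteBuffer.wrap avoids the direct allocation and the extra copy via put()
  ByteBuffer.wrap(obj.readAllBytes())
}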

// Writes a full object from memory into an s3 object (a blocking call)
def writeFullObject(key: String, content: ByteBuffer)(implicit tc: TraceContext): Unit = {
logger.debug(s"Writing ${content.array().length} bytes to S3 object $key")
val putObj: PutObjectRequest = PutObjectRequest
.builder()
.bucket(bucketName)
.key(key)
.build()
s3Client.putObject(

Contributor: Have you looked at S3AsyncClient ? you can convert CompletableFuture in java to Future in Scala with FutureConverters, and I read that this is non blocking based on Netty, worth a try.

Contributor Author: Nice, adopted that indeed.

Contributor: Thanks!

putObj,
RequestBody.fromBytes(content.array()),
)
()
}
}
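
A rough sketch of the async variant adopted per the thread above, using S3AsyncClient and scala.jdk.FutureConverters; the class and member names here are assumptions for illustration, not the final code:

import scala.concurrent.{ExecutionContext, Future}
import scala.jdk.FutureConverters.*
import software.amazon.awssdk.core.async.AsyncRequestBody
import software.amazon.awssdk.services.s3.S3AsyncClient

class S3BucketAsyncConnection(s3AsyncClient: S3AsyncClient, bucketName: String) {
  // Non-blocking: the Netty-based client returns a CompletableFuture,
  // which converts to a Scala Future without tying up a dispatcher thread.
  def writeFullObject(key: String, content: ByteBuffer)(implicit
      ec: ExecutionContext
  ): Future[Unit] = {
    val putObj = PutObjectRequest.builder().bucket(bucketName).key(key).build()
    s3AsyncClient
      .putObject(putObj, AsyncRequestBody.fromByteBuffer(content))
      .asScala
      .map(_ => ())
  }
}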

object S3BucketConnection {
def apply(
s3Config: S3Config,
bucketName: String,
loggerFactory: NamedLoggerFactory,
): S3BucketConnection = {
new S3BucketConnection(
S3Client
.builder()
.endpointOverride(s3Config.endpoint)
.region(s3Config.region)
.credentialsProvider(StaticCredentialsProvider.create(s3Config.credentials))
// TODO(#3429): mockS3 and GCS support only path style access. Do we need to make this configurable?

Contributor: Only if some other S3 compatible storage does not work with this, could investigate that, or wait until someone asks for it. In any case it should be defaulted to this, if we would make it configurable, and then you would need to explicitly turn it off, IMHO.

Contributor Author: Yup, that's what I meant (check if it's needed), and that's the plan.

.serviceConfiguration(S3Configuration.builder().pathStyleAccessEnabled(true).build())
.build(),
bucketName,
loggerFactory,
)
}
}