Merged
Changes from 9 commits
@@ -61,6 +61,13 @@ object PageLimit {
)
}

/** Limit with no constraints. Must not be used in production; use only for testing. */
case class UnboundLimit private (limit: Int) extends Limit
Contributor:
Maybe rather do // TODO (#767): make configurable on Limit.MaxPage so you don't need this?

Contributor Author (@isegall-da), Dec 18, 2025:
Fair. I half-did #767 by making it an argument, but not fully configurable (e.g. by an app config), so this tackles only the use case of overriding the max per usage in code.

Contributor:
Nice, thanks

object UnboundLimit {
def apply(limit: Int): UnboundLimit = new UnboundLimit(limit)
}
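
For illustration, a minimal sketch of the alternative discussed in the thread above, i.e. making the maximum page size an argument rather than introducing UnboundLimit. PageLimitSketch, DefaultMaxPageSize and the default of 1000 are assumptions for this sketch, not the real Limit API:

object PageLimitSketch {
  val DefaultMaxPageSize: Int = 1000 // assumed default, for illustration only

  final case class PageLimit(limit: Int)

  // A caller (e.g. a test) can raise the maximum per usage instead of needing UnboundLimit.
  def tryCreate(limit: Int, maxPageSize: Int = DefaultMaxPageSize): PageLimit = {
    require(limit > 0 && limit <= maxPageSize, s"limit must be in 1..$maxPageSize, got $limit")
    PageLimit(limit)
  }
}

Under those assumptions, a test could call PageLimitSketch.tryCreate(100000, maxPageSize = Int.MaxValue) instead of using a dedicated unbounded type.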

trait LimitHelpers { _: NamedLogging =>

protected final def applyLimit[CC[_], C](
@@ -71,8 +78,6 @@
traceContext: TraceContext
): C = {
limit match {
case PageLimit(limit) =>
result.take(limit.intValue())
case HardLimit(limit) =>
val resultSize = result.size
if (resultSize > limit) {
@@ -86,6 +91,8 @@
} else {
result
}
case _ =>
result.take(limit.limit.intValue())
}
}

@@ -95,8 +102,6 @@
result: C & scala.collection.IterableOps[?, CC, C],
): C = {
limit match {
case PageLimit(limit) =>
result.take(limit.intValue())
case HardLimit(limit) =>
val resultSize = result.size
if (resultSize > limit) {
@@ -108,13 +113,15 @@
} else {
result
}
case _ =>
result.take(limit.limit.intValue())
}
}

protected def sqlLimit(limit: Limit): Int = {
limit match {
case HardLimit(limit) => limit + 1
case PageLimit(limit) => limit
case _ => limit.limit
}
}

@@ -139,6 +139,10 @@ abstract class StoreTest extends AsyncWordSpec with BaseTest {
LfContractId.assertFromString("00" + f"$cIdCounter%064x").coid
}

protected def resetCIdCounter() = {
cIdCounter = 0
}

protected def time(n: Long): CantonTimestamp = CantonTimestamp.ofEpochSecond(n)

private def schedule(
@@ -0,0 +1,111 @@
// Copyright (c) 2024 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

package org.lfdecentralizedtrust.splice.scan.store.bulk

import scala.concurrent.ExecutionContext
import com.digitalasset.canton.data.CantonTimestamp
import com.digitalasset.canton.logging.{NamedLoggerFactory, NamedLogging}
import com.digitalasset.canton.tracing.TraceContext
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.stream.OverflowStrategy
import org.apache.pekko.stream.scaladsl.{Sink, Source}
import org.apache.pekko.util.ByteString
import org.lfdecentralizedtrust.splice.scan.admin.http.CompactJsonScanHttpEncodings
import org.lfdecentralizedtrust.splice.scan.store.AcsSnapshotStore
import org.lfdecentralizedtrust.splice.store.HardLimit

import scala.concurrent.Future
import io.circe.syntax.*
import java.nio.ByteBuffer

case class BulkStorageConfig(
dbReadChunkSize: Int,
maxFileSize: Long,
)

object BulkStorageConfigs {
val bulkStorageConfigV1 = BulkStorageConfig(
1000,
(64 * 1024 * 1024).toLong,
Contributor:
nitpick but you can also do 64L * 1024 * 1024

Contributor Author:
done

)
val bulkStorageTestConfig = BulkStorageConfig(
Contributor:
This can be defined in a test instead of here?

1000,
50000L,
)
}

sealed trait Position
Contributor:
nitpick: Maybe nice to put these in an object

Contributor Author:
done

case object Start extends Position
case object End extends Position
final case class Index(value: Long) extends Position

class AcsSnapshotBulkStorage(
val config: BulkStorageConfig,
val acsSnapshotStore: AcsSnapshotStore,
val s3Connection: S3BucketConnection,
override val loggerFactory: NamedLoggerFactory,
)(implicit actorSystem: ActorSystem, tc: TraceContext, ec: ExecutionContext)
extends NamedLogging {

def getAcsSnapshotChunk(
migrationId: Long,
timestamp: CantonTimestamp,
after: Option[Long],
): Future[(Position, ByteString)] = {
for {
snapshot <- acsSnapshotStore.queryAcsSnapshot(
migrationId,
snapshot = timestamp,
after,
limit = HardLimit.tryCreate(config.dbReadChunkSize),
Seq.empty,
Contributor:
nit: since you do limit =, maybe it's nicer to do partyIds = Seq.empty, templates = Seq.empty for clarity

Contributor Author:
Fixed (removed the limit =)

Contributor:
I think it makes sense to default partyIds: Seq[PartyId] = Seq.empty, templates: Seq[PackageQualifiedName] = Seq.empty on def queryAcsSnapshot(...), which is nice and clean, but it's not necessary in this PR.

Seq.empty,
)
} yield {
val encoded = snapshot.createdEventsInPage.map(event =>
CompactJsonScanHttpEncodings.javaToHttpCreatedEvent(event.eventId, event.event)
)
val contractsStr = encoded.map(_.asJson.noSpacesSortKeys).mkString("\n") + "\n"
val contractsBytes = ByteString(contractsStr)
Contributor:
contractsStr.getBytes(StandardCharsets.UTF_8)

logger.debug(
s"Read ${encoded.length} contracts from ACS, to a bytestring of size ${contractsBytes.length} bytes"
)
(snapshot.afterToken.fold(End: Position)(Index(_)), contractsBytes)
}

}
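
A rough sketch of the defaulting suggested in the thread on the queryAcsSnapshot call above. The parameter names follow that thread, but the surrounding types here are abstract stand-ins rather than the real store API:

import scala.concurrent.Future
import com.digitalasset.canton.data.CantonTimestamp

trait AcsSnapshotQueriesSketch {
  // Stand-ins for the real types; import paths are intentionally left out.
  type PartyId
  type PackageQualifiedName
  type Limit
  type QueryAcsSnapshotResult

  // Defaulting the filter arguments keeps call sites like getAcsSnapshotChunk clean.
  def queryAcsSnapshot(
      migrationId: Long,
      snapshot: CantonTimestamp,
      after: Option[Long],
      limit: Limit,
      partyIds: Seq[PartyId] = Seq.empty,
      templates: Seq[PackageQualifiedName] = Seq.empty,
  ): Future[QueryAcsSnapshotResult]
}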

def dumpAcsSnapshot(migrationId: Long, timestamp: CantonTimestamp): Future[Unit] = {

@SuppressWarnings(Array("org.wartremover.warts.Var"))
var idx = 0
Contributor:
This needs to be a volatile member, or an AtomicInteger/AtomicReference: you are mutating it concurrently from a future (from a thread on the ExecutionContext that runs the stream).

Contributor Author:
Hmm.. I originally had it as an AtomicInteger, but I'm mutating it from a future with parallelism 1, so I convinced myself that it can be a var. But you're probably right that it's better to just be prudent and revert to that.

Just for my education: is my reasoning that parallelism 1 is good enough sound, or am I still missing something?

Contributor:
The 1 does not matter: you are still accessing the var from that one other thread, which isn't allowed in principle. I shoot myself in the foot every time I try to be too clever or loose with these rules. Also, code changes, and the next person doesn't see the special case, or starts copying a bad way of doing things.
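
As a sketch of how the shared counter could be avoided entirely (rather than made atomic), zipWithIndex can carry the object index inside the stream. zstdChunks and writeChunk below are stand-ins for the ZstdGroupedWeight output and for s3Connection.writeFullObject, not names from this PR:

import java.nio.ByteBuffer
import scala.concurrent.{ExecutionContext, Future}
import org.apache.pekko.stream.Materializer
import org.apache.pekko.stream.scaladsl.{Sink, Source}
import org.apache.pekko.util.ByteString

def writeAllChunks(
    zstdChunks: Source[ByteString, Any],
    writeChunk: (String, ByteBuffer) => Unit,
)(implicit mat: Materializer, ec: ExecutionContext): Future[Unit] =
  zstdChunks.zipWithIndex
    .mapAsync(1) { case (zstdObj, idx) =>
      // The index travels with the element, so no var/volatile/AtomicInteger is needed.
      Future(writeChunk(s"snapshot_$idx.zstd", ByteBuffer.wrap(zstdObj.toArrayUnsafe())))
    }
    .runWith(Sink.ignore)
    .map(_ => ())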


Source
.unfoldAsync(Start: Position) {
case Start => getAcsSnapshotChunk(migrationId, timestamp, None).map(Some(_))
case Index(i) => getAcsSnapshotChunk(migrationId, timestamp, Some(i)).map(Some(_))
case End => Future.successful(None)
}
.via(ZstdGroupedWeight(config.maxFileSize))
// Add a buffer so that the next object continues accumulating while we write the previous one
.buffer(
1,
OverflowStrategy.backpressure,
)
.mapAsync(1) { zstdObj =>
val objectKey = s"snapshot_$idx.zstd"
Future {
// TODO(#3429): For now, we accumulate the full object in memory, then write it as a whole.
// Consider streaming it to S3 instead. Need to make sure that it then handles crashes correctly,
// i.e. that until we tell S3 that we're done writing, if we stop, then S3 throws away the
// partially written object.
// TODO(#3429): Error handling
val _ = s3Connection.writeFullObject(objectKey, ByteBuffer.wrap(zstdObj.toArrayUnsafe()))
idx += 1
}
}
.runWith(Sink.ignore)

}.map(_ => ())
}
@@ -0,0 +1,73 @@
// Copyright (c) 2024 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

package org.lfdecentralizedtrust.splice.scan.store.bulk

import com.digitalasset.canton.data.CantonTimestamp
import com.digitalasset.canton.logging.{NamedLoggerFactory, NamedLogging}
import com.digitalasset.canton.tracing.TraceContext
import org.apache.pekko.stream.{Attributes, Outlet, SourceShape}
import org.apache.pekko.stream.stage.{AsyncCallback, GraphStage, GraphStageLogic, OutHandler}
import org.lfdecentralizedtrust.splice.scan.store.AcsSnapshotStore
import org.lfdecentralizedtrust.splice.scan.store.AcsSnapshotStore.QueryAcsSnapshotResult
import org.lfdecentralizedtrust.splice.store.PageLimit
import org.lfdecentralizedtrust.splice.store.events.SpliceCreatedEvent

import java.util.concurrent.atomic.AtomicReference
import scala.concurrent.ExecutionContext
import scala.util.{Failure, Success}

case class AcsSnapshotSource(
Contributor:
I don't think you need to write your own GraphStage for this; you can do it with unfoldAsync (or statefulMapConcat, but unfoldAsync is probably easier).

Contributor Author:
Argh, sorry, this is an unused leftover. I indeed already reimplemented it with unfoldAsync.

acsSnapshotStore: AcsSnapshotStore,
timestamp: CantonTimestamp,
migrationId: Long,
override val loggerFactory: NamedLoggerFactory,
)(implicit tc: TraceContext, ec: ExecutionContext)
extends GraphStage[SourceShape[Vector[SpliceCreatedEvent]]]
with NamedLogging {
val out: Outlet[Vector[SpliceCreatedEvent]] = Outlet("AcsSnapshotSource")
override def shape: SourceShape[Vector[SpliceCreatedEvent]] = SourceShape(out)

val numUpdatesPerQuery = 1000

override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = {
new GraphStageLogic(shape) with OutHandler {
val token = new AtomicReference[Option[Long]](None)

val asyncCallback: AsyncCallback[QueryAcsSnapshotResult] = getAsyncCallback {
case result: QueryAcsSnapshotResult =>
if (result.createdEventsInPage.isEmpty) {
complete(out)
token.set(None)
} else {
push(out, result.createdEventsInPage)
token.set(result.afterToken)
}
case _ =>
logger.error("asyncCallback unexpectedly called with an error")
}

val failureCallback: AsyncCallback[Throwable] = getAsyncCallback { ex =>
fail(out, ex)
}

override def onPull(): Unit = {
acsSnapshotStore
.queryAcsSnapshot(
migrationId,
timestamp,
token.get(),
PageLimit.tryCreate(numUpdatesPerQuery),
Seq.empty,
Seq.empty,
)
.onComplete {
case Success(value) => asyncCallback.invoke(value)
case Failure(exception) => failureCallback.invoke(exception)
}
}
setHandler(out, this)
}
}

}
@@ -0,0 +1,68 @@
// Copyright (c) 2024 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

package org.lfdecentralizedtrust.splice.scan.store.bulk

import com.digitalasset.canton.logging.{NamedLoggerFactory, NamedLogging}
import com.digitalasset.canton.tracing.TraceContext
import software.amazon.awssdk.auth.credentials.{AwsBasicCredentials, StaticCredentialsProvider}
import software.amazon.awssdk.core.sync.RequestBody
import software.amazon.awssdk.regions.Region
import software.amazon.awssdk.services.s3.model.{GetObjectRequest, PutObjectRequest}
import software.amazon.awssdk.services.s3.{S3Client, S3Configuration}

import java.net.URI
import java.nio.ByteBuffer

case class S3Config(
endpoint: URI,
bucketName: String,
region: Region,
credentials: AwsBasicCredentials,
)

class S3BucketConnection(
val s3Client: S3Client,
val bucketName: String,
val loggerFactory: NamedLoggerFactory,
) extends NamedLogging {
// Reads the full content of an s3 object into a ByteBuffer.
// Use only for testing, when the object size is known to be small
def readFullObject(key: String): ByteBuffer = {
val obj = s3Client.getObject(GetObjectRequest.builder().bucket(bucketName).key(key).build())
val bytes = obj.readAllBytes()
val ret = ByteBuffer.allocateDirect(bytes.length)
Contributor (@ray-roestenburg-da), Dec 18, 2025:
It doesn't need to be a direct buffer, right? (I mean as part of readFullObject.)

Contributor Author:
Fixed (as part of moving to the asyncClient)

ret.put(bytes)
}

// Writes a full object from memory into an s3 object
def writeFullObject(key: String, content: ByteBuffer)(implicit tc: TraceContext) = {
Contributor:
Would be nice to add the return type here since it's a public def. Is this a blocking call?

Contributor Author:
The reason I omitted the return type is that I didn't find anything useful in it, so I don't actually care about returning it. Would it be cleaner to add a () at the end to make that explicit?

Indeed, it is blocking, hence the Future { } wrapper at the call site. Added that to the comment.

Contributor:
yes it's easier to read with ()

logger.debug(s"Writing ${content.array().length} bytes to S3 object $key")
val putObj: PutObjectRequest = PutObjectRequest
.builder()
.bucket(bucketName)
.key(key)
.build()
s3Client.putObject(
Contributor:
Have you looked at S3AsyncClient? You can convert a Java CompletableFuture to a Scala Future with FutureConverters, and I read that it is non-blocking, based on Netty. Worth a try.

Contributor Author:
Nice, adopted that indeed.

Contributor:
Thanks!

putObj,
RequestBody.fromBytes(content.array()),
)
}
}

object S3BucketConnection {
def apply(s3Config: S3Config, bucketName: String, loggerFactory: NamedLoggerFactory) = {
Contributor:
Suggested change:
- def apply(s3Config: S3Config, bucketName: String, loggerFactory: NamedLoggerFactory) = {
+ def apply(s3Config: S3Config, bucketName: String, loggerFactory: NamedLoggerFactory): S3BucketConnection = {

new S3BucketConnection(
S3Client
.builder()
.endpointOverride(s3Config.endpoint)
.region(s3Config.region)
.credentialsProvider(StaticCredentialsProvider.create(s3Config.credentials))
// TODO(#3429): mockS3 and GCS support only path style access. Do we need to make this configurable?
Contributor:
Only if some other S3-compatible storage does not work with this; we could investigate that, or wait until someone asks for it. In any case, if we make it configurable, it should default to this and you would then need to explicitly turn it off, IMHO.

Contributor Author:
Yup, that's what I meant (check if it's needed), and that's the plan.

.serviceConfiguration(S3Configuration.builder().pathStyleAccessEnabled(true).build())
.build(),
bucketName,
loggerFactory,
)
}
}
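
For reference, a minimal sketch of the S3AsyncClient approach adopted in the thread above, converting the returned CompletableFuture to a Scala Future via FutureConverters. writeFullObjectAsync is a hypothetical name for illustration, not the method that ended up in the PR:

import scala.concurrent.{ExecutionContext, Future}
import scala.jdk.FutureConverters.*
import software.amazon.awssdk.core.async.AsyncRequestBody
import software.amazon.awssdk.services.s3.S3AsyncClient
import software.amazon.awssdk.services.s3.model.PutObjectRequest

def writeFullObjectAsync(
    s3Client: S3AsyncClient,
    bucketName: String,
    key: String,
    content: Array[Byte],
)(implicit ec: ExecutionContext): Future[Unit] = {
  val putObj = PutObjectRequest.builder().bucket(bucketName).key(key).build()
  s3Client
    .putObject(putObj, AsyncRequestBody.fromBytes(content)) // CompletableFuture[PutObjectResponse]
    .asScala // non-blocking conversion via scala.jdk.FutureConverters
    .map(_ => ())
}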