
Conversation

@isegall-da
Contributor

@isegall-da isegall-da commented Dec 17, 2025

Part of #3429
Implements dumping ACS snapshots to S3 storage, with a unit test. Not yet integrated with anything else.

Pull Request Checklist

Cluster Testing

  • If a cluster test is required, comment /cluster_test on this PR to request it, and ping someone with access to the DA-internal system to approve it.
  • If a hard-migration test is required (from the latest release), comment /hdm_test on this PR to request it, and ping someone with access to the DA-internal system to approve it.

PR Guidelines

  • Include any change that might be observable by our partners or affect their deployment in the release notes.
  • Specify fixed issues with Fixes #n, and mention issues worked on using #n
  • Include a screenshot for frontend-related PRs - see README or use your favorite screenshot tool

Merge Guidelines

  • Make the git commit message look sensible when squash-merging on GitHub (most likely: just copy your PR description).

@isegall-da isegall-da marked this pull request as ready for review December 18, 2025 15:25
Contributor

@ray-roestenburg-da ray-roestenburg-da left a comment

Going in the right direction IMHO; some initial comments. The possible memory leak, exception handling, and always making sure the zstd object is closed are my most important comments.

1000,
(64 * 1024 * 1024).toLong,
)
val bulkStorageTestConfig = BulkStorageConfig(
Contributor

This can be defined in a test instead of here?

import scala.concurrent.ExecutionContext
import scala.util.{Failure, Success}

case class AcsSnapshotSource(
Contributor

I don't think you need to write your own GraphStage for this; you can do it with unfoldAsync (or statefulMapConcat, but unfoldAsync is probably easier).
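
For illustration, a minimal sketch of what an unfoldAsync-based paging source could look like (the `pagedSource` name, `Page`/`Token` types, and `fetchPage` callback are hypothetical, not taken from this PR):

```scala
import org.apache.pekko.NotUsed
import org.apache.pekko.stream.scaladsl.Source
import scala.concurrent.{ExecutionContext, Future}

// Generic paging with Source.unfoldAsync: fetchPage returns one page plus an optional
// continuation token; the stream emits each page and stops once no token is returned.
def pagedSource[Page, Token](
    fetchPage: Option[Token] => Future[(Page, Option[Token])]
)(implicit ec: ExecutionContext): Source[Page, NotUsed] =
  Source.unfoldAsync((Option.empty[Token], true)) { case (token, continue) =>
    if (!continue) Future.successful(None)
    else
      fetchPage(token).map { case (page, next) =>
        // Emit this page; keep going only while there is a next token.
        Some(((next, next.isDefined), page))
      }
  }
```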

Contributor Author

Argh, sorry, this is an unused leftover. I indeed already reimplemented it with unfoldAsync.

}

// Writes a full object from memory into an s3 object
def writeFullObject(key: String, content: ByteBuffer)(implicit tc: TraceContext) = {
Contributor

Would be nice to add the return type here since it's a public def. Is this a blocking call?

Contributor Author

The reason I omitted the return type is that I didn't find anything useful in it, so I don't actually care about returning it. Would it be cleaner to add a () at the end to make that explicit?

Indeed, it is blocking, hence the Future { } wrapper at the call site. Added to the comment.

Contributor

Yes, it's easier to read with ().

val compressionLevel: Int = 3,
) {

val tmpBuffer = ByteBuffer.allocateDirect(tmpBufferSize)
Contributor

This is allocated outside the JVM and is not GC-ed.

Contributor

I think you are likely currently leaking native memory and not cleaning up the compressingStream correctly

Contributor

You need to at least deref the tmpBuffer at some point so that it becomes unreachable and can get deleted (there is still the problem that you don't know when the JVM will actually free it).
There is cleanup code for this kind of stuff, but that's not portable I think. And once you use direct buffers you really need to set -XX:MaxDirectMemorySize so you know when this happens.

Contributor Author

The problem was that zstd asserts that the buffer is direct, but I'll definitely look into not leaking it at least.


/** Limit with no constraints. Must not be used for production, use only for testing.
*/
case class UnboundLimit private (limit: Int) extends Limit
Contributor

Maybe rather do // TODO (#767): make configurable on Limit.MaxPage so you don't need this?

Contributor Author

@isegall-da isegall-da Dec 18, 2025

Fair. I half-did #767 by making it an argument, but it's not fully configurable (e.g. via an app config), so this only tackles the use case of overriding the max per usage in code.

Contributor

Nice, thanks

CompactJsonScanHttpEncodings.javaToHttpCreatedEvent(event.eventId, event.event)
)
val contractsStr = encoded.map(_.asJson.noSpacesSortKeys).mkString("\n") + "\n"
val contractsBytes = ByteString(contractsStr)
Contributor

contractsStr.getBytes(StandardCharsets.UTF_8)

def dumpAcsSnapshot(migrationId: Long, timestamp: CantonTimestamp): Future[Unit] = {

@SuppressWarnings(Array("org.wartremover.warts.Var"))
var idx = 0
Contributor

This needs to be a volatile member, or an AtomicInteger or AtomicReference; you are mutating it concurrently from a future (from a thread on the ExecutionContext that runs the stream).

Contributor Author

Hmm, I originally had it as an AtomicInteger, but I'm mutating it from a future with parallelism 1, so I convinced myself that it can be a var. But you're probably right that it's better to just be prudent and revert to that.

Just for my education, is my reasoning that parallelism 1 is good enough solid, or am I still missing something?

Contributor

The 1 does not matter; you are still accessing the var from that one other thread, which isn't allowed in principle. I shoot myself in the foot every time I try to be too clever or loose around these rules. There's also the fact that code changes, and the next person doesn't see the special case, or starts copying a bad way of doing things.
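
For illustration, two minimal sketches of the alternatives discussed (the `write` callback and the chunk naming are hypothetical, not the PR's actual code):

```scala
import java.util.concurrent.atomic.AtomicInteger
import org.apache.pekko.NotUsed
import org.apache.pekko.stream.scaladsl.Source
import org.apache.pekko.util.ByteString
import scala.concurrent.Future

// Option 1: keep a counter, but make it an AtomicInteger so the cross-thread
// visibility guarantees hold.
def writeAll(
    chunks: Source[ByteString, NotUsed],
    write: (String, ByteString) => Future[Unit],
): Source[Unit, NotUsed] = {
  val idx = new AtomicInteger(0)
  chunks.mapAsync(1)(chunk => write(s"snapshot_${idx.getAndIncrement()}.zstd", chunk))
}

// Option 2: avoid shared mutable state entirely and let the stream number the elements.
def writeAllZipped(
    chunks: Source[ByteString, NotUsed],
    write: (String, ByteString) => Future[Unit],
): Source[Unit, NotUsed] =
  chunks.zipWithIndex.mapAsync(1) { case (chunk, i) => write(s"snapshot_$i.zstd", chunk) }
```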

}

override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) with InHandler with OutHandler {
Contributor

So this needs a preStart to create the buffer and a postStop to deref it (and close the zstd stream).
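
For illustration, a minimal skeleton (not the PR's actual stage; compression logic elided, names and sizes hypothetical) of where the allocation and cleanup would go:

```scala
import java.nio.ByteBuffer
import com.github.luben.zstd.ZstdDirectBufferCompressingStreamNoFinalizer
import org.apache.pekko.stream.{Attributes, FlowShape, Inlet, Outlet}
import org.apache.pekko.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
import org.apache.pekko.util.ByteString

final class ZstdStageSketch(bufferSize: Int, level: Int)
    extends GraphStage[FlowShape[ByteString, ByteString]] {
  val in: Inlet[ByteString] = Inlet("ZstdStageSketch.in")
  val out: Outlet[ByteString] = Outlet("ZstdStageSketch.out")
  override val shape: FlowShape[ByteString, ByteString] = FlowShape(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with InHandler with OutHandler {
      private var tmpBuffer: ByteBuffer = _
      private var zstd: ZstdDirectBufferCompressingStreamNoFinalizer = _

      override def preStart(): Unit = {
        // Allocate native memory only once the stage actually starts running.
        tmpBuffer = ByteBuffer.allocateDirect(bufferSize)
        zstd = new ZstdDirectBufferCompressingStreamNoFinalizer(tmpBuffer, level)
      }

      override def postStop(): Unit = {
        // Runs on completion and on failure: close the native stream and drop the
        // buffer reference so the direct memory can eventually be reclaimed.
        if (zstd != null) zstd.close()
        zstd = null
        tmpBuffer = null
      }

      // Compression elided; this sketch just passes elements through.
      override def onPush(): Unit = push(out, grab(in))
      override def onPull(): Unit = pull(in)

      setHandlers(in, out, this)
    }
}
```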

}

object S3BucketConnection {
def apply(s3Config: S3Config, bucketName: String, loggerFactory: NamedLoggerFactory) = {
Contributor

Suggested change
def apply(s3Config: S3Config, bucketName: String, loggerFactory: NamedLoggerFactory) = {
def apply(s3Config: S3Config, bucketName: String, loggerFactory: NamedLoggerFactory): S3BucketConnection = {

@isegall-da
Contributor Author

@ray-roestenburg-da I believe I addressed all your comments, mind taking another look please?

object BulkStorageConfigs {
val bulkStorageConfigV1 = BulkStorageConfig(
1000,
(64 * 1024 * 1024).toLong,
Contributor

nitpick but you can also do 64L * 1024 * 1024

Contributor Author

done

)
}

sealed trait Position
Contributor

nitpick: Maybe nice to put these in an object

Contributor Author

done

snapshot = timestamp,
after,
limit = HardLimit.tryCreate(config.dbReadChunkSize),
Seq.empty,
Contributor

nit, since you do limit = , maybe nice to do partyIds = Seq.empty, templates = Seq.empty for clarity

Contributor Author

Fixed (removed the limit =)

Contributor

I think it makes sense to default partyIds: Seq[PartyId] = Seq.empty, templates: Seq[PackageQualifiedName] = Seq.empty on def queryAcsSnapshot(...), which is nice and clean, but not necessary in this PR.
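
For reference, roughly the signature shape being suggested (only the partyIds/templates defaults come from this comment; the surrounding parameter and return types are assumptions):

```scala
def queryAcsSnapshot(
    snapshot: CantonTimestamp,
    after: Option[Long],
    limit: Limit,
    partyIds: Seq[PartyId] = Seq.empty,
    templates: Seq[PackageQualifiedName] = Seq.empty,
): Future[Seq[CreatedEvent]] = ???
```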

)
.mapAsync(1) { zstdObj =>
val objectKey = s"snapshot_$idx.zstd"
Future {
Contributor

You should use a blocking threadpool for this future, since it blocks; right now you're likely using the default dispatcher. Sadly we can't use the connector, which does use proper non-blocking I/O. So you need to do Future { }(ec) and get the ec which is used for blocking I/O in Canton, or get one from Pekko configured as a dispatcher.

Contributor

Of course it depends on the implicit ec on this class, but that way it's not visible that this class needs a separate threadpool for blocking I/O. Maybe better to just create one fixed threadpool for this.

Contributor

@ray-roestenburg-da ray-roestenburg-da Dec 18, 2025

Probably should wrap it in blocking { } anyway (the writeFullObject). And then if it uses Canton's ExecutorService stuff it might know what to do.
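
For illustration, a minimal sketch of that pattern (the dedicated blockingEc and the body passed in are assumptions; the PR later switched to S3AsyncClient instead):

```scala
import scala.concurrent.{blocking, ExecutionContext, Future}

// Run a blocking call on a dedicated ExecutionContext, marking it with `blocking`
// so a compensating pool (e.g. a ForkJoinPool) can grow while the thread is parked.
def runBlocking[A](blockingEc: ExecutionContext)(body: => A): Future[A] =
  Future {
    blocking {
      body
    }
  }(blockingEc)

// Usage (hypothetical): runBlocking(blockingEc)(writeFullObject(objectKey, zstdObj))
```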

Contributor

If this was a separate app/service just give it a fixed threadpool to work with.

Contributor

(But later on I noticed that there is an AWS S3AsyncClient you should just use instead and not worry about the blocking, if that works well.)

Contributor Author

Fixed with the asyncClient that you pointed to.

case None =>
logger.debug(s"$prefixMsg No failure, normal restart.")
}
// Always retry (TODO(#3429): consider a max number of retries?)
Contributor

This is one of the reasons why I would think it is nice for this thing to be separate from the scan app pod. Let it crash when it can't write, recover on startup from where you were, and let k8s restart the pod; failures then also show up much more easily in monitoring, and you don't have to crash the scan app. And the reboot effect sometimes works far better than staying in process. My two cents.

Contributor Author

yes, but as discussed, breaking scan up into multiple microservices has its own downsides.

Contributor

ok fair, also maybe not the best comment I made on this particular PR 😄


// TODO(#3429): tweak the retry parameters here
val delay = FiniteDuration(5, "seconds")
val policy = new RetrySourcePolicy[Unit, Int] {
Contributor

Is there a test that retry works, that writeFullObject can overwrite?

Contributor Author

I tested it manually for now, added a TODO to add a unit test for it.

def readFullObject(key: String): ByteBuffer = {
val obj = s3Client.getObject(GetObjectRequest.builder().bucket(bucketName).key(key).build())
val bytes = obj.readAllBytes()
val ret = ByteBuffer.allocateDirect(bytes.length)
Contributor

@ray-roestenburg-da ray-roestenburg-da Dec 18, 2025

not necessary to be a direct buffer right? (I mean as part of readFullObject)

Contributor Author

Fixed (as part of moving to the asyncClient)

.bucket(bucketName)
.key(key)
.build()
s3Client.putObject(
Contributor

Have you looked at S3AsyncClient? You can convert a Java CompletableFuture to a Scala Future with FutureConverters, and I read that this is non-blocking, based on Netty. Worth a try.
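
For illustration, a minimal sketch of the non-blocking put with S3AsyncClient and FutureConverters (the helper name and parameters are hypothetical, not the PR's actual code):

```scala
import scala.concurrent.Future
import scala.jdk.FutureConverters._
import software.amazon.awssdk.core.async.AsyncRequestBody
import software.amazon.awssdk.services.s3.S3AsyncClient
import software.amazon.awssdk.services.s3.model.{PutObjectRequest, PutObjectResponse}

def putObjectAsync(
    client: S3AsyncClient,
    bucket: String,
    key: String,
    bytes: Array[Byte],
): Future[PutObjectResponse] = {
  val request = PutObjectRequest.builder().bucket(bucket).key(key).build()
  client
    .putObject(request, AsyncRequestBody.fromBytes(bytes))
    .asScala // CompletableFuture[PutObjectResponse] -> scala.concurrent.Future
}
```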

Contributor Author

Nice, adopted that indeed.

Contributor

Thanks!

}

override def close(): Unit = {
compressingStream.close()
Contributor

@ray-roestenburg-da ray-roestenburg-da Dec 18, 2025

Make this class own the tmpBuffer as well (let it create the buffer in the constructor body and assign it to a private var, and also add tmpBuffer = null here in close); then everything nicely follows AutoCloseable.
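
For illustration, a minimal sketch of that ownership pattern (the class name and fields are assumptions, not the PR's actual class):

```scala
import java.nio.ByteBuffer
import com.github.luben.zstd.ZstdDirectBufferCompressingStreamNoFinalizer

final class ZstdCompressorSketch(tmpBufferSize: Int, compressionLevel: Int) extends AutoCloseable {
  // The wrapper owns both the direct buffer and the native compressing stream.
  private var tmpBuffer: ByteBuffer = ByteBuffer.allocateDirect(tmpBufferSize)
  private var compressingStream: ZstdDirectBufferCompressingStreamNoFinalizer =
    new ZstdDirectBufferCompressingStreamNoFinalizer(tmpBuffer, compressionLevel)

  // Compression methods elided.

  override def close(): Unit = {
    // Close the native stream first, then drop the buffer reference so it becomes
    // unreachable and the direct memory can eventually be reclaimed.
    if (compressingStream != null) compressingStream.close()
    compressingStream = null
    tmpBuffer = null
  }
}
```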

Contributor

@ray-roestenburg-da ray-roestenburg-da left a comment

Left some more comments, especially to try out S3AsyncClient

@isegall-da
Contributor Author

> Left some more comments, especially to try out S3AsyncClient

Thanks, that indeed seems to work nicely.


def dumpAcsSnapshot(migrationId: Long, timestamp: CantonTimestamp): Future[Unit] = {

// TODO(#3429): currently, if this crashes half-way through, there is no indication in the S3 objects that
Contributor

Curious, why not do that right now?

Contributor

(or is this just a next PR that you'll work on, fine by me of course)

Contributor Author

Yeah, just followup work in coming PRs, this one's already quite large.

AsyncRequestBody.fromBytes(content.array()),
)
.asScala
.map(_ => ())
Contributor

Maybe at a later stage you can get the result and check the ETag against the MD5 hash of what you wrote, or something like that, to guarantee it's really there. (Not meant for this PR.)
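
For illustration, a small sketch of that check (assumes a simple single-part, non-KMS PUT, where the returned ETag is the hex MD5 of the body wrapped in quotes):

```scala
import java.security.MessageDigest

def md5Hex(bytes: Array[Byte]): String =
  MessageDigest.getInstance("MD5").digest(bytes).map("%02x".format(_)).mkString

// Compare against PutObjectResponse.eTag(), which is quoted for simple PUTs.
def matchesETag(eTag: String, bytes: Array[Byte]): Boolean =
  eTag.replace("\"", "") == md5Hex(bytes)
```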

.endpointOverride(s3Config.endpoint)
.region(s3Config.region)
.credentialsProvider(StaticCredentialsProvider.create(s3Config.credentials))
// TODO(#3429): mockS3 and GCS support only path style access. Do we need to make this configurable?
Contributor

Only if some other S3-compatible storage does not work with this; we could investigate that, or wait until someone asks for it. In any case it should default to this if we make it configurable, and then you would need to explicitly turn it off, IMHO.
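
For illustration, a sketch of how it could be made configurable later, defaulting to path-style on (the usePathStyle flag is hypothetical; S3Config is the PR's own config type):

```scala
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider
import software.amazon.awssdk.services.s3.S3AsyncClient

def buildS3Client(s3Config: S3Config, usePathStyle: Boolean = true): S3AsyncClient =
  S3AsyncClient
    .builder()
    .endpointOverride(s3Config.endpoint)
    .region(s3Config.region)
    .credentialsProvider(StaticCredentialsProvider.create(s3Config.credentials))
    // mockS3 and GCS require path-style access, so it defaults to on here.
    .forcePathStyle(usePathStyle)
    .build()
```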

Contributor Author

Yup, that's what I meant (check if it's needed), and that's the plan.

case class ZstdGroupedWeight(minSize: Long) extends GraphStage[FlowShape[ByteString, ByteString]] {
require(minSize > 0, "minSize must be greater than 0")

val zstdTmpBufferSize = 10 * 1024 * 1024; // TODO(#3429): make configurable?
Contributor

I think it makes sense to configure this as part of BulkStorageConfig, with a sensible default, so it can be tuned if necessary (though this is likely a fine number).


override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) with InHandler with OutHandler {
// TODO(#3429): consider implementing a pool of tmp buffers to avoid allocating a new one for each stage
Contributor

Yes, good idea; it's quite standard to use a pool for this (and then manage cleanup / deref there).
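
For illustration, a minimal sketch of such a pool (the class name and sizing are hypothetical):

```scala
import java.nio.ByteBuffer
import java.util.concurrent.ArrayBlockingQueue

// A fixed set of direct buffers handed out to stages and returned when they stop.
// This bounds native memory to poolSize * bufferSize and avoids repeated allocateDirect calls.
final class DirectBufferPool(bufferSize: Int, poolSize: Int) {
  private val pool = new ArrayBlockingQueue[ByteBuffer](poolSize)
  (1 to poolSize).foreach(_ => pool.put(ByteBuffer.allocateDirect(bufferSize)))

  // Blocks until a buffer is available.
  def acquire(): ByteBuffer = pool.take()

  // Clear and return the buffer so the next stage starts from a clean state.
  def release(buffer: ByteBuffer): Unit = {
    buffer.clear()
    pool.put(buffer)
  }
}
```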

override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) with InHandler with OutHandler {
// TODO(#3429): consider implementing a pool of tmp buffers to avoid allocating a new one for each stage
private val tmpBuffer = ByteBuffer.allocateDirect(zstdTmpBufferSize)
Contributor

Doing this in preStart is better, otherwise you allocate even if the stream never runs or fails before starting.

Contributor Author

👍 Added it to the TODO for the pool, since we'll revisit this code then anyway.

override def postStop(): Unit = {
super.postStop()
if (zstd.get() != null) {
zstd.get().close()
Contributor

Since you allocate the buffer in this GraphStageLogic, you also need to deref it here (or move everything to Zstd).

Contributor Author

Will be improved in a followup PR that will add a buffer pool

new ZstdDirectBufferCompressingStreamNoFinalizer(tmpBuffer, compressionLevel)

def compress(input: ByteString): ByteString = {
val inputBB = ByteBuffer.allocateDirect(input.size)
Contributor

@ray-roestenburg-da ray-roestenburg-da Dec 19, 2025

Can you not reuse this buffer instead of creating a new one for every compression? allocateDirect is slow, and GC does not see native memory, so it can take a while before it gets cleaned up; in the meantime native memory starts piling up. Maybe better, since you always use this from the GraphStageLogic, is to have a reusable direct buffer there, pass it into this method, and reuse it. (Otherwise you risk running out of memory by piling up native memory while GC doesn't yet think a collection is necessary, if you start streaming a lot of files.)

Contributor

@ray-roestenburg-da ray-roestenburg-da Dec 19, 2025

direct buffers are a PITA.

Contributor

https://openjdk.org/jeps/442 (Project Panama) in JDK 22 should make things a lot better, but likely that still would not work with this library; they'll need to update it, or we could of course consider contributing...

Contributor Author

Added another TODO for a buffer pool here as well.

@isegall-da
Contributor Author

@ray-roestenburg-da I believe I addressed all your comments (or put TODOs to address in followup PRs). Ready for an approve?

Contributor

@ray-roestenburg-da ray-roestenburg-da left a comment

Very nice, thanks!

@isegall-da isegall-da merged commit 9c58bcc into main Dec 19, 2025
116 of 118 checks passed
@isegall-da isegall-da deleted the isegall/bulk-acs-snapshots branch December 19, 2025 19:21