[ci] bulk-storage support for acs snapshots #3423
Conversation
ray-roestenburg-da
left a comment
Going in the right direction IMHO, some initial comments. The most important ones are the possible memory leak, exception handling, and always making sure the zstd object is closed.
| 1000,
| (64 * 1024 * 1024).toLong,
| )
| val bulkStorageTestConfig = BulkStorageConfig(
This can be defined in a test instead of here?
.../src/main/scala/org/lfdecentralizedtrust/splice/scan/store/bulk/AcsSnapshotBulkStorage.scala (resolved)
| import scala.concurrent.ExecutionContext
| import scala.util.{Failure, Success}
|
| case class AcsSnapshotSource(
I don't think you need to write your own GraphStage for this; you can do it with unfoldAsync (or statefulMapConcat, but unfoldAsync is probably easier).
Argh, sorry, this is an unused leftover. I indeed already reimplemented it with unfoldAsync.
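For illustration, a minimal sketch of the unfoldAsync approach (readChunk, the element type, and the object name are hypothetical stand-ins, not code from this PR):

```scala
import org.apache.pekko.NotUsed
import org.apache.pekko.stream.scaladsl.Source
import scala.concurrent.{ExecutionContext, Future}

object AcsPagingSketch {
  // Hypothetical page reader: returns the next chunk of rows starting at `offset`,
  // or an empty Seq once the snapshot is exhausted.
  def readChunk(offset: Long, chunkSize: Int): Future[Seq[String]] = ???

  // unfoldAsync threads the offset through as state and stops on None, so no
  // hand-written GraphStage is needed for simple paging.
  def snapshotSource(chunkSize: Int)(implicit ec: ExecutionContext): Source[Seq[String], NotUsed] =
    Source.unfoldAsync(0L) { offset =>
      readChunk(offset, chunkSize).map {
        case rows if rows.isEmpty => None
        case rows                 => Some((offset + rows.size, rows))
      }
    }
}
```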
| }
|
| // Writes a full object from memory into an s3 object
| def writeFullObject(key: String, content: ByteBuffer)(implicit tc: TraceContext) = {
Would be nice to add the return type here since it's a public def. Is this a blocking call?
The reason I omitted the return type is that I didn't find anything useful in it, so I don't actually care about returning it. Would it be cleaner to add a () at the end to make that explicit?
Indeed, it is blocking, hence the Future { } wrapper in the call site. Added to the comment.
Yes, it's easier to read with ().
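A tiny sketch of the agreed style, with the TraceContext parameter omitted and the body elided (trait name is illustrative):

```scala
import java.nio.ByteBuffer

trait ObjectWriterSketch {
  // Explicit Unit return type plus a trailing () make it obvious that the
  // result of the S3 call is intentionally discarded.
  def writeFullObject(key: String, content: ByteBuffer): Unit = {
    // ... blocking S3 put elided ...
    ()
  }
}
```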
| val compressionLevel: Int = 3,
| ) {
|
| val tmpBuffer = ByteBuffer.allocateDirect(tmpBufferSize)
This is allocated outside of the JVM heap, not GC-ed.
I think you are likely currently leaking native memory and not cleaning up the compressingStream correctly
You need to at least deref the tmpBuffer at some point so that it becomes unreachable and can get deleted (there is still the problem that you don't know when the JVM will actually free it).
There is cleanup code for this kind of thing, but I don't think it's portable. And once you use direct buffers you really need to set -XX:MaxDirectMemorySize so you know when this happens.
The problem was that zstd asserts that the buffer is direct, but I'll definitely look into not leaking it at least.
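For context, a minimal sketch of the direct-buffer lifecycle being discussed (class name and size are illustrative, not from the PR):

```scala
import java.nio.ByteBuffer

// Direct buffers live outside the Java heap: the GC only releases the native
// memory once the wrapping ByteBuffer object itself becomes unreachable, and
// -XX:MaxDirectMemorySize caps the total, so exhaustion fails fast instead of
// silently growing the process RSS.
final class DirectBufferHolder(size: Int) extends AutoCloseable {
  private var tmpBuffer: ByteBuffer = ByteBuffer.allocateDirect(size)

  def buffer: ByteBuffer = tmpBuffer

  override def close(): Unit =
    tmpBuffer = null // drop the last reference so the JVM can eventually free it
}
```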
| /** Limit with no constraints. Must not be used for production, use only for testing.
|   */
| case class UnboundLimit private (limit: Int) extends Limit
Maybe rather do // TODO (#767): make configurable on Limit.MaxPage so you don't need this?
Fair. I half-did #767 by making it an argument, but not fully configurable (e.g. by an app config), so this tackles only the use case of overriding the max per usage in code.
Nice, thanks
| CompactJsonScanHttpEncodings.javaToHttpCreatedEvent(event.eventId, event.event)
| )
| val contractsStr = encoded.map(_.asJson.noSpacesSortKeys).mkString("\n") + "\n"
| val contractsBytes = ByteString(contractsStr)
contractsStr.getBytes(StandardCharsets.UTF_8)
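A sketch of that suggestion, assuming contractsStr from the quoted hunk above (object and method names are illustrative):

```scala
import java.nio.charset.StandardCharsets
import org.apache.pekko.util.ByteString

object ContractEncodingSketch {
  // Encode explicitly as UTF-8 rather than relying on the platform default charset.
  def toBytes(contractsStr: String): ByteString =
    ByteString(contractsStr.getBytes(StandardCharsets.UTF_8))
}
```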
| def dumpAcsSnapshot(migrationId: Long, timestamp: CantonTimestamp): Future[Unit] = {
|
| @SuppressWarnings(Array("org.wartremover.warts.Var"))
| var idx = 0
This needs to be a volatile member, or an atomic integer or reference; you are mutating it concurrently from a future (from a thread on the ExecutionContext that runs the stream).
Hmm, I originally had it as an AtomicInteger, but I'm mutating it from a future with parallelism 1, so I convinced myself that it can be a var. But you're probably right that it's better to just be prudent and revert to that.
Just for my education, is my reasoning that parallelism 1 is good enough solid, or am I still missing something?
The 1 does not matter; you are still accessing the var from that one other thread, which isn't allowed in principle. I shoot myself in the foot every time I try to be too clever or loose around these rules. There's also the fact that code changes, and the next person doesn't see the special case, or starts copying a bad way of doing things.
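A minimal sketch of the atomic-counter variant (writeObject and the object name are hypothetical stand-ins for the S3 upload in the PR):

```scala
import java.util.concurrent.atomic.AtomicInteger

import org.apache.pekko.stream.scaladsl.Source
import org.apache.pekko.util.ByteString
import scala.concurrent.Future

object SnapshotUploadSketch {
  // Hypothetical write, standing in for the S3 upload.
  def writeObject(key: String, content: ByteString): Future[Unit] = ???

  // The counter is incremented from whatever thread runs the mapAsync future,
  // so an AtomicInteger (or a @volatile var) is needed rather than a plain var.
  def uploadAll(objects: Source[ByteString, _]): Source[Unit, _] = {
    val idx = new AtomicInteger(0)
    objects.mapAsync(1) { zstdObj =>
      val objectKey = s"snapshot_${idx.getAndIncrement()}.zstd"
      writeObject(objectKey, zstdObj)
    }
  }
}
```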
| }
|
| override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
| new GraphStageLogic(shape) with InHandler with OutHandler {
So this needs a preStart to create the buffer and a postStop to deref it (and close the zstd stream).
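A sketch of that shape, with the compression logic elided and only the buffer lifecycle shown (class and port names are illustrative, not the PR's stage):

```scala
import java.nio.ByteBuffer

import org.apache.pekko.stream.{Attributes, FlowShape, Inlet, Outlet}
import org.apache.pekko.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
import org.apache.pekko.util.ByteString

final class BufferedCompressStageSketch(bufferSize: Int)
    extends GraphStage[FlowShape[ByteString, ByteString]] {
  val in: Inlet[ByteString] = Inlet("BufferedCompressStageSketch.in")
  val out: Outlet[ByteString] = Outlet("BufferedCompressStageSketch.out")
  override val shape: FlowShape[ByteString, ByteString] = FlowShape(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with InHandler with OutHandler {
      private var tmpBuffer: ByteBuffer = _

      // Allocate only once the stage actually starts running.
      override def preStart(): Unit =
        tmpBuffer = ByteBuffer.allocateDirect(bufferSize)

      // Runs on completion, failure, or cancellation: drop the buffer reference
      // (a real stage would also close its zstd stream here).
      override def postStop(): Unit = {
        tmpBuffer = null
        super.postStop()
      }

      // A real stage would compress via tmpBuffer here.
      override def onPush(): Unit = push(out, grab(in))
      override def onPull(): Unit = pull(in)

      setHandlers(in, out, this)
    }
}
```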
| }
|
| object S3BucketConnection {
| def apply(s3Config: S3Config, bucketName: String, loggerFactory: NamedLoggerFactory) = {
- def apply(s3Config: S3Config, bucketName: String, loggerFactory: NamedLoggerFactory) = {
+ def apply(s3Config: S3Config, bucketName: String, loggerFactory: NamedLoggerFactory): S3BucketConnection = {
@ray-roestenburg-da I believe I addressed all your comments, mind taking another look please?
| object BulkStorageConfigs {
| val bulkStorageConfigV1 = BulkStorageConfig(
| 1000,
| (64 * 1024 * 1024).toLong,
nitpick but you can also do 64L * 1024 * 1024
done
| )
| }
|
| sealed trait Position
nitpick: Maybe nice to put these in an object
done
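A minimal sketch of the nitpick (object and variant names are illustrative, not the actual ADT):

```scala
// Group the position ADT inside an object instead of leaving it at the top
// level of the file.
object SnapshotPosition {
  sealed trait Position
  case object Start extends Position
  final case class After(offset: Long) extends Position
}
```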
| snapshot = timestamp,
| after,
| limit = HardLimit.tryCreate(config.dbReadChunkSize),
| Seq.empty,
nit, since you do limit = , maybe nice to do partyIds = Seq.empty, templates = Seq.empty for clarity
Fixed (removed the limit =)
I think it makes sense to default partyIds: Seq[PartyId] = Seq.empty and templates: Seq[PackageQualifiedName] = Seq.empty on def queryAcsSnapshot(...), which is nice and clean, but not necessary in this PR.
| )
| .mapAsync(1) { zstdObj =>
| val objectKey = s"snapshot_$idx.zstd"
| Future {
You should use a blocking threadpool for this future, since it blocks; right now you're likely using the default dispatcher. Sadly we can't use the connector, which does use proper non-blocking I/O. So you need to do Future { }(ec) and get the ec that is used for blocking I/O in Canton, or get one from Pekko configured as a dispatcher.
Of course it depends on the implicit ec on this class, but that way it's not visible that this class needs a separate threadpool for blocking I/O. Maybe better to just create one fixed threadpool for this.
Probably should wrap it (the writeFullObject) in blocking { } anyway. And then if it uses Canton's executor service machinery, it might know what to do.
If this were a separate app/service, I'd just give it a fixed threadpool to work with.
(But later on I noticed that there is an AWS S3AsyncClient you should just use instead and not worry about the blocking, if it works well.)
Fixed with the asyncClient that you pointed to.
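For reference, a sketch of the blocking-dispatcher pattern that was being discussed before the switch to the async client (the pool size, object name, and writeFullObjectBlocking are hypothetical):

```scala
import java.util.concurrent.Executors
import scala.concurrent.{blocking, ExecutionContext, Future}

object BlockingUploadSketch {
  // Dedicated pool so the blocking S3 call never starves the stream's dispatcher.
  private val blockingEc: ExecutionContext =
    ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(4))

  // Hypothetical blocking write, standing in for the synchronous S3 put.
  private def writeFullObjectBlocking(key: String, bytes: Array[Byte]): Unit = ???

  def upload(key: String, bytes: Array[Byte]): Future[Unit] =
    Future {
      blocking { // hints managed pools that this task parks a thread
        writeFullObjectBlocking(key, bytes)
      }
    }(blockingEc)
}
```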
| case None =>
| logger.debug(s"$prefixMsg No failure, normal restart.")
| }
| // Always retry (TODO(#3429): consider a max number of retries?)
This is one of the reasons why I would think it is nice for this thing to be separate from the scan app pod. Let it crash when it can't write, recover on startup from where you were, and let k8s restart the pod; failures then also show up far more easily in monitoring, and you don't have to crash the scan app. And the reboot effect sometimes works far better than staying in-process. My two cents.
yes, but as discussed, breaking scan up into multiple microservices has its own downsides.
ok fair, also maybe not the best comment I made on this particular PR 😄
| // TODO(#3429): tweak the retry parameters here
| val delay = FiniteDuration(5, "seconds")
| val policy = new RetrySourcePolicy[Unit, Int] {
Is there a test that retry works, that writeFullObject can overwrite?
I tested it manually for now, added a TODO to add a unit test for it.
...scan/src/main/scala/org/lfdecentralizedtrust/splice/scan/store/bulk/S3BucketConnection.scala (resolved)
| def readFullObject(key: String): ByteBuffer = {
| val obj = s3Client.getObject(GetObjectRequest.builder().bucket(bucketName).key(key).build())
| val bytes = obj.readAllBytes()
| val ret = ByteBuffer.allocateDirect(bytes.length)
Not necessary for this to be a direct buffer, right? (I mean as part of readFullObject.)
Fixed (as part of moving to the asyncClient)
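A minimal sketch of the heap-buffer alternative (object and method names are illustrative):

```scala
import java.nio.ByteBuffer

object BufferSketch {
  // A heap buffer is enough for bytes read back from S3; ByteBuffer.wrap avoids
  // the copy entirely. Only the zstd JNI calls insist on direct buffers.
  def toHeapBuffer(bytes: Array[Byte]): ByteBuffer = ByteBuffer.wrap(bytes)
}
```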
| .bucket(bucketName)
| .key(key)
| .build()
| s3Client.putObject(
Have you looked at S3AsyncClient? You can convert a Java CompletableFuture to a Scala Future with FutureConverters, and I read that this is non-blocking (based on Netty); worth a try.
Nice, adopted that indeed.
Thanks!
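A minimal sketch of the S3AsyncClient plus FutureConverters combination, mirroring the quoted hunks above (object and method names are illustrative):

```scala
import scala.concurrent.{ExecutionContext, Future}
import scala.jdk.FutureConverters._

import software.amazon.awssdk.core.async.AsyncRequestBody
import software.amazon.awssdk.services.s3.S3AsyncClient
import software.amazon.awssdk.services.s3.model.PutObjectRequest

object AsyncUploadSketch {
  // putObject on the async client returns a Java CompletableFuture; FutureConverters
  // turns it into a Scala Future, so no blocking threadpool is needed for the upload.
  def putObjectAsync(
      s3Client: S3AsyncClient,
      bucketName: String,
      key: String,
      bytes: Array[Byte],
  )(implicit ec: ExecutionContext): Future[Unit] =
    s3Client
      .putObject(
        PutObjectRequest.builder().bucket(bucketName).key(key).build(),
        AsyncRequestBody.fromBytes(bytes),
      )
      .asScala
      .map(_ => ())
}
```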
| }
|
| override def close(): Unit = {
| compressingStream.close()
Make this class own the tmpBuffer as well: let it create the buffer in the constructor body and assign it to a private var, and also add tmpBuffer = null here in close(). Then everything nicely follows AutoCloseable.
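A sketch of that ownership model (class name is illustrative and the compression methods are elided):

```scala
import java.nio.ByteBuffer

import com.github.luben.zstd.ZstdDirectBufferCompressingStreamNoFinalizer

// The wrapper allocates the direct buffer itself, closes the zstd stream, and
// drops the buffer reference in close(), so callers manage a single AutoCloseable.
final class ZstdCompressorSketch(tmpBufferSize: Int, compressionLevel: Int) extends AutoCloseable {
  private var tmpBuffer: ByteBuffer = ByteBuffer.allocateDirect(tmpBufferSize)
  private val compressingStream =
    new ZstdDirectBufferCompressingStreamNoFinalizer(tmpBuffer, compressionLevel)

  // ... compression methods elided ...

  override def close(): Unit = {
    compressingStream.close()
    tmpBuffer = null // let the native memory become reclaimable
  }
}
```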
ray-roestenburg-da
left a comment
Left some more comments, especially to try out S3AsyncClient
Thanks, that indeed seems to work nicely.
| def dumpAcsSnapshot(migrationId: Long, timestamp: CantonTimestamp): Future[Unit] = {
|
| // TODO(#3429): currently, if this crashes half-way through, there is no indication in the S3 objects that
Curious, why not do that right now?
(or is this just a next PR that you'll work on, fine by me of course)
Yeah, just followup work in coming PRs, this one's already quite large.
| AsyncRequestBody.fromBytes(content.array()),
| )
| .asScala
| .map(_ => ())
Maybe at a later stage you can get the result and check the ETag against an MD5 hash of what you wrote, or something like that, to guarantee it's really there. (Not meant for this PR.)
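A sketch of such a check, assuming a single-part PUT where the ETag equals the body's MD5 (multipart uploads use a different ETag scheme; names are illustrative):

```scala
import java.math.BigInteger
import java.security.MessageDigest

import software.amazon.awssdk.services.s3.model.PutObjectResponse

object UploadVerificationSketch {
  // For a single-part PUT the returned ETag is the (quoted) hex MD5 of the body,
  // so a cheap integrity check is possible.
  def uploadMatchesEtag(response: PutObjectResponse, bytes: Array[Byte]): Boolean = {
    val md5 = MessageDigest.getInstance("MD5").digest(bytes)
    val md5Hex = String.format("%032x", new BigInteger(1, md5))
    response.eTag().replace("\"", "") == md5Hex
  }
}
```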
| .endpointOverride(s3Config.endpoint)
| .region(s3Config.region)
| .credentialsProvider(StaticCredentialsProvider.create(s3Config.credentials))
| // TODO(#3429): mockS3 and GCS support only path style access. Do we need to make this configurable?
Only if some other S3-compatible storage does not work with this; we could investigate that, or wait until someone asks for it. In any case, if we make it configurable it should default to this, and then you would need to explicitly turn it off, IMHO.
Yup, that's what I meant (check if it's needed), and that's the plan.
| case class ZstdGroupedWeight(minSize: Long) extends GraphStage[FlowShape[ByteString, ByteString]] {
| require(minSize > 0, "minSize must be greater than 0")
|
| val zstdTmpBufferSize = 10 * 1024 * 1024; // TODO(#3429): make configurable?
I think it makes sense to configure this as part of BulkStorageConfig, with a sensible default, so it can be tuned if necessary (though this is likely a fine number).
| override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
| new GraphStageLogic(shape) with InHandler with OutHandler {
| // TODO(#3429): consider implementing a pool of tmp buffers to avoid allocating a new one for each stage
Yes, good idea; it's quite standard to use a pool for this (and then manage cleanup / deref there).
| override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
| new GraphStageLogic(shape) with InHandler with OutHandler {
| // TODO(#3429): consider implementing a pool of tmp buffers to avoid allocating a new one for each stage
| private val tmpBuffer = ByteBuffer.allocateDirect(zstdTmpBufferSize)
Doing this in preStart is better, otherwise you allocate even if the stream never runs or fails before starting.
👍 added to the todo for the pool, since we'll revisit this code then anyway.
| override def postStop(): Unit = {
| super.postStop()
| if (zstd.get() != null) {
| zstd.get().close()
Since you allocate the buffer in this GraphStageLogic, you also need to deref it here (or move everything to Zstd).
Will be improved in a followup PR that will add a buffer pool
| new ZstdDirectBufferCompressingStreamNoFinalizer(tmpBuffer, compressionLevel)
|
| def compress(input: ByteString): ByteString = {
| val inputBB = ByteBuffer.allocateDirect(input.size)
Can you not reuse this buffer instead of creating a new one for every compression? allocateDirect is slow, and the GC does not see native memory, so it can take a while before it gets cleaned up; in the meantime native memory piles up. Maybe better, since you always use this from the GraphStageLogic, to have a reusable direct buffer there, pass it into this method, and reuse it. (Otherwise you risk running out of memory by piling up native memory while a GC is not yet necessary, if you start streaming a lot of files.)
direct buffers are a PITA.
https://openjdk.org/jeps/442 (Project Panama) in JDK 22 should make things a lot better, but likely that still would not work with this library; they'll need to update it, or of course we could consider contributing...
Added another TODO for a buffer pool here as well.
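A very small sketch of the pooling idea from the TODOs (unbounded and illustrative only, not the planned implementation):

```scala
import java.nio.ByteBuffer
import java.util.concurrent.ConcurrentLinkedQueue

// Hand out reusable direct buffers instead of calling allocateDirect per use.
final class DirectBufferPool(bufferSize: Int) {
  private val free = new ConcurrentLinkedQueue[ByteBuffer]()

  def borrow(): ByteBuffer =
    Option(free.poll()).getOrElse(ByteBuffer.allocateDirect(bufferSize))

  def release(buffer: ByteBuffer): Unit = {
    buffer.clear() // reset position/limit before the next borrower
    free.offer(buffer)
    ()
  }
}
```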
@ray-roestenburg-da I believe I addressed all your comments (or put TODOs to address in followup PRs). Ready for an approve?
ray-roestenburg-da
left a comment
Very nice, thanks!
Part of #3429
Implements dumping an ACS snapshot to S3 storage, with a unit test. Not yet integrated with anything else.