[ci] bulk-storage support for acs snapshots #3423
@@ -0,0 +1,111 @@
// Copyright (c) 2024 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

package org.lfdecentralizedtrust.splice.scan.store.bulk

import scala.concurrent.ExecutionContext
import com.digitalasset.canton.data.CantonTimestamp
import com.digitalasset.canton.logging.{NamedLoggerFactory, NamedLogging}
import com.digitalasset.canton.tracing.TraceContext
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.stream.OverflowStrategy
import org.apache.pekko.stream.scaladsl.{Sink, Source}
import org.apache.pekko.util.ByteString
import org.lfdecentralizedtrust.splice.scan.admin.http.CompactJsonScanHttpEncodings
import org.lfdecentralizedtrust.splice.scan.store.AcsSnapshotStore
import org.lfdecentralizedtrust.splice.store.HardLimit

import scala.concurrent.Future
import io.circe.syntax.*
import java.nio.ByteBuffer

case class BulkStorageConfig(
    dbReadChunkSize: Int,
    maxFileSize: Long,
)

object BulkStorageConfigs {
  val bulkStorageConfigV1 = BulkStorageConfig(
    1000,
    (64 * 1024 * 1024).toLong,
  )
  val bulkStorageTestConfig = BulkStorageConfig(
    1000,
    50000L,
  )
}

sealed trait Position
case object Start extends Position
case object End extends Position
final case class Index(value: Long) extends Position

class AcsSnapshotBulkStorage(
    val config: BulkStorageConfig,
    val acsSnapshotStore: AcsSnapshotStore,
    val s3Connection: S3BucketConnection,
    override val loggerFactory: NamedLoggerFactory,
)(implicit actorSystem: ActorSystem, tc: TraceContext, ec: ExecutionContext)
    extends NamedLogging {

  def getAcsSnapshotChunk(
      migrationId: Long,
      timestamp: CantonTimestamp,
      after: Option[Long],
  ): Future[(Position, ByteString)] = {
    for {
      snapshot <- acsSnapshotStore.queryAcsSnapshot(
        migrationId,
        snapshot = timestamp,
        after,
        limit = HardLimit.tryCreate(config.dbReadChunkSize),
        Seq.empty,
        Seq.empty,
      )

Contributor: nit: since you use limit =, maybe nice to also write partyIds = Seq.empty, templates = Seq.empty for clarity (see the sketch after this method).
Contributor (Author): Fixed (removed the
Contributor: I think it makes sense to default
    } yield {
      val encoded = snapshot.createdEventsInPage.map(event =>
        CompactJsonScanHttpEncodings.javaToHttpCreatedEvent(event.eventId, event.event)
      )
      val contractsStr = encoded.map(_.asJson.noSpacesSortKeys).mkString("\n") + "\n"
      val contractsBytes = ByteString(contractsStr)

      logger.debug(
        s"Read ${encoded.length} contracts from ACS, to a bytestring of size ${contractsBytes.length} bytes"
      )
      (snapshot.afterToken.fold(End: Position)(Index(_)), contractsBytes)
    }
  }
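A small sketch of the naming nit from the thread above, not part of the diff: the partyIds and templates parameter names are taken from the review comment and are assumed here, not verified against queryAcsSnapshot's actual signature.

  // Illustrative only (review nit): every argument named; partyIds/templates names are assumed.
  acsSnapshotStore.queryAcsSnapshot(
    migrationId = migrationId,
    snapshot = timestamp,
    after = after,
    limit = HardLimit.tryCreate(config.dbReadChunkSize),
    partyIds = Seq.empty,
    templates = Seq.empty,
  )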

  def dumpAcsSnapshot(migrationId: Long, timestamp: CantonTimestamp): Future[Unit] = {
    @SuppressWarnings(Array("org.wartremover.warts.Var"))
    var idx = 0

    Source
      .unfoldAsync(Start: Position) {
        case Start => getAcsSnapshotChunk(migrationId, timestamp, None).map(Some(_))
        case Index(i) => getAcsSnapshotChunk(migrationId, timestamp, Some(i)).map(Some(_))
        case End => Future.successful(None)
      }
      .via(ZstdGroupedWeight(config.maxFileSize))
      // Add a buffer so that the next object continues accumulating while we write the previous one
      .buffer(
        1,
        OverflowStrategy.backpressure,
      )
      .mapAsync(1) { zstdObj =>
        val objectKey = s"snapshot_$idx.zstd"
        Future {
          // TODO(#3429): For now, we accumulate the full object in memory, then write it as a whole.
          // Consider streaming it to S3 instead. Need to make sure that it then handles crashes correctly,
          // i.e. that until we tell S3 that we're done writing, if we stop, then S3 throws away the
          // partially written object.
          // TODO(#3429): Error handling
          val _ = s3Connection.writeFullObject(objectKey, ByteBuffer.wrap(zstdObj.toArrayUnsafe()))
          idx += 1
        }
      }
      .runWith(Sink.ignore)
  }.map(_ => ())
}
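The TODO(#3429) above asks for a crash-safe way to stream objects to S3 instead of accumulating them in memory. S3 multipart uploads have exactly that property: parts are not visible until the upload is completed, and aborting the upload discards everything written so far. Below is a rough sketch of that approach with the AWS SDK v2 sync client; the writeMultipart helper, its Iterator[Array[Byte]] input, and its wiring are assumptions for illustration, not part of this PR.

// Hypothetical helper, not part of this PR: streams chunks to S3 as a multipart upload
// and aborts on failure so S3 discards the partially written object.
// Note: S3 requires every part except the last to be at least 5 MiB.
import software.amazon.awssdk.core.sync.RequestBody
import software.amazon.awssdk.services.s3.S3Client
import software.amazon.awssdk.services.s3.model.*
import scala.jdk.CollectionConverters.*
import scala.util.control.NonFatal

def writeMultipart(s3: S3Client, bucket: String, key: String, chunks: Iterator[Array[Byte]]): Unit = {
  val uploadId = s3
    .createMultipartUpload(CreateMultipartUploadRequest.builder().bucket(bucket).key(key).build())
    .uploadId()
  try {
    val completedParts = chunks.zipWithIndex.map { case (chunk, i) =>
      val partNumber = i + 1 // S3 part numbers are 1-based
      val response = s3.uploadPart(
        UploadPartRequest
          .builder()
          .bucket(bucket)
          .key(key)
          .uploadId(uploadId)
          .partNumber(partNumber)
          .build(),
        RequestBody.fromBytes(chunk),
      )
      CompletedPart.builder().partNumber(partNumber).eTag(response.eTag()).build()
    }.toList
    // Only now does the object become visible under `key`.
    s3.completeMultipartUpload(
      CompleteMultipartUploadRequest
        .builder()
        .bucket(bucket)
        .key(key)
        .uploadId(uploadId)
        .multipartUpload(CompletedMultipartUpload.builder().parts(completedParts.asJava).build())
        .build()
    )
  } catch {
    case NonFatal(e) =>
      // On crash or error, abort so S3 throws away the partial object.
      s3.abortMultipartUpload(
        AbortMultipartUploadRequest.builder().bucket(bucket).key(key).uploadId(uploadId).build()
      )
      throw e
  }
}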
@@ -0,0 +1,73 @@
// Copyright (c) 2024 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

package org.lfdecentralizedtrust.splice.scan.store.bulk

import com.digitalasset.canton.data.CantonTimestamp
import com.digitalasset.canton.logging.{NamedLoggerFactory, NamedLogging}
import com.digitalasset.canton.tracing.TraceContext
import org.apache.pekko.stream.{Attributes, Outlet, SourceShape}
import org.apache.pekko.stream.stage.{AsyncCallback, GraphStage, GraphStageLogic, OutHandler}
import org.lfdecentralizedtrust.splice.scan.store.AcsSnapshotStore
import org.lfdecentralizedtrust.splice.scan.store.AcsSnapshotStore.QueryAcsSnapshotResult
import org.lfdecentralizedtrust.splice.store.PageLimit
import org.lfdecentralizedtrust.splice.store.events.SpliceCreatedEvent

import java.util.concurrent.atomic.AtomicReference
import scala.concurrent.ExecutionContext
import scala.util.{Failure, Success}

case class AcsSnapshotSource(
    acsSnapshotStore: AcsSnapshotStore,
    timestamp: CantonTimestamp,
    migrationId: Long,
    override val loggerFactory: NamedLoggerFactory,
)(implicit tc: TraceContext, ec: ExecutionContext)
    extends GraphStage[SourceShape[Vector[SpliceCreatedEvent]]]
    with NamedLogging {
  val out: Outlet[Vector[SpliceCreatedEvent]] = Outlet("AcsSnapshotSource")
  override def shape: SourceShape[Vector[SpliceCreatedEvent]] = SourceShape(out)

  val numUpdatesPerQuery = 1000

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = {
    new GraphStageLogic(shape) with OutHandler {
      val token = new AtomicReference[Option[Long]](None)

      val asyncCallback: AsyncCallback[QueryAcsSnapshotResult] = getAsyncCallback {
        case result: QueryAcsSnapshotResult =>
          if (result.createdEventsInPage.isEmpty) {
            complete(out)
            token.set(None)
          } else {
            push(out, result.createdEventsInPage)
            token.set(result.afterToken)
          }
        case _ =>
          logger.error("asyncCallback unexpectedly called with an error")
      }

      val failureCallback: AsyncCallback[Throwable] = getAsyncCallback { ex =>
        fail(out, ex)
      }

      override def onPull(): Unit = {
        acsSnapshotStore
          .queryAcsSnapshot(
            migrationId,
            timestamp,
            token.get(),
            PageLimit.tryCreate(numUpdatesPerQuery),
            Seq.empty,
            Seq.empty,
          )
          .onComplete {
            case Success(value) => asyncCallback.invoke(value)
            case Failure(exception) => failureCallback.invoke(exception)
          }
      }
      setHandler(out, this)
    }
  }

}
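For reference, a minimal sketch of how this stage could be materialized into a running stream. The store, timestamp, migrationId, and loggerFactory values, plus the implicit ActorSystem/TraceContext/ExecutionContext, are assumed to be in scope, and the downstream fold is only an example, not part of this PR.

// Illustrative wiring only; all referenced values come from the surrounding application.
import org.apache.pekko.stream.scaladsl.{Sink, Source}

// Each element emitted by the stage is a full page of created events.
val events = Source.fromGraph(
  AcsSnapshotSource(acsSnapshotStore, timestamp, migrationId, loggerFactory)
)

// Flatten the pages and count the contracts (requires an implicit Materializer / ActorSystem).
val contractCount = events
  .mapConcat(identity) // Vector[SpliceCreatedEvent] => individual events
  .runWith(Sink.fold(0L)((n, _) => n + 1))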
@@ -0,0 +1,68 @@
// Copyright (c) 2024 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

package org.lfdecentralizedtrust.splice.scan.store.bulk

import com.digitalasset.canton.logging.{NamedLoggerFactory, NamedLogging}
import com.digitalasset.canton.tracing.TraceContext
import software.amazon.awssdk.auth.credentials.{AwsBasicCredentials, StaticCredentialsProvider}
import software.amazon.awssdk.core.sync.RequestBody
import software.amazon.awssdk.regions.Region
import software.amazon.awssdk.services.s3.model.{GetObjectRequest, PutObjectRequest}
import software.amazon.awssdk.services.s3.{S3Client, S3Configuration}

import java.net.URI
import java.nio.ByteBuffer

case class S3Config(
    endpoint: URI,
    bucketName: String,
    region: Region,
    credentials: AwsBasicCredentials,
)

class S3BucketConnection(
    val s3Client: S3Client,
    val bucketName: String,
    val loggerFactory: NamedLoggerFactory,
) extends NamedLogging {
  // Reads the full content of an s3 object into a ByteBuffer.
  // Use only for testing, when the object size is known to be small.
  def readFullObject(key: String): ByteBuffer = {
    val obj = s3Client.getObject(GetObjectRequest.builder().bucket(bucketName).key(key).build())
    val bytes = obj.readAllBytes()
    val ret = ByteBuffer.allocateDirect(bytes.length)
    ret.put(bytes)
    // Flip so callers read from the start of the buffer rather than its current (end) position.
    ret.flip()
    ret
  }

  // Writes a full object from memory into an s3 object
  def writeFullObject(key: String, content: ByteBuffer)(implicit tc: TraceContext) = {
    logger.debug(s"Writing ${content.array().length} bytes to S3 object $key")
    val putObj: PutObjectRequest = PutObjectRequest
      .builder()
      .bucket(bucketName)
      .key(key)
      .build()
    s3Client.putObject(
      putObj,
      RequestBody.fromBytes(content.array()),
    )
  }
}

object S3BucketConnection {
  def apply(s3Config: S3Config, bucketName: String, loggerFactory: NamedLoggerFactory) = {

Suggested change (review): annotate the return type explicitly, i.e.
  def apply(s3Config: S3Config, bucketName: String, loggerFactory: NamedLoggerFactory): S3BucketConnection = {
Only if some other S3 compatible storage does not work with this, could investigate that, or wait until someone asks for it. In any case it should be defaulted to this, if we would make it configurable, and then you would need to explicitly turn it off, IMHO.

Yup, that's what I meant (check if it's needed), and that's the plan.

Maybe rather do // TODO (#767): make configurable on Limit.MaxPage so you don't need this?

Fair. I half-did #767 by making it an argument, but not fully configurable (e.g. by an app config), so this tackles only the use case of overriding the max per usage in code.

Nice, thanks
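The body of apply is cut off in this diff, and the thread above touches on defaulting path-style access for S3-compatible stores. A minimal sketch, assuming only the AWS SDK v2 builders already imported in this file, of how such a factory could construct the client from S3Config; the object name and the path-style choice are illustrative, not the PR's actual implementation.

// Illustrative only: one way the truncated apply could construct the client.
// Path-style access is shown because the thread above discusses S3-compatible storage;
// whether the PR actually enables it is not visible in this diff.
object S3BucketConnectionSketch {
  def apply(
      s3Config: S3Config,
      bucketName: String,
      loggerFactory: NamedLoggerFactory,
  ): S3BucketConnection = {
    val client = S3Client
      .builder()
      .endpointOverride(s3Config.endpoint)
      .region(s3Config.region)
      .credentialsProvider(StaticCredentialsProvider.create(s3Config.credentials))
      // Path-style addressing (bucket in the path, not the hostname) helps with
      // S3-compatible stores such as MinIO; AWS itself defaults to virtual-hosted style.
      .serviceConfiguration(S3Configuration.builder().pathStyleAccessEnabled(true).build())
      .build()
    new S3BucketConnection(client, bucketName, loggerFactory)
  }
}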