[ci] bulk-storage support for acs snapshots #3423
@@ -0,0 +1,149 @@
// Copyright (c) 2024 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

package org.lfdecentralizedtrust.splice.scan.store.bulk

import scala.concurrent.ExecutionContext
import com.digitalasset.canton.data.CantonTimestamp
import com.digitalasset.canton.logging.{NamedLoggerFactory, NamedLogging}
import com.digitalasset.canton.tracing.TraceContext
import com.digitalasset.canton.util.{ErrorUtil, PekkoUtil}
import com.digitalasset.canton.util.PekkoUtil.RetrySourcePolicy
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.stream.{KillSwitch, KillSwitches, OverflowStrategy}
import org.apache.pekko.stream.scaladsl.{Keep, Sink, Source}
import org.apache.pekko.util.ByteString
import org.lfdecentralizedtrust.splice.scan.admin.http.CompactJsonScanHttpEncodings
import org.lfdecentralizedtrust.splice.scan.store.AcsSnapshotStore
import org.lfdecentralizedtrust.splice.store.HardLimit

import scala.concurrent.Future
import io.circe.syntax.*

import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.duration.FiniteDuration

case class BulkStorageConfig(
    dbReadChunkSize: Int,
    maxFileSize: Long,
)

object BulkStorageConfigs {
  val bulkStorageConfigV1 = BulkStorageConfig(
    1000,
    (64 * 1024 * 1024).toLong,
  )
}

sealed trait Position
case object Start extends Position
case object End extends Position
final case class Index(value: Long) extends Position

class AcsSnapshotBulkStorage(
    val config: BulkStorageConfig,
    val acsSnapshotStore: AcsSnapshotStore,
    val s3Connection: S3BucketConnection,
    override val loggerFactory: NamedLoggerFactory,
)(implicit actorSystem: ActorSystem, tc: TraceContext, ec: ExecutionContext)
    extends NamedLogging {

  private def getAcsSnapshotChunk(
      migrationId: Long,
      timestamp: CantonTimestamp,
      after: Option[Long],
  ): Future[(Position, ByteString)] = {
    for {
      snapshot <- acsSnapshotStore.queryAcsSnapshot(
        migrationId,
        snapshot = timestamp,
        after,
        limit = HardLimit.tryCreate(config.dbReadChunkSize),
        Seq.empty,
Contributor: nit, since you do limit =, maybe nice to do partyIds = Seq.empty, templates = Seq.empty for clarity
Contributor (Author): Fixed (removed the
Contributor: I think it makes sense to default
        Seq.empty,
      )
    } yield {
      val encoded = snapshot.createdEventsInPage.map(event =>
        CompactJsonScanHttpEncodings.javaToHttpCreatedEvent(event.eventId, event.event)
      )
      val contractsStr = encoded.map(_.asJson.noSpacesSortKeys).mkString("\n") + "\n"
      val contractsBytes = ByteString(contractsStr.getBytes(StandardCharsets.UTF_8))
      logger.debug(
        s"Read ${encoded.length} contracts from ACS, to a bytestring of size ${contractsBytes.length} bytes"
      )
      (snapshot.afterToken.fold(End: Position)(Index(_)), contractsBytes)
    }
  }

  def dumpAcsSnapshot(migrationId: Long, timestamp: CantonTimestamp): Future[Unit] = {
    // TODO(#3429): currently, if this crashes half-way through, there is no indication in the S3 objects that
    // the snapshot is incomplete. We probably want to label the last object with `last` or something like that
    // so that we can detect incomplete snapshots and recreate them.

Contributor: Curious, why not do that right now?
Contributor: (or is this just a next PR that you'll work on, fine by me of course)
Contributor (Author): Yeah, just followup work in coming PRs, this one's already quite large.

    def mksrc = {
      val idx = new AtomicInteger(0)
      val base = Source
        .unfoldAsync(Start: Position) {
          case Start => getAcsSnapshotChunk(migrationId, timestamp, None).map(Some(_))
          case Index(i) => getAcsSnapshotChunk(migrationId, timestamp, Some(i)).map(Some(_))
          case End => Future.successful(None)
        }
        .via(ZstdGroupedWeight(config.maxFileSize))
        // Add a buffer so that the next object continues accumulating while we write the previous one
        .buffer(
          1,
          OverflowStrategy.backpressure,
        )
        .mapAsync(1) { zstdObj =>
          val objectKey = s"snapshot_$idx.zstd"
          Future {
            // TODO(#3429): For now, we accumulate the full object in memory, then write it as a whole.
            // Consider streaming it to S3 instead. Need to make sure that it then handles crashes correctly,
            // i.e. that until we tell S3 that we're done writing, if we stop, then S3 throws away the
            // partially written object.
            // TODO(#3429): Error handling
            val _ =
              s3Connection.writeFullObject(objectKey, ByteBuffer.wrap(zstdObj.toArrayUnsafe()))
            idx.addAndGet(1)
          }
        }
      val withKs = base.viaMat(KillSwitches.single)(Keep.right)
      withKs.watchTermination() { case (ks, done) => (ks: KillSwitch, done) }
    }

    // TODO(#3429): tweak the retry parameters here
    val delay = FiniteDuration(5, "seconds")
    val policy = new RetrySourcePolicy[Unit, Int] {
Contributor: Is there a test that retry works, that writeFullObject can overwrite?
Contributor (Author): I tested it manually for now, added a TODO to add a unit test for it.
      override def shouldRetry(
          lastState: Unit,
          lastEmittedElement: Option[Int],
          lastFailure: Option[Throwable],
      ): Option[(scala.concurrent.duration.FiniteDuration, Unit)] = {
        val prefixMsg =
          s"RetrySourcePolicy for restart of AcsSnapshotBulkStorage with ${delay} delay:"
        lastFailure match {
          case Some(t) =>
            logger.info(s"$prefixMsg Last failure: ${ErrorUtil.messageWithStacktrace(t)}")
          case None =>
            logger.debug(s"$prefixMsg No failure, normal restart.")
        }
        // Always retry (TODO(#3429): consider a max number of retries?)
        Some(delay -> ())
      }
    }

    PekkoUtil
      .restartSource(
        name = "acs-snapshot-dump",
        initial = (),
        mkSource = (_: Unit) => mksrc,
        policy = policy,
      )
      .runWith(Sink.ignore)
  }.map(_ => ())
}
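Following up on the incomplete-snapshot TODO above, one possible shape for a completion marker, sketched under assumptions (the markComplete helper, its prefix parameter, and the `_COMPLETE` suffix are all hypothetical, not part of this PR):

// Hypothetical completion marker per TODO(#3429): write a zero-byte object
// after the last chunk, so readers can distinguish a finished snapshot from
// one whose dump crashed half-way through.
def markComplete(
    s3Connection: S3BucketConnection,
    snapshotPrefix: String, // hypothetical key prefix shared by the snapshot's objects
)(implicit tc: TraceContext): Unit =
  s3Connection.writeFullObject(s"${snapshotPrefix}_COMPLETE", ByteBuffer.allocate(0))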
@@ -0,0 +1,73 @@
// Copyright (c) 2024 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

package org.lfdecentralizedtrust.splice.scan.store.bulk

import com.digitalasset.canton.logging.{NamedLoggerFactory, NamedLogging}
import com.digitalasset.canton.tracing.TraceContext
import software.amazon.awssdk.auth.credentials.{AwsBasicCredentials, StaticCredentialsProvider}
import software.amazon.awssdk.core.sync.RequestBody
import software.amazon.awssdk.regions.Region
import software.amazon.awssdk.services.s3.model.{GetObjectRequest, PutObjectRequest}
import software.amazon.awssdk.services.s3.{S3Client, S3Configuration}

import java.net.URI
import java.nio.ByteBuffer

case class S3Config(
    endpoint: URI,
    bucketName: String,
    region: Region,
    credentials: AwsBasicCredentials,
)

class S3BucketConnection(
    val s3Client: S3Client,
    val bucketName: String,
    val loggerFactory: NamedLoggerFactory,
) extends NamedLogging {
  // Reads the full content of an s3 object into a ByteBuffer.
  // Use only for testing, when the object size is known to be small.
  def readFullObject(key: String): ByteBuffer = {
    val obj = s3Client.getObject(GetObjectRequest.builder().bucket(bucketName).key(key).build())
    val bytes = obj.readAllBytes()
    val ret = ByteBuffer.allocateDirect(bytes.length)
    ret.put(bytes)
    // Flip so the returned buffer is positioned for reading from the start
    ret.flip()
    ret
  }

  // Writes a full object from memory into an s3 object (a blocking call)
  def writeFullObject(key: String, content: ByteBuffer)(implicit tc: TraceContext): Unit = {
    logger.debug(s"Writing ${content.remaining()} bytes to S3 object $key")
    val putObj: PutObjectRequest = PutObjectRequest
      .builder()
      .bucket(bucketName)
      .key(key)
      .build()
    s3Client.putObject(
      putObj,
      // fromByteBuffer copies the buffer's remaining bytes and, unlike
      // content.array(), also works for direct buffers without a backing array
      RequestBody.fromByteBuffer(content),
    )
    ()
  }
}

object S3BucketConnection {
  def apply(
      s3Config: S3Config,
      bucketName: String,
      loggerFactory: NamedLoggerFactory,
  ): S3BucketConnection = {
    new S3BucketConnection(
      S3Client
        .builder()
        .endpointOverride(s3Config.endpoint)
        .region(s3Config.region)
        .credentialsProvider(StaticCredentialsProvider.create(s3Config.credentials))
        // TODO(#3429): mockS3 and GCS support only path style access. Do we need to make this configurable?
Contributor: Only if some other S3 compatible storage does not work with this, could investigate that, or wait until someone asks for it. In any case it should be defaulted to this, if we would make it configurable, and then you would need to explicitly turn it off, IMHO.
Contributor (Author): Yup, that's what I meant (check if it's needed), and that's the plan.
        .serviceConfiguration(S3Configuration.builder().pathStyleAccessEnabled(true).build())
        .build(),
      bucketName,
      loggerFactory,
    )
  }
}
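For reference, a minimal usage sketch of the connection (endpoint, bucket, and credentials are placeholders for a MinIO-style local setup; an implicit TraceContext and a NamedLoggerFactory are assumed in scope). It also exercises the overwrite behaviour asked about in the retry thread, since S3 PutObject replaces an existing object with the same key:

val s3Config = S3Config(
  endpoint = URI.create("http://localhost:9000"), // placeholder local endpoint
  bucketName = "acs-snapshots", // placeholder bucket
  region = Region.US_EAST_1,
  credentials = AwsBasicCredentials.create("test-key", "test-secret"), // placeholders
)
val connection = S3BucketConnection(s3Config, s3Config.bucketName, loggerFactory)

// PutObject overwrites any existing object under the same key, which is what
// allows a retried chunk upload to replace an earlier, possibly partial attempt.
connection.writeFullObject("snapshot_0.zstd", ByteBuffer.wrap("v1".getBytes))
connection.writeFullObject("snapshot_0.zstd", ByteBuffer.wrap("v2".getBytes))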
Contributor: nitpick but you can also do 64L * 1024 * 1024
Contributor (Author): done
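With that nitpick applied (the author confirmed it as done), the constant would read:

object BulkStorageConfigs {
  val bulkStorageConfigV1 = BulkStorageConfig(
    1000,
    64L * 1024 * 1024, // 64 MiB; the Long literal makes the (...).toLong cast unnecessary
  )
}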