-
Notifications
You must be signed in to change notification settings - Fork 53
[do not merge] Local performance test of ingestion #3292
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 3 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,90 @@ | ||
| package org.lfdecentralizedtrust.splice.performance | ||
|
|
||
| import cats.data.Chain | ||
| import com.digitalasset.canton.config.NonNegativeDuration | ||
| import com.digitalasset.canton.logging.{NamedLoggerFactory, NamedLogging} | ||
| import org.apache.pekko.actor.ActorSystem | ||
| import org.lfdecentralizedtrust.splice.admin.api.client.commands.HttpClientBuilder | ||
| import org.lfdecentralizedtrust.splice.http.HttpClient | ||
| import org.lfdecentralizedtrust.splice.http.v0.definitions.UpdateHistoryItemV2.members | ||
| import org.lfdecentralizedtrust.splice.http.v0.definitions.{ | ||
| UpdateHistoryItemV2, | ||
| UpdateHistoryRequestAfter, | ||
| UpdateHistoryRequestV2, | ||
| UpdateHistoryResponseV2, | ||
| } | ||
| import org.lfdecentralizedtrust.splice.http.v0.scan as http | ||
|
|
||
| import java.nio.charset.StandardCharsets | ||
| import java.nio.file.{Files, Paths} | ||
| import java.time.Instant | ||
| import java.time.temporal.ChronoUnit | ||
| import scala.annotation.tailrec | ||
| import scala.concurrent.duration.* | ||
| import scala.concurrent.{Await, ExecutionContextExecutor} | ||
|
|
||
| /** Downloads the update history recorded during the last hour from a mainnet scan and writes it, | ||
|   * encoded as an `UpdateHistoryResponseV2`, to `update_history_response.json` in the working directory. | ||
|   * | ||
|   * Pages are fetched one after another with blocking calls; a failed request or any response | ||
|   * variant other than the first (success) one aborts the run with a `RuntimeException`. | ||
|   */ | ||
| object DownloadTxMainnet extends App with NamedLogging { | ||
|   override protected def loggerFactory: NamedLoggerFactory = NamedLoggerFactory.root | ||
|
|
||
|   implicit val actorSystem: ActorSystem = ActorSystem() | ||
|
|
||
|   try { | ||
|     implicit val ec: ExecutionContextExecutor = scala.concurrent.ExecutionContext.global | ||
|     implicit val httpClient: HttpClient = | ||
|       HttpClient(HttpClient.HttpRequestParameters(NonNegativeDuration.ofSeconds(40L)), logger) | ||
|     val host = "https://scan.sv-2.global.canton.network.digitalasset.com" | ||
|
|
||
|     val migrationId = 3 // TODO: figure out how to configure this | ||
|     val client = http.ScanClient.httpClient(HttpClientBuilder().buildClient(), host) | ||
|
|
||
|     // The download window is [now - 1h, now], fixed once at startup | ||
|     val now = Instant.now() | ||
|     val start = now.minus(1, ChronoUnit.HOURS) | ||
|
|
||
|     // Fetches one page (at most 1000 items) of updates after `at`, waiting up to one minute. | ||
|     def query(at: Instant) = { | ||
|       println(s"Querying at $at") | ||
|       Await | ||
|         .result( | ||
|           client | ||
|             .getUpdateHistoryV2( | ||
|               UpdateHistoryRequestV2( | ||
|                 after = Some(UpdateHistoryRequestAfter(migrationId.toLong, at.toString)), | ||
|                 pageSize = 1000, | ||
|               ) | ||
|             ) | ||
|             .value, | ||
|           1.minute, | ||
|         ) | ||
|         .getOrElse(throw new RuntimeException(s"Update history request after $at failed")) | ||
|         .fold( | ||
|           identity, | ||
|           err => throw new RuntimeException(s"Unexpected update history response after $at: $err"), | ||
|           err => throw new RuntimeException(s"Unexpected update history response after $at: $err"), | ||
|         ) | ||
|     } | ||
|
|
||
|     // Record time of a history item, whichever of the two item variants it is. | ||
|     def recordTimeOf(item: UpdateHistoryItemV2): Instant = Instant.parse(item match { | ||
|       case members.UpdateHistoryTransactionV2(tx) => tx.recordTime | ||
|       case members.UpdateHistoryReassignment(reassignment) => reassignment.recordTime | ||
|     }) | ||
|
|
||
|     // Accumulates pages, using the record time of each page's last item as the next cursor, | ||
|     // until a page is empty or reaches past `now`. | ||
|     @tailrec | ||
|     def loop(at: Instant, acc: Chain[UpdateHistoryItemV2]): Chain[UpdateHistoryItemV2] = { | ||
|       val page = query(at).transactions | ||
|       page.lastOption match { | ||
|         case Some(last) => | ||
|           val lastRecordTime = recordTimeOf(last) | ||
|           if (lastRecordTime.isAfter(now)) | ||
|             // Final page: keep the items still inside the window instead of dropping the whole page | ||
|             acc ++ Chain.fromSeq(page.filterNot(recordTimeOf(_).isAfter(now))) | ||
|           else | ||
|             loop( | ||
|               lastRecordTime, | ||
|               acc ++ Chain.fromSeq(page), | ||
|             ) | ||
|         case None => | ||
|           acc | ||
|       } | ||
|     } | ||
|
|
||
|     val txsInDuration = loop(start, Chain.empty) | ||
|     val toWrite = UpdateHistoryResponseV2.encodeUpdateHistoryResponseV2( | ||
|       UpdateHistoryResponseV2(txsInDuration.toVector) | ||
|     ) | ||
|     val path = Paths.get("update_history_response.json") | ||
|     Files.write(path, toWrite.noSpaces.getBytes(StandardCharsets.UTF_8)) | ||
|   } finally { | ||
|     // Always shut the actor system down so the JVM can exit | ||
|     Await.result(actorSystem.terminate(), 10.seconds) | ||
|   } | ||
|
|
||
| } |
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Ignore this file for now. |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,48 @@ | ||
| package org.lfdecentralizedtrust.splice.performance | ||
|
|
||
| import com.digitalasset.canton.LogReporter | ||
| import com.typesafe.config.{Config, ConfigFactory} | ||
| import org.lfdecentralizedtrust.splice.performance.tests.DbSvDsoStoreIngestionPerformanceTest | ||
| import org.scalatest.{Args, Suite} | ||
|
|
||
| /** Command-line entry point that runs the registered performance test suites. | ||
|   * | ||
|   * Usage: `splice-perf run <test-name> -C <config>`, where `<test-name>` is either `*` (all tests) | ||
|   * or a regular expression that must match a registered test name in full. | ||
|   */ | ||
| object PerformanceRunner extends App { | ||
|
|
||
|   // Set up logging: only WARN and ERROR to stdout | ||
|   // private val rootLogger = LoggerFactory.getLogger(org.slf4j.Logger.ROOT_LOGGER_NAME) | ||
|   // (Assume logging config is handled via HOCON or logback config file) | ||
|
|
||
|   // Parse CLI arguments | ||
|   case class CliArgs(command: String, testName: String, configPath: String) | ||
|   def parseArgs(args: Array[String]): CliArgs = { | ||
|     // Simple parser for: splice-perf run <test-name> -C <config> | ||
|     if (args.length < 4 || args(0) != "run" || args(2) != "-C") | ||
|       throw new IllegalArgumentException("Usage: splice-perf run <test-name> -C <config>") | ||
|     CliArgs(args(0), args(1), args(3)) | ||
|   } | ||
|
|
||
|   val cliArgs = parseArgs(args) | ||
|   val config: Config = ConfigFactory.parseFile(new java.io.File(cliArgs.configPath)).resolve() | ||
|   // Registry of runnable suites; each is built lazily so only selected tests are instantiated | ||
|   val allTests: Map[String, () => Suite] = Map( | ||
|     "DbSvDsoStore" -> (() => new DbSvDsoStoreIngestionPerformanceTest()) | ||
|   ) | ||
|
|
||
|   // Test selection: "*" selects everything, anything else is treated as a regex by String.matches | ||
|   def getTestNames(pattern: String) = { | ||
|     if (pattern == "*") allTests.toSeq | ||
|     else allTests.filter(_._1.matches(pattern)).toSeq | ||
|   } | ||
|
|
||
|   val testsToRun = getTestNames(cliArgs.testName) | ||
|   if (testsToRun.isEmpty) | ||
|     println( | ||
|       s"No tests match '${cliArgs.testName}'. Available tests: ${allTests.keys.mkString(", ")}" | ||
|     ) | ||
|
|
||
|   // Run tests and report summary | ||
|   testsToRun.foreach { case (testName, testBuilder) => | ||
|     val test = testBuilder() | ||
|     try { | ||
|       // Suite.run reports test failures through the reporter and the returned Status rather than | ||
|       // by throwing, so the Status has to be inspected to know the outcome. | ||
|       val status = test.run(None, Args(reporter = new LogReporter())) | ||
|       if (status.succeeds()) println(s"Test '$testName' PASSED") | ||
|       else println(s"Test '$testName' FAILED") | ||
|     } catch { | ||
|       // Only non-fatal errors are reported; fatal JVM errors propagate | ||
|       case scala.util.control.NonFatal(ex) => | ||
|         println(s"Test '$testName' FAILED: ${ex.getMessage}") | ||
|     } | ||
|   } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,143 @@ | ||
| package org.lfdecentralizedtrust.splice.performance.tests | ||
|
|
||
| import cats.data.NonEmptyList | ||
| import com.daml.metrics.api.noop.NoOpMetricsFactory | ||
| import com.digitalasset.canton.concurrent.FutureSupervisor | ||
| import com.digitalasset.canton.lifecycle.FutureUnlessShutdown | ||
| import com.digitalasset.canton.resource.DbStorage | ||
| import com.digitalasset.canton.tracing.TraceContext | ||
| import com.digitalasset.canton.{HasActorSystem, HasExecutionContext} | ||
| import org.apache.pekko.Done | ||
| import org.apache.pekko.stream.scaladsl.{Sink, Source} | ||
| import org.lfdecentralizedtrust.splice.config.IngestionConfig | ||
| import org.lfdecentralizedtrust.splice.environment.ledger.api.TreeUpdateOrOffsetCheckpoint | ||
| import org.lfdecentralizedtrust.splice.environment.{DarResources, RetryProvider} | ||
| import org.lfdecentralizedtrust.splice.http.v0.definitions.{ | ||
| UpdateHistoryItemV2, | ||
| UpdateHistoryResponseV2, | ||
| } | ||
| import org.lfdecentralizedtrust.splice.migration.DomainMigrationInfo | ||
| import org.lfdecentralizedtrust.splice.scan.admin.http.CompactJsonScanHttpEncodings | ||
| import org.lfdecentralizedtrust.splice.store.StoreTest | ||
| import org.lfdecentralizedtrust.splice.store.db.SplicePostgresTest | ||
| import org.lfdecentralizedtrust.splice.sv.store.SvStore | ||
| import org.lfdecentralizedtrust.splice.sv.store.db.DbSvDsoStore | ||
| import org.lfdecentralizedtrust.splice.util.{ResourceTemplateDecoder, TemplateJsonDecoder} | ||
| import org.scalatest.concurrent.PatienceConfiguration | ||
|
|
||
| import java.nio.file.{Files, Paths} | ||
| import scala.collection.mutable | ||
| import scala.concurrent.duration.FiniteDuration | ||
|
|
||
| /** Measures how long `DbSvDsoStore` takes to ingest a dump of update history, batch by batch. | ||
|   * | ||
|   * The dump must have the JSON structure of `UpdateHistoryResponseV2`. Its location defaults to a | ||
|   * fixed local path and can be overridden with the `splice.perf.update-history-dump` system property. | ||
|   */ | ||
| class DbSvDsoStoreIngestionPerformanceTest | ||
|     extends StoreTest | ||
|     with SplicePostgresTest | ||
|     with HasActorSystem | ||
|     with HasExecutionContext { | ||
|
|
||
|   "Ingestion performance test" in { | ||
|     // Load the JSON update-history dump and replay it through the store's ingestion sink in batches | ||
|
|
||
|     val store = mkStore() | ||
|     store.multiDomainAcsStore.ingestionSink.initialize().futureValue | ||
|     val timings = mutable.ListBuffer[Long]() | ||
|     val ingestionConfig = IngestionConfig() | ||
|     // Override with -Dsplice.perf.update-history-dump=<file>; the default is the original local path | ||
|     val dumpFile = Paths.get( | ||
|       sys.props.getOrElse( | ||
|         "splice.perf.update-history-dump", | ||
|         "/home/oriolmunoz/mainnetdump/update_history_response.json", | ||
|       ) | ||
|     ) | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this dump is 1h of mainnet updates (see next file for how it was obtained).
|
||
|     val dump = (for { | ||
|       json <- io.circe.parser | ||
|         .parse( | ||
|           Files | ||
|             .readString(dumpFile) | ||
|             .replace( // TODO: maybe just use the store with dsoParty=mainnet's | ||
|               "DSO::1220b1431ef217342db44d516bb9befde802be7d8899637d290895fa58880f19accc", | ||
|               dsoParty.toProtoPrimitive, | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. every test uses its own dsoParty... see TODO |
||
|             ) | ||
|         ) | ||
|       decoded <- UpdateHistoryResponseV2.decodeUpdateHistoryResponseV2.decodeJson(json) | ||
|     } yield decoded) match { | ||
|       case Right(decoded) => decoded | ||
|       case Left(err) => | ||
|         // Keep the underlying parse/decode error as the cause so the failure is diagnosable | ||
|         throw new IllegalArgumentException( | ||
|           "Failed to parse the update history dump provided. It should have the structure of UpdateHistoryResponseV2.", | ||
|           err, | ||
|         ) | ||
|     } | ||
|     val txs = dump.transactions.collect { | ||
|       // deliberately ignoring reassignments | ||
|       case UpdateHistoryItemV2.members.UpdateHistoryTransactionV2(update) => | ||
|         CompactJsonScanHttpEncodings.httpToLapiUpdate(update) | ||
|     } | ||
|
|
||
|     Source | ||
|       .fromIterator(() => txs.iterator) | ||
|       // Each batch is seeded with one element, so it is never empty | ||
|       .batch(ingestionConfig.maxBatchSize.toLong, Vector(_))(_ :+ _) | ||
|       .zipWithIndex | ||
|       .runWith(Sink.foreachAsync(parallelism = 1) { case (batch, index) => | ||
|         println(s"Ingesting batch $index of ${batch.length} elements") | ||
|         val before = System.nanoTime() | ||
|         store.multiDomainAcsStore.ingestionSink | ||
|           .ingestUpdateBatch( | ||
|             NonEmptyList.fromListUnsafe( | ||
|               // Ingest only this batch's updates (not the whole dump) so each timing covers one batch | ||
|               batch | ||
|
||
|                 .map(tx => | ||
|                   TreeUpdateOrOffsetCheckpoint.Update(tx.update.update, tx.update.synchronizerId) | ||
OriolMunoz-da marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|                 ) | ||
|                 .toList | ||
|             ) | ||
|           ) | ||
|           .map { _ => | ||
|             val after = System.nanoTime() | ||
|             val duration = after - before | ||
|             // Spread the batch duration evenly over its items to get per-item timings | ||
|             timings ++= Seq.fill(batch.length)(duration / batch.length) | ||
|             val total = timings.sum | ||
|             val avg = total.toDouble / timings.size | ||
|             println( | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Does this wait for the output to be flushed? If yes, it could add significant overhead, consider only printing at the end or only once every N iterations. |
||
|               f"Ingested batch $index (${batch.length} elements) in $duration ns, average per-item time: $avg%.2f ns over ${timings.size} records, total time: $total ns" | ||
|             ) | ||
|           } | ||
|       }) | ||
|       .futureValue(timeout = PatienceConfiguration.Timeout(FiniteDuration(12, "hours"))) should be( | ||
|       Done | ||
|     ) | ||
|
|
||
|     // Sanity check on the ingested data. | ||
|     // NOTE(review): the 30_312 threshold presumably matches one specific dump - confirm. | ||
|     import slick.jdbc.canton.ActionBasedSQLInterpolation.Implicits.actionBasedSQLInterpolationCanton | ||
|     storage | ||
|       .querySingle( | ||
|         sql"select count(*) from dso_acs_store".as[Int].headOption, | ||
|         "count", | ||
|       ) | ||
|       .value | ||
|       .failOnShutdown("") | ||
|       .futureValue | ||
|       .valueOrFail("count is there") should be >= 30_312 | ||
|   } | ||
|
|
||
|   private val storeSvParty = providerParty(42) | ||
|   /** Builds the `DbSvDsoStore` under test, with template decoding backed by the bundled DAR resources. */ | ||
|   def mkStore() = { | ||
|     val packageSignatures = | ||
|       ResourceTemplateDecoder.loadPackageSignaturesFromResources( | ||
|         DarResources.amulet.all ++ | ||
|           DarResources.validatorLifecycle.all ++ | ||
|           DarResources.dsoGovernance.all | ||
|       ) | ||
|     implicit val templateJsonDecoder: TemplateJsonDecoder = | ||
|       new ResourceTemplateDecoder(packageSignatures, loggerFactory) | ||
|     new DbSvDsoStore( | ||
|       SvStore.Key(storeSvParty, dsoParty), | ||
|       storage, | ||
|       loggerFactory, | ||
|       RetryProvider(loggerFactory, timeouts, FutureSupervisor.Noop, NoOpMetricsFactory), | ||
|       DomainMigrationInfo( | ||
|         domainMigrationId, | ||
|         None, | ||
|       ), | ||
|       participantId = mkParticipantId("IngestionPerformanceIngestionTest"), | ||
|       IngestionConfig(), | ||
|     )(parallelExecutionContext, templateJsonDecoder, closeContext) | ||
|   } | ||
|
|
||
|   /** Resets all app tables via `resetAllAppTables`. */ | ||
|   override protected def cleanDb( | ||
|       storage: DbStorage | ||
|   )(implicit traceContext: TraceContext): FutureUnlessShutdown[?] = { | ||
|     for { | ||
|       _ <- resetAllAppTables(storage) | ||
|     } yield () | ||
|   } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Downloads 1h worth of mainnet updates and dumps them into a file, to be loaded by the test above.