Contributor Author:
downloads 1h worth of mainnet updates and dumps it into a file, to be loaded in the test above

@@ -0,0 +1,90 @@
package org.lfdecentralizedtrust.splice.performance

import cats.data.Chain
import com.digitalasset.canton.config.NonNegativeDuration
import com.digitalasset.canton.logging.{NamedLoggerFactory, NamedLogging}
import org.apache.pekko.actor.ActorSystem
import org.lfdecentralizedtrust.splice.admin.api.client.commands.HttpClientBuilder
import org.lfdecentralizedtrust.splice.http.HttpClient
import org.lfdecentralizedtrust.splice.http.v0.definitions.UpdateHistoryItemV2.members
import org.lfdecentralizedtrust.splice.http.v0.definitions.{
  UpdateHistoryItemV2,
  UpdateHistoryRequestAfter,
  UpdateHistoryRequestV2,
  UpdateHistoryResponseV2,
}
import org.lfdecentralizedtrust.splice.http.v0.scan as http

import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Paths}
import java.time.Instant
import java.time.temporal.ChronoUnit
import scala.annotation.tailrec
import scala.concurrent.duration.*
import scala.concurrent.{Await, ExecutionContextExecutor}

object DownloadTxMainnet extends App with NamedLogging {
  override protected def loggerFactory: NamedLoggerFactory = NamedLoggerFactory.root

  implicit val actorSystem: ActorSystem = ActorSystem()

  try {
    implicit val ec: ExecutionContextExecutor = scala.concurrent.ExecutionContext.global
    implicit val httpClient: HttpClient =
      HttpClient(HttpClient.HttpRequestParameters(NonNegativeDuration.ofSeconds(40L)), logger)
    val host = "https://scan.sv-2.global.canton.network.digitalasset.com"

    val migrationId = 3 // TODO: figure out how to configure this
    val client = http.ScanClient.httpClient(HttpClientBuilder().buildClient(), host)

    val now = Instant.now()
    val start = now.minus(1, ChronoUnit.HOURS)

    def query(at: Instant) = {
      println(s"Querying at $at")
      Await
        .result(
          client
            .getUpdateHistoryV2(
              UpdateHistoryRequestV2(
                after = Some(UpdateHistoryRequestAfter(migrationId.toLong, at.toString)),
                pageSize = 1000,
              )
            )
            .value,
          1.minute,
        )
        .getOrElse(throw new RuntimeException())
        .fold(identity, _ => throw new RuntimeException(), _ => throw new RuntimeException())
    }

    @tailrec
    def loop(at: Instant, acc: Chain[UpdateHistoryItemV2]): Chain[UpdateHistoryItemV2] = {
      val nextResponse = query(at)
      nextResponse.transactions.lastOption match {
        case Some(value) =>
          val recordTime = Instant.parse(value match {
            case members.UpdateHistoryTransactionV2(value) => value.recordTime
            case members.UpdateHistoryReassignment(value) => value.recordTime
          })
          if (recordTime.isAfter(now)) acc
          else
            loop(
              recordTime,
              acc ++ Chain.fromSeq(nextResponse.transactions),
            )
        case None =>
          acc
      }
    }

    val txsInDuration = loop(start, Chain.empty)
    val toWrite = UpdateHistoryResponseV2.encodeUpdateHistoryResponseV2(
      UpdateHistoryResponseV2(txsInDuration.toVector)
    )
    val path = Paths.get("update_history_response.json")
    Files.write(path, toWrite.noSpaces.getBytes(StandardCharsets.UTF_8))
  } finally {
    Await.result(actorSystem.terminate(), 10.seconds)
  }

}
Contributor Author:
ignore this file for now

@@ -0,0 +1,48 @@
package org.lfdecentralizedtrust.splice.performance

import com.digitalasset.canton.LogReporter
import com.typesafe.config.{Config, ConfigFactory}
import org.lfdecentralizedtrust.splice.performance.tests.DbSvDsoStoreIngestionPerformanceTest
import org.scalatest.{Args, Suite}

object PerformanceRunner extends App {

  // Set up logging: only WARN and ERROR to stdout
  // private val rootLogger = LoggerFactory.getLogger(org.slf4j.Logger.ROOT_LOGGER_NAME)
  // (Assume logging config is handled via HOCON or logback config file)

  // Parse CLI arguments
  case class CliArgs(command: String, testName: String, configPath: String)
  def parseArgs(args: Array[String]): CliArgs = {
    // Simple parser for: splice-perf run <test-name> -C <config>
    if (args.length < 4 || args(0) != "run" || args(2) != "-C")
      throw new IllegalArgumentException("Usage: splice-perf run <test-name> -C <config>")
    CliArgs(args(0), args(1), args(3))
  }

  val cliArgs = parseArgs(args)
  val config: Config = ConfigFactory.parseFile(new java.io.File(cliArgs.configPath)).resolve()
  val allTests: Map[String, () => Suite] = Map(
    "DbSvDsoStore" -> (() => new DbSvDsoStoreIngestionPerformanceTest())
  )

  // Test-name selection: "*" runs all tests; any other pattern is matched as a regex
  def getTestNames(pattern: String) = {
    if (pattern == "*") allTests.toSeq
    else allTests.filter(_._1.matches(pattern)).toSeq
  }

  val testsToRun = getTestNames(cliArgs.testName)

  // Run tests and report a summary. Note that ScalaTest's Suite.run reports
  // failures through the returned Status rather than by throwing, so the
  // status is inspected explicitly.
  testsToRun.foreach { case (testName, testBuilder) =>
    val test = testBuilder()
    val status = test.run(None, Args(reporter = new LogReporter()))
    if (status.succeeds()) println(s"Test '$testName' PASSED")
    else println(s"Test '$testName' FAILED")
  }
}
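For reference, a hypothetical invocation matching parseArgs above (the splice-perf name comes from the usage string; the test name is from the allTests map and the config path is illustrative):

    splice-perf run DbSvDsoStore -C ./performance.conf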
@@ -0,0 +1,143 @@
package org.lfdecentralizedtrust.splice.performance.tests

import cats.data.NonEmptyList
import com.daml.metrics.api.noop.NoOpMetricsFactory
import com.digitalasset.canton.concurrent.FutureSupervisor
import com.digitalasset.canton.lifecycle.FutureUnlessShutdown
import com.digitalasset.canton.resource.DbStorage
import com.digitalasset.canton.tracing.TraceContext
import com.digitalasset.canton.{HasActorSystem, HasExecutionContext}
import org.apache.pekko.Done
import org.apache.pekko.stream.scaladsl.{Sink, Source}
import org.lfdecentralizedtrust.splice.config.IngestionConfig
import org.lfdecentralizedtrust.splice.environment.ledger.api.TreeUpdateOrOffsetCheckpoint
import org.lfdecentralizedtrust.splice.environment.{DarResources, RetryProvider}
import org.lfdecentralizedtrust.splice.http.v0.definitions.{
  UpdateHistoryItemV2,
  UpdateHistoryResponseV2,
}
import org.lfdecentralizedtrust.splice.migration.DomainMigrationInfo
import org.lfdecentralizedtrust.splice.scan.admin.http.CompactJsonScanHttpEncodings
import org.lfdecentralizedtrust.splice.store.StoreTest
import org.lfdecentralizedtrust.splice.store.db.SplicePostgresTest
import org.lfdecentralizedtrust.splice.sv.store.SvStore
import org.lfdecentralizedtrust.splice.sv.store.db.DbSvDsoStore
import org.lfdecentralizedtrust.splice.util.{ResourceTemplateDecoder, TemplateJsonDecoder}
import org.scalatest.concurrent.PatienceConfiguration

import java.nio.file.{Files, Paths}
import scala.collection.mutable
import scala.concurrent.duration.FiniteDuration

class DbSvDsoStoreIngestionPerformanceTest
    extends StoreTest
    with SplicePostgresTest
    with HasActorSystem
    with HasExecutionContext {

  "Ingestion performance test" in {
    // read the JSON dump of mainnet updates into a pekko stream

    val store = mkStore()
    store.multiDomainAcsStore.ingestionSink.initialize().futureValue
    val timings = mutable.ListBuffer[Long]()
    val ingestionConfig = IngestionConfig()
    val dumpFile = Paths.get("/home/oriolmunoz/mainnetdump/update_history_response.json")
Contributor Author:
this dump is 1h of mainnet updates (see next file for how it was obtained).
It should be okay, because:

  • Any (filtered-in) create causes an insert
  • Any archive causes a delete, as long as the template is included in the filter
  • The txlog doesn't care whether the contract ids involved are present in the ACS store or not

    val dump = (for {
      json <- io.circe.parser
        .parse(
          Files
            .readString(dumpFile)
            .replace( // TODO: maybe just use the store with dsoParty=mainnet's
              "DSO::1220b1431ef217342db44d516bb9befde802be7d8899637d290895fa58880f19accc",
              dsoParty.toProtoPrimitive,
Contributor Author:
every test uses its own dsoParty... see TODO

            )
        )
      decoded <- UpdateHistoryResponseV2.decodeUpdateHistoryResponseV2.decodeJson(json)
    } yield decoded)
      .getOrElse(
        throw new IllegalArgumentException(
          "Failed to parse the update history dump provided. It should have the structure of UpdateHistoryResponseV2."
        )
      )
    val txs = dump.transactions.collect {
      // deliberately ignoring reassignments
      case UpdateHistoryItemV2.members.UpdateHistoryTransactionV2(update) =>
        CompactJsonScanHttpEncodings.httpToLapiUpdate(update)
    }

    Source
      .fromIterator(() => txs.iterator)
      .batch(ingestionConfig.maxBatchSize.toLong, Vector(_))(_ :+ _)
      .zipWithIndex
      .runWith(Sink.foreachAsync(parallelism = 1) { case (batch, index) =>
        println(s"Ingesting batch $index of ${batch.length} elements")
        val before = System.nanoTime()
        store.multiDomainAcsStore.ingestionSink
          .ingestUpdateBatch(
            NonEmptyList.fromListUnsafe(
              txs
Contributor:
Isn't this ingesting the whole txs list in each iteration? Shouldn't it use batch instead? I'm surprised nothing explodes when we reingest the same transactions over and over.

Contributor Author:
🤦 I'm surprised my performance was not god-awful

Contributor:
It was probably fine because the first iteration did a lot of work, and all other iterations were no-ops due to the last ingested offset guard.
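A minimal sketch of the fix this thread converges on: build the NonEmptyList from batch (the current group of elements) rather than the full txs list, leaving the surrounding call unchanged (untested):

    // Sketch of the suggested fix: ingest only the current batch.
    store.multiDomainAcsStore.ingestionSink
      .ingestUpdateBatch(
        NonEmptyList.fromListUnsafe(
          batch
            .map(tx => TreeUpdateOrOffsetCheckpoint.Update(tx.update.update, tx.update.synchronizerId))
            .toList
        )
      )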

                .map(tx =>
                  TreeUpdateOrOffsetCheckpoint.Update(tx.update.update, tx.update.synchronizerId)
                )
                .toList
            )
          )
          .map { _ =>
            val after = System.nanoTime()
            val duration = after - before
            timings ++= Seq.fill(batch.length)(duration / batch.length)
            val avg = timings.sum.toDouble / timings.size
            println(
Contributor:
Does this wait for the output to be flushed? If so, it could add significant overhead; consider printing only at the end, or only once every N iterations, as in the sketch below.
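A hedged sketch of that suggestion, gating the existing report on a batch counter (reportEvery is an assumed knob, not part of this PR):

    // Only report every `reportEvery` batches, keeping stdout flushing
    // out of the measured path (the constant is an assumption).
    val reportEvery = 100
    if (index % reportEvery == 0)
      println(
        f"Ingested batch $index, average per-item time: $avg%.2f ns over ${timings.size} records"
      )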

f"Ingested batch $index (${batch.length} elements) in $duration ns, average per-item time: $avg%.2f ns over ${timings.size} records, total time: ${timings.sum} ns"
)
}
})
.futureValue(timeout = PatienceConfiguration.Timeout(FiniteDuration(12, "hours"))) should be(
Done
)

import slick.jdbc.canton.ActionBasedSQLInterpolation.Implicits.actionBasedSQLInterpolationCanton
storage
.querySingle(
sql"select count(*) from dso_acs_store".as[Int].headOption,
"count",
)
.value
.failOnShutdown("")
.futureValue
.valueOrFail("count is there") should be >= 30_312
}

private val storeSvParty = providerParty(42)
def mkStore() = {
val packageSignatures =
ResourceTemplateDecoder.loadPackageSignaturesFromResources(
DarResources.amulet.all ++
DarResources.validatorLifecycle.all ++
DarResources.dsoGovernance.all
)
implicit val templateJsonDecoder: TemplateJsonDecoder =
new ResourceTemplateDecoder(packageSignatures, loggerFactory)
new DbSvDsoStore(
SvStore.Key(storeSvParty, dsoParty),
storage,
loggerFactory,
RetryProvider(loggerFactory, timeouts, FutureSupervisor.Noop, NoOpMetricsFactory),
DomainMigrationInfo(
domainMigrationId,
None,
),
participantId = mkParticipantId("IngestionPerformanceIngestionTest"),
IngestionConfig(),
)(parallelExecutionContext, templateJsonDecoder, closeContext)
}

override protected def cleanDb(
storage: DbStorage
)(implicit traceContext: TraceContext): FutureUnlessShutdown[?] = {
for {
_ <- resetAllAppTables(storage)
} yield ()
}
}
test-full-class-names-non-integration.log (1 addition, 0 deletions)
@@ -8,6 +8,7 @@ org.lfdecentralizedtrust.splice.environment.CommandCircuitBreakerTest
org.lfdecentralizedtrust.splice.environment.CommandIdDedupTest
org.lfdecentralizedtrust.splice.environment.TopologyAwarePackageVersionSupportTest
org.lfdecentralizedtrust.splice.http.UrlValidatorTest
org.lfdecentralizedtrust.splice.performance.tests.DbSvDsoStoreIngestionPerformanceTest
org.lfdecentralizedtrust.splice.scan.admin.api.client.BftScanConnectionTest
org.lfdecentralizedtrust.splice.scan.admin.http.ScanHttpEncodingsTest
org.lfdecentralizedtrust.splice.scan.automation.AcsSnapshotTriggerTest