Skip to content

Commit

Permalink
Use plugin-dispatcher all the way, #866
Browse files Browse the repository at this point in the history
  • Loading branch information
patriknw committed Apr 21, 2021
1 parent b7dd238 commit 5bf89b4
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,10 @@ import akka.persistence.cassandra.journal.TagWriters.TagWrite
import akka.persistence.cassandra.query.EventsByPersistenceIdStage.{ Extractors, TaggedPersistentRepr }
import akka.stream.scaladsl.{ Sink, Source }
import akka.util.OptionVal

import scala.concurrent._

import akka.stream.ActorAttributes

trait CassandraRecovery extends CassandraTagRecovery with TaggedPreparedStatements {
this: CassandraJournal =>

Expand Down Expand Up @@ -74,9 +75,17 @@ trait CassandraRecovery extends CassandraTagRecovery with TaggedPreparedStatemen
dispatcher = sessionSettings.pluginDispatcher)
.mapAsync(1)(sendMissingTagWrite(tp, tagWrites.get))
}))
.map(te => queries.mapEvent(te.pr))
.runForeach(replayCallback)
.map(_ => ())
.map { te =>
println(s"# asyncReplayMessages mapEvent ${Thread.currentThread().getName}") // FIXME
queries.mapEvent(te.pr)
}
.map { p =>
println(s"# asyncReplayMessages replayCallback ${Thread.currentThread().getName}") // FIXME
replayCallback(p)
}
.withAttributes(ActorAttributes.dispatcher(sessionSettings.pluginDispatcher))
.runWith(Sink.ignore)
.map(_ => ())(akka.dispatch.ExecutionContexts.sameThreadExecutionContext)

} else {
queries
Expand All @@ -93,9 +102,17 @@ trait CassandraRecovery extends CassandraTagRecovery with TaggedPreparedStatemen
extractor = Extractors.persistentRepr(eventDeserializer, serialization),
// run the query on the journal dispatcher (not the queries dispatcher)
dispatcher = sessionSettings.pluginDispatcher)
.map(p => queries.mapEvent(p.persistentRepr))
.runForeach(replayCallback)
.map(_ => ())
.map { p =>
println(s"# asyncReplayMessages mapEvent ${Thread.currentThread().getName}") // FIXME
queries.mapEvent(p.persistentRepr)
}
.map { p =>
println(s"# asyncReplayMessages replayCallback ${Thread.currentThread().getName}") // FIXME
replayCallback(p)
}
.withAttributes(ActorAttributes.dispatcher(sessionSettings.pluginDispatcher))
.runWith(Sink.ignore)
.map(_ => ())(akka.dispatch.ExecutionContexts.sameThreadExecutionContext)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,10 @@ import akka.util.OptionVal
executeStatement(selectDeletedToQuery.bind(persistenceId)).map(r =>
Option(r.one()).map(_.getLong("deleted_to")).getOrElse(0))

private def executeStatement(statement: Statement)(implicit ec: ExecutionContext): Future[ResultSet] =
private def executeStatement(statement: Statement)(implicit ec: ExecutionContext): Future[ResultSet] = {
println(s"# EventsByPersistenceIdStage: ${Thread.currentThread().getName}") // FIXME
session.executeAsync(withCustom(statement)).asScala
}

private def withCustom(statement: Statement): Statement = {
customConsistencyLevel.foreach(statement.setConsistencyLevel)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -619,6 +619,8 @@ class CassandraReadJournal(system: ExtendedActorSystem, cfg: Config)
fastForwardEnabled: Boolean = false,
dispatcher: String): Source[T, Future[EventsByPersistenceIdStage.Control]] = {

println(s"# eventsByPersistenceId call on ${Thread.currentThread().getName}") // FIXME

val deserializeEventAsync = queryPluginConfig.deserializationParallelism > 1

createFutureSource(combinedEventsByPersistenceIdStmts) { (s, c) =>
Expand All @@ -642,6 +644,7 @@ class CassandraReadJournal(system: ExtendedActorSystem, cfg: Config)
fastForwardEnabled))
.named(name)
}.mapAsync(queryPluginConfig.deserializationParallelism) { row =>
println(s"# eventsByPersistenceId deserialization mapAsync ${Thread.currentThread().getName}") // FIXME
extractor.extract(row, deserializeEventAsync)
}
.withAttributes(ActorAttributes.dispatcher(dispatcher))
Expand Down

0 comments on commit 5bf89b4

Please sign in to comment.