Skip to content

Commit

Permalink
use separate idle counter
Browse files Browse the repository at this point in the history
  • Loading branch information
patriknw committed Nov 28, 2024
1 parent e754762 commit 1d3fb3d
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ import org.slf4j.Logger
startTimestamp = Instant.EPOCH,
startWallClock = Instant.EPOCH,
currentQueryWallClock = Instant.EPOCH,
previousQueryWallClock = Instant.EPOCH)
previousQueryWallClock = Instant.EPOCH,
idleCountBeforeHeartbeat = 0)
}

final case class QueryState(
Expand All @@ -60,7 +61,8 @@ import org.slf4j.Logger
startTimestamp: Instant,
startWallClock: Instant,
currentQueryWallClock: Instant,
previousQueryWallClock: Instant) {
previousQueryWallClock: Instant,
idleCountBeforeHeartbeat: Long) {

def backtracking: Boolean = backtrackingCount > 0

Expand Down Expand Up @@ -301,6 +303,10 @@ import org.slf4j.Logger

def nextQuery(state: QueryState): (QueryState, Option[Source[Envelope, NotUsed]]) = {
val newIdleCount = if (state.itemCount == 0 && !state.backtracking) state.idleCount + 1 else 0
val newIdleCountBeforeHeartbeat =
if (state.backtracking) state.idleCountBeforeHeartbeat
else if (state.itemCount == 0) state.idleCountBeforeHeartbeat + 1
else 0
// start tracking query wall clock for heartbeats after initial backtracking query
val newQueryWallClock =
if (state.latestBacktracking != TimestampOffset.Zero) clock.instant()
Expand All @@ -324,7 +330,8 @@ import org.slf4j.Logger
latestBacktracking = fromOffset,
backtrackingExpectFiltered = state.latestBacktrackingSeenCount,
currentQueryWallClock = newQueryWallClock,
previousQueryWallClock = state.currentQueryWallClock)
previousQueryWallClock = state.currentQueryWallClock,
idleCountBeforeHeartbeat = newIdleCountBeforeHeartbeat)
} else if (switchFromBacktracking(state)) {
// switching from backtracking
state.copy(
Expand All @@ -334,7 +341,8 @@ import org.slf4j.Logger
idleCount = newIdleCount,
backtrackingCount = 0,
currentQueryWallClock = newQueryWallClock,
previousQueryWallClock = state.currentQueryWallClock)
previousQueryWallClock = state.currentQueryWallClock,
idleCountBeforeHeartbeat = newIdleCountBeforeHeartbeat)
} else {
// continuing
val newBacktrackingCount = if (state.backtracking) state.backtrackingCount + 1 else 0
Expand All @@ -346,7 +354,8 @@ import org.slf4j.Logger
backtrackingCount = newBacktrackingCount,
backtrackingExpectFiltered = state.latestBacktrackingSeenCount,
currentQueryWallClock = newQueryWallClock,
previousQueryWallClock = state.currentQueryWallClock)
previousQueryWallClock = state.currentQueryWallClock,
idleCountBeforeHeartbeat = newIdleCountBeforeHeartbeat)
}

val fromTimestamp = newState.nextQueryFromTimestamp(backtrackingWindow)
Expand Down Expand Up @@ -399,7 +408,7 @@ import org.slf4j.Logger
}

def heartbeat(state: QueryState): Option[Envelope] = {
if (state.idleCount >= 1 && state.previousQueryWallClock != Instant.EPOCH) {
if (state.idleCountBeforeHeartbeat >= 3 && state.previousQueryWallClock != Instant.EPOCH) {
// use wall clock to measure duration since start, up to idle backtracking limit
val timestamp = state.startTimestamp.plus(
JDuration.between(state.startWallClock, state.previousQueryWallClock.minus(backtrackingBehindCurrentTime)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -563,14 +563,16 @@ final class DynamoDBReadJournal(system: ExtendedActorSystem, config: Config, cfg
env.offset match {
case t: TimestampOffset =>
if (EnvelopeOrigin.fromQuery(env)) {
latestBacktrackingPerSlice.get(slice) match {
case Some(latestBacktracking) if latestBacktracking.isAfter(t.timestamp) && log.isInfoEnabled =>
val errStr = s"event from query for persistenceId ${env.persistenceId} seqNr ${env.sequenceNr} " +
s"timestamp ${t.timestamp} was before last event from backtracking or heartbeat $latestBacktracking."
log.info(errStr)

case _ => () // do nothing
}
// FIXME this can probably be changed to debug level or removed completely
val l = latestBacktracking(slice)
if (l.isAfter(t.timestamp) && log.isInfoEnabled)
log.info(
"event from query for persistenceId [{}] seqNr [{}] " +
s"timestamp [{}] was before last event from backtracking or heartbeat [{}].",
env.persistenceId,
env.sequenceNr,
t.timestamp,
l)

env :: Nil
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ object EventsBySlicePubSubBacktrackingSpec {
akka.persistence.dynamodb {
journal.publish-events = on
query {
refresh-interval = 1 s
refresh-interval = 300 ms
# Ensure pubsub arrives first
behind-current-time = 2s
Expand Down

0 comments on commit 1d3fb3d

Please sign in to comment.