Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat/fix: skip backtracking when far behind, accept pubsub events after idle #96

Merged
merged 4 commits into from
Nov 15, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

package akka.persistence.dynamodb.internal

import java.time.Clock
import java.time.Instant
import java.time.{ Duration => JDuration }

Expand All @@ -26,7 +27,22 @@ import org.slf4j.Logger

object QueryState {
val empty: QueryState =
QueryState(TimestampOffset.Zero, 0, 0, 0, 0, backtrackingCount = 0, TimestampOffset.Zero, 0, 0)
QueryState(
latest = TimestampOffset.Zero,
itemCount = 0,
itemCountSinceBacktracking = 0,
queryCount = 0,
idleCount = 0,
backtrackingCount = 0,
latestBacktracking = TimestampOffset.Zero,
latestBacktrackingSeenCount = 0,
backtrackingExpectFiltered = 0,
previous = TimestampOffset.Zero,
previousBacktracking = TimestampOffset.Zero,
startTimestamp = Instant.EPOCH,
startWallClock = Instant.EPOCH,
currentQueryWallClock = Instant.EPOCH,
previousQueryWallClock = Instant.EPOCH)
}

final case class QueryState(
Expand All @@ -38,17 +54,26 @@ import org.slf4j.Logger
backtrackingCount: Int,
latestBacktracking: TimestampOffset,
latestBacktrackingSeenCount: Int,
backtrackingExpectFiltered: Int) {
backtrackingExpectFiltered: Int,
previous: TimestampOffset,
previousBacktracking: TimestampOffset,
startTimestamp: Instant,
startWallClock: Instant,
currentQueryWallClock: Instant,
previousQueryWallClock: Instant) {

/** Whether the query is currently in backtracking mode, i.e. `backtrackingCount` is positive. */
def backtracking: Boolean = backtrackingCount >= 1

/** The offset of the active mode: `latestBacktracking` while backtracking, otherwise `latest`. */
def currentOffset: TimestampOffset =
  if (!backtracking) latest
  else latestBacktracking

def nextQueryFromTimestamp: Instant =
if (backtracking) latestBacktracking.timestamp
else latest.timestamp
/**
 * Lower timestamp bound for the next query.
 *
 * Outside backtracking this is the timestamp of `latest`. While backtracking it is the later of
 * `latestBacktracking.timestamp` and `latest.timestamp` minus `backtrackingWindow`, so the
 * query never starts more than one window before `latest`.
 *
 * @param backtrackingWindow maximum distance behind `latest` that a backtracking query starts from
 * @return the timestamp to query from
 */
def nextQueryFromTimestamp(backtrackingWindow: JDuration): Instant =
  if (!backtracking) latest.timestamp
  else {
    val windowStart = latest.timestamp.minus(backtrackingWindow)
    val backtrackingTimestamp = latestBacktracking.timestamp
    if (windowStart.isAfter(backtrackingTimestamp)) windowStart
    else backtrackingTimestamp
  }

def nextQueryToTimestamp: Option[Instant] = {
if (backtracking) Some(latest.timestamp)
Expand Down Expand Up @@ -81,15 +106,18 @@ import org.slf4j.Logger
dao: BySliceQuery.Dao[Item],
createEnvelope: (TimestampOffset, Item) => Envelope,
extractOffset: Envelope => TimestampOffset,
createHeartbeat: Instant => Option[Envelope],
clock: Clock,
settings: DynamoDBSettings,
log: Logger)(implicit val ec: ExecutionContext) {
import BySliceQuery._
import TimestampOffset.toTimestampOffset

private val backtrackingWindow = JDuration.ofMillis(settings.querySettings.backtrackingWindow.toMillis)
private val halfBacktrackingWindow = backtrackingWindow.dividedBy(2)
private val firstBacktrackingQueryWindow =
backtrackingWindow.plus(JDuration.ofMillis(settings.querySettings.backtrackingBehindCurrentTime.toMillis))
private val backtrackingBehindCurrentTime =
JDuration.ofMillis(settings.querySettings.backtrackingBehindCurrentTime.toMillis)
private val firstBacktrackingQueryWindow = backtrackingWindow.plus(backtrackingBehindCurrentTime)

def currentBySlice(
logPrefix: String,
Expand All @@ -99,15 +127,17 @@ import org.slf4j.Logger
filterEventsBeforeSnapshots: (String, Long, String) => Boolean = (_, _, _) => true): Source[Envelope, NotUsed] = {
val initialOffset = toTimestampOffset(offset)

def nextOffset(state: QueryState, envelope: Envelope): QueryState =
state.copy(latest = extractOffset(envelope), itemCount = state.itemCount + 1)
/**
 * Folds an emitted envelope into the query state.
 *
 * Heartbeat envelopes leave the state untouched; any other envelope becomes the new `latest`
 * offset and increments `itemCount`.
 */
def nextOffset(state: QueryState, envelope: Envelope): QueryState =
  if (!EnvelopeOrigin.isHeartbeatEvent(envelope))
    state.copy(latest = extractOffset(envelope), itemCount = state.itemCount + 1)
  else state

def nextQuery(state: QueryState, endTimestamp: Instant): (QueryState, Option[Source[Envelope, NotUsed]]) = {
// Note that we can't know how many events with the same timestamp that are filtered out
// so continue until itemCount is 0. That means an extra query at the end to make sure there are no
// more to fetch.
if (state.queryCount == 0L || state.itemCount > 0) {
val newState = state.copy(itemCount = 0, queryCount = state.queryCount + 1)
val newState = state.copy(itemCount = 0, queryCount = state.queryCount + 1, previous = state.latest)

val toTimestamp = newState.nextQueryToTimestamp match {
case Some(t) =>
Expand Down Expand Up @@ -176,41 +206,45 @@ import org.slf4j.Logger
log.debug("Starting {} query from slice [{}], from time [{}].", logPrefix, slice, initialOffset.timestamp)

def nextOffset(state: QueryState, envelope: Envelope): QueryState = {
val offset = extractOffset(envelope)
if (state.backtracking) {
if (offset.timestamp.isBefore(state.latestBacktracking.timestamp))
throw new IllegalArgumentException(
s"Unexpected offset [$offset] before latestBacktracking [${state.latestBacktracking}].")
if (EnvelopeOrigin.isHeartbeatEvent(envelope)) state
else {
val offset = extractOffset(envelope)
if (state.backtracking) {
if (offset.timestamp.isBefore(state.latestBacktracking.timestamp))
throw new IllegalArgumentException(
s"Unexpected offset [$offset] before latestBacktracking [${state.latestBacktracking}].")

val newSeenCount =
if (offset.timestamp == state.latestBacktracking.timestamp) state.latestBacktrackingSeenCount + 1
else 1

val newSeenCount =
if (offset.timestamp == state.latestBacktracking.timestamp) state.latestBacktrackingSeenCount + 1 else 1

state.copy(
latestBacktracking = offset,
latestBacktrackingSeenCount = newSeenCount,
itemCount = state.itemCount + 1)
state.copy(
latestBacktracking = offset,
latestBacktrackingSeenCount = newSeenCount,
itemCount = state.itemCount + 1)

} else {
if (offset.timestamp.isBefore(state.latest.timestamp))
throw new IllegalArgumentException(s"Unexpected offset [$offset] before latest [${state.latest}].")
} else {
if (offset.timestamp.isBefore(state.latest.timestamp))
throw new IllegalArgumentException(s"Unexpected offset [$offset] before latest [${state.latest}].")

if (log.isDebugEnabled()) {
if (state.latestBacktracking.seen.nonEmpty &&
offset.timestamp.isAfter(state.latestBacktracking.timestamp.plus(firstBacktrackingQueryWindow)))
log.debug(
"{} next offset is outside the backtracking window, latestBacktracking: [{}], offset: [{}]",
logPrefix,
state.latestBacktracking,
offset)
}

if (log.isDebugEnabled()) {
if (state.latestBacktracking.seen.nonEmpty &&
offset.timestamp.isAfter(state.latestBacktracking.timestamp.plus(firstBacktrackingQueryWindow)))
log.debug(
"{} next offset is outside the backtracking window, latestBacktracking: [{}], offset: [{}]",
logPrefix,
state.latestBacktracking,
offset)
state.copy(latest = offset, itemCount = state.itemCount + 1)
}

state.copy(latest = offset, itemCount = state.itemCount + 1)
}
}

def delayNextQuery(state: QueryState): Option[FiniteDuration] = {
if (switchFromBacktracking(state)) {
// switch from from backtracking immediately
// switch from backtracking immediately
None
} else {
val delay = ContinuousQuery.adjustNextDelay(
Expand All @@ -236,20 +270,44 @@ import org.slf4j.Logger
state.backtracking && state.itemCount < settings.querySettings.bufferSize - state.backtrackingExpectFiltered
}

/**
 * Decides whether the next query should be a backtracking query.
 *
 * All of the following must hold: backtracking is enabled in the query settings, the state is
 * not already backtracking, `latest` is not `TimestampOffset.Zero`, and the query is not far
 * behind the current wall clock time. In addition at least one trigger must fire: the idle
 * count, the number of items seen since the last backtracking, or the distance between
 * `latestBacktracking` and `latest`.
 *
 * @param state current query state
 * @param newIdleCount idle count as already updated by the caller for the previous query
 * @return `true` when the next query should switch to backtracking
 */
def switchToBacktracking(state: QueryState, newIdleCount: Long): Boolean = {
  // Note that when starting the query with offset = NoOffset, it will try to switch to
  // backtracking immediately after the first normal query because
  // between(latestBacktracking.timestamp, latest.timestamp) > halfBacktrackingWindow

  val qSettings = settings.querySettings

  // True when backtracking has progressed past the initial offset (or there was no initial
  // offset) and the previous query offset is older than now minus firstBacktrackingQueryWindow.
  def farBehindWallClock: Boolean = {
    val aheadOfInitial =
      initialOffset == TimestampOffset.Zero || state.latestBacktracking.timestamp.isAfter(initialOffset.timestamp)

    // fall back to latest when no previous offset has been recorded yet
    val previousTimestamp =
      if (state.previous != TimestampOffset.Zero) state.previous.timestamp
      else state.latest.timestamp

    aheadOfInitial && previousTimestamp.isBefore(clock.instant().minus(firstBacktrackingQueryWindow))
  }

  // The triggers are defs so that they are only evaluated when reached, in the order below.
  def idleTrigger = newIdleCount >= 5 // FIXME config?

  def itemCountTrigger =
    state.itemCountSinceBacktracking + state.itemCount >= qSettings.bufferSize * 3

  def lagTrigger =
    JDuration
      .between(state.latestBacktracking.timestamp, state.latest.timestamp)
      .compareTo(halfBacktrackingWindow) > 0

  val eligible =
    qSettings.backtrackingEnabled && !state.backtracking && state.latest != TimestampOffset.Zero

  eligible && !farBehindWallClock && (idleTrigger || itemCountTrigger || lagTrigger)
}

def nextQuery(state: QueryState): (QueryState, Option[Source[Envelope, NotUsed]]) = {
val newIdleCount = if (state.itemCount == 0) state.idleCount + 1 else 0
val newState =
if (settings.querySettings.backtrackingEnabled && !state.backtracking && state.latest != TimestampOffset.Zero &&
(newIdleCount >= 5 ||
state.itemCountSinceBacktracking + state.itemCount >= settings.querySettings.bufferSize * 3 ||
JDuration
.between(state.latestBacktracking.timestamp, state.latest.timestamp)
.compareTo(halfBacktrackingWindow) > 0)) {
// FIXME config for newIdleCount >= 5 and maybe something like `newIdleCount % 5 == 0`

// Note that when starting the query with offset = NoOffset it will switch to backtracking immediately after
// the first normal query because between(latestBacktracking.timestamp, latest.timestamp) > halfBacktrackingWindow
// start tracking query wall clock for heartbeats after initial backtracking query
val newQueryWallClock =
if (state.latestBacktracking != TimestampOffset.Zero) clock.instant()
else Instant.EPOCH

val newState =
if (switchToBacktracking(state, newIdleCount)) {
// switching to backtracking
val fromOffset =
if (state.latestBacktracking == TimestampOffset.Zero)
Expand All @@ -264,28 +322,34 @@ import org.slf4j.Logger
idleCount = newIdleCount,
backtrackingCount = 1,
latestBacktracking = fromOffset,
backtrackingExpectFiltered = state.latestBacktrackingSeenCount)
backtrackingExpectFiltered = state.latestBacktrackingSeenCount,
currentQueryWallClock = newQueryWallClock,
previousQueryWallClock = state.currentQueryWallClock)
} else if (switchFromBacktracking(state)) {
// switch from backtracking
// switching from backtracking
state.copy(
itemCount = 0,
itemCountSinceBacktracking = 0,
queryCount = state.queryCount + 1,
idleCount = newIdleCount,
backtrackingCount = 0)
backtrackingCount = 0,
currentQueryWallClock = newQueryWallClock,
previousQueryWallClock = state.currentQueryWallClock)
} else {
// continue
// continuing
val newBacktrackingCount = if (state.backtracking) state.backtrackingCount + 1 else 0
state.copy(
itemCount = 0,
itemCountSinceBacktracking = state.itemCountSinceBacktracking + state.itemCount,
queryCount = state.queryCount + 1,
idleCount = newIdleCount,
backtrackingCount = newBacktrackingCount,
backtrackingExpectFiltered = state.latestBacktrackingSeenCount)
backtrackingExpectFiltered = state.latestBacktrackingSeenCount,
currentQueryWallClock = newQueryWallClock,
previousQueryWallClock = state.currentQueryWallClock)
}

val fromTimestamp = newState.nextQueryFromTimestamp
val fromTimestamp = newState.nextQueryFromTimestamp(backtrackingWindow)
val toTimestamp = {
val behindCurrentTime =
if (newState.backtracking) settings.querySettings.backtrackingBehindCurrentTime
Expand Down Expand Up @@ -320,7 +384,11 @@ import org.slf4j.Logger
else s"Found [${state.itemCount}] items in previous query.")
}

newState ->
val newStateWithPrevious =
if (newState.backtracking) newState.copy(previousBacktracking = newState.latestBacktracking)
else newState.copy(previous = newState.latest)

newStateWithPrevious ->
Some(
dao
.itemsBySlice(entityType, slice, fromTimestamp, toTimestamp, backtracking = newState.backtracking)
Expand All @@ -330,12 +398,30 @@ import org.slf4j.Logger
.via(deserializeAndAddOffset(newState.currentOffset)))
}

/**
 * Creates a heartbeat envelope for an idle query.
 *
 * A heartbeat is only attempted when `idleCount` is at least 1 and `previousQueryWallClock`
 * has been set (is not `Instant.EPOCH`). Its timestamp is `startTimestamp` advanced by the
 * wall clock time elapsed between `startWallClock` and `previousQueryWallClock` minus
 * `backtrackingBehindCurrentTime`.
 *
 * @return the result of `createHeartbeat` for that timestamp, or `None` when not applicable
 */
def heartbeat(state: QueryState): Option[Envelope] = {
  val idle = state.idleCount >= 1
  if (!idle || state.previousQueryWallClock == Instant.EPOCH) None
  else {
    // use wall clock to measure duration since start, up to idle backtracking limit
    val upTo = state.previousQueryWallClock.minus(backtrackingBehindCurrentTime)
    val elapsed = JDuration.between(state.startWallClock, upTo)
    createHeartbeat(state.startTimestamp.plus(elapsed))
  }
}

// Heartbeats are produced only when `settings.journalPublishEvents` is true;
// otherwise the function always yields None.
val nextHeartbeat: QueryState => Option[Envelope] =
  if (!settings.journalPublishEvents) (_: QueryState) => None
  else (state: QueryState) => heartbeat(state)

// Captured once before the continuous query is created; stored in the initial QueryState
// as startTimestamp and startWallClock respectively.
val currentTimestamp = InstantFactory.now() // Can we use DDB (DynamoDB) as a timestamp source?
val currentWallClock = clock.instant()

ContinuousQuery[QueryState, Envelope](
initialState = QueryState.empty.copy(latest = initialOffset),
initialState = QueryState.empty
.copy(latest = initialOffset, startTimestamp = currentTimestamp, startWallClock = currentWallClock),
updateState = nextOffset,
delayNextQuery = delayNextQuery,
nextQuery = nextQuery,
beforeQuery = _ => None)
beforeQuery = _ => None,
heartbeat = nextHeartbeat)
}

private def deserializeAndAddOffset(timestampOffset: TimestampOffset): Flow[Item, Envelope, NotUsed] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,10 @@ private[dynamodb] object ContinuousQuery {
updateState: (S, T) => S,
delayNextQuery: S => Option[FiniteDuration],
nextQuery: S => (S, Option[Source[T, NotUsed]]),
beforeQuery: S => Option[Future[S]] = (_: S) => None): Source[T, NotUsed] =
Source.fromGraph(new ContinuousQuery[S, T](initialState, updateState, delayNextQuery, nextQuery, beforeQuery))
beforeQuery: S => Option[Future[S]] = (_: S) => None,
heartbeat: S => Option[T] = (_: S) => None): Source[T, NotUsed] =
Source.fromGraph(
new ContinuousQuery[S, T](initialState, updateState, delayNextQuery, nextQuery, beforeQuery, heartbeat))

private case object NextQuery

Expand Down Expand Up @@ -69,7 +71,8 @@ final private[dynamodb] class ContinuousQuery[S, T](
updateState: (S, T) => S,
delayNextQuery: S => Option[FiniteDuration],
nextQuery: S => (S, Option[Source[T, NotUsed]]),
beforeQuery: S => Option[Future[S]])
beforeQuery: S => Option[Future[S]],
heartbeat: S => Option[T])
extends GraphStage[SourceShape[T]] {
import ContinuousQuery._

Expand Down Expand Up @@ -151,8 +154,14 @@ final private[dynamodb] class ContinuousQuery[S, T](
next()
}
})

// When the heartbeat function yields an element for the new state, emit that element first
// and then the elements of the query source; otherwise use the query source as is.
val sourceWithHeartbeat = heartbeat(newState) match {
case None => source
case Some(h) => Source.single(h).concat(source)
}

val graph = Source
.fromGraph(source)
.fromGraph(sourceWithHeartbeat)
.to(sinkIn.sink)
interpreter.subFusingMaterializer.materialize(graph)
sinkIn.pull()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import akka.persistence.query.typed.EventEnvelope
// Values of `EventEnvelope.source`; the `from*` predicates in this object compare against them.
// NOTE(review): the predicates for "BT" and "PS" are not visible in this view — confirm.
val SourceBacktracking = "BT"
val SourcePubSub = "PS"
val SourceSnapshot = "SN"
val SourceHeartbeat = "HB"

/** True when the envelope's `source` equals `SourceQuery`. */
def fromQuery(env: EventEnvelope[_]): Boolean =
  SourceQuery == env.source
Expand All @@ -32,6 +33,15 @@ import akka.persistence.query.typed.EventEnvelope
/** True when the envelope's `source` equals `SourceSnapshot`. */
def fromSnapshot(env: EventEnvelope[_]): Boolean =
  SourceSnapshot == env.source

/** True when the envelope's `source` equals `SourceHeartbeat`. */
def fromHeartbeat(env: EventEnvelope[_]): Boolean =
  SourceHeartbeat == env.source

/**
 * Untyped variant of `fromHeartbeat`: true only when `env` is an `EventEnvelope` whose
 * `source` marks it as a heartbeat; any other value yields false.
 */
def isHeartbeatEvent(env: Any): Boolean =
  env match {
    case e: EventEnvelope[_] if fromHeartbeat(e) => true
    case _                                       => false
  }

def isFilteredEvent(env: Any): Boolean =
env match {
case e: EventEnvelope[_] => e.filtered
Expand Down
Loading