Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: DynamoDB evict offsets per slice #1247

Merged
merged 1 commit into from
Nov 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

package akka.projection.dynamodb

import java.time.{ Duration => JDuration }
import java.time.Instant

import akka.persistence.query.TimestampOffset
Expand Down Expand Up @@ -35,7 +36,6 @@ class DynamoDBOffsetStoreStateSpec extends AnyWordSpec with TestSuite with Match
state1.byPid("p1").seqNr shouldBe 3L
state1.offsetBySlice(slice("p1")) shouldBe TimestampOffset(t0.plusMillis(2), Map("p1" -> 3L))
state1.latestTimestamp shouldBe t0.plusMillis(2)
state1.oldestTimestamp shouldBe t0

val state2 = state1.add(Vector(createRecord("p2", 2, t0.plusMillis(1))))
state2.byPid("p1").seqNr shouldBe 3L
Expand All @@ -44,7 +44,6 @@ class DynamoDBOffsetStoreStateSpec extends AnyWordSpec with TestSuite with Match
state2.offsetBySlice(slice("p2")) shouldBe TimestampOffset(t0.plusMillis(1), Map("p2" -> 2L))
// latest not updated because timestamp of p2 was before latest
state2.latestTimestamp shouldBe t0.plusMillis(2)
state2.oldestTimestamp shouldBe t0

val state3 = state2.add(Vector(createRecord("p3", 10, t0.plusMillis(3))))
state3.byPid("p1").seqNr shouldBe 3L
Expand All @@ -54,7 +53,6 @@ class DynamoDBOffsetStoreStateSpec extends AnyWordSpec with TestSuite with Match
slice("p3") should not be slice("p2")
state3.offsetBySlice(slice("p3")) shouldBe TimestampOffset(t0.plusMillis(3), Map("p3" -> 10L))
state3.latestTimestamp shouldBe t0.plusMillis(3)
state3.oldestTimestamp shouldBe t0

// same slice and same timestamp, keep both in seen
slice("p10084") shouldBe slice("p3")
Expand All @@ -63,37 +61,45 @@ class DynamoDBOffsetStoreStateSpec extends AnyWordSpec with TestSuite with Match
}

"evict old" in {
// these pids have the same slice 645, otherwise it will keep one for each slice
val p1 = "p500"
val p2 = "p621"
val p3 = "p742"
val p4 = "p863"
val p5 = "p984"
val p1 = "p500" // slice 645
val p2 = "p621" // slice 645
val p3 = "p742" // slice 645
val p4 = "p863" // slice 645
val p5 = "p984" // slice 645
val p6 = "p92" // slice 905
val p7 = "p108" // slice 905

val t0 = TestClock.nowMillis().instant()
val state1 = State.empty
.add(
Vector(
createRecord(p1, 1, t0),
createRecord(p2, 2, t0.plusMillis(1)),
createRecord(p3, 3, t0.plusMillis(2)),
createRecord(p4, 4, t0.plusMillis(3)),
createRecord(p5, 5, t0.plusMillis(4))))
state1.oldestTimestamp shouldBe t0
createRecord(p1, 1, t0.plusMillis(1)),
createRecord(p2, 2, t0.plusMillis(2)),
createRecord(p3, 3, t0.plusMillis(3)),
createRecord(p4, 4, t0.plusMillis(4)),
createRecord(p6, 6, t0.plusMillis(6))))
state1.byPid
.map { case (pid, r) => pid -> r.seqNr } shouldBe Map(p1 -> 1L, p2 -> 2L, p3 -> 3L, p4 -> 4L, p5 -> 5L)

val state2 = state1.evict(t0.plusMillis(2), keepNumberOfEntries = 1)
state2.oldestTimestamp shouldBe t0.plusMillis(2)
state2.byPid.map { case (pid, r) => pid -> r.seqNr } shouldBe Map(p3 -> 3L, p4 -> 4L, p5 -> 5L)
.map { case (pid, r) => pid -> r.seqNr } shouldBe Map(p1 -> 1L, p2 -> 2L, p3 -> 3L, p4 -> 4L, p6 -> 6L)

// keep all
state1.evict(t0.plusMillis(2), keepNumberOfEntries = 100) shouldBe state1

// keep 4
val state3 = state1.evict(t0.plusMillis(2), keepNumberOfEntries = 4)
state3.oldestTimestamp shouldBe t0.plusMillis(1)
state3.byPid.map { case (pid, r) => pid -> r.seqNr } shouldBe Map(p2 -> 2L, p3 -> 3L, p4 -> 4L, p5 -> 5L)
state1.evict(slice = 645, timeWindow = JDuration.ofMillis(1000)) shouldBe state1

// evict older than time window
val state2 = state1.evict(slice = 645, timeWindow = JDuration.ofMillis(2))
state2.byPid.map { case (pid, r) => pid -> r.seqNr } shouldBe Map(p2 -> 2L, p3 -> 3L, p4 -> 4L, p6 -> 6L)

val state3 = state1.add(Vector(createRecord(p5, 5, t0.plusMillis(100)), createRecord(p7, 7, t0.plusMillis(10))))
val state4 = state3.evict(slice = 645, timeWindow = JDuration.ofMillis(2))
state4.byPid.map { case (pid, r) => pid -> r.seqNr } shouldBe Map(p5 -> 5L, p6 -> 6L, p7 -> 7L)

val state5 = state3.evict(slice = 905, timeWindow = JDuration.ofMillis(2))
state5.byPid.map { case (pid, r) => pid -> r.seqNr } shouldBe Map(
p1 -> 1L,
p2 -> 2L,
p3 -> 3L,
p4 -> 4L,
p5 -> 5L,
p7 -> 7L)
}

"find duplicate" in {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import akka.projection.dynamodb.internal.DynamoDBOffsetStore.Validation.Duplicat
import akka.projection.dynamodb.internal.DynamoDBOffsetStore.Validation.RejectedBacktrackingSeqNr
import akka.projection.dynamodb.internal.DynamoDBOffsetStore.Validation.RejectedSeqNr
import akka.projection.dynamodb.internal.OffsetPidSeqNr
import akka.projection.dynamodb.internal.OffsetStoreDao
import akka.projection.dynamodb.internal.OffsetStoreDao.OffsetStoreAttributes
import akka.projection.internal.ManagementState
import com.typesafe.config.Config
Expand All @@ -42,13 +43,7 @@ import org.scalatest.wordspec.AnyWordSpecLike
import org.slf4j.LoggerFactory

object DynamoDBTimestampOffsetStoreSpec {
val config: Config =
ConfigFactory
.parseString("""
# to be able to test eviction
akka.projection.dynamodb.offset-store.keep-number-of-entries = 0
""")
.withFallback(TestConfig.config)
val config: Config = TestConfig.config

def configWithOffsetTTL: Config =
ConfigFactory
Expand Down Expand Up @@ -808,7 +803,7 @@ abstract class DynamoDBTimestampOffsetStoreBaseSpec(config: Config)

"evict old records from same slice" in {
val projectionId = genRandomProjectionId()
val evictSettings = settings.withTimeWindow(JDuration.ofSeconds(100)).withEvictInterval(JDuration.ofSeconds(10))
val evictSettings = settings.withTimeWindow(JDuration.ofSeconds(100))
import evictSettings._
val offsetStore = createOffsetStore(projectionId, evictSettings)

Expand All @@ -827,34 +822,22 @@ abstract class DynamoDBTimestampOffsetStoreBaseSpec(config: Config)

offsetStore.saveOffset(OffsetPidSeqNr(TimestampOffset(startTime, Map(p1 -> 1L)), p1, 1L)).futureValue
offsetStore
.saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plus(JDuration.ofSeconds(1)), Map(p2 -> 1L)), p2, 1L))
.saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plusSeconds(1), Map(p2 -> 1L)), p2, 1L))
.futureValue
offsetStore
.saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plus(JDuration.ofSeconds(2)), Map(p3 -> 1L)), p3, 1L))
.saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plusSeconds(2), Map(p3 -> 1L)), p3, 1L))
.futureValue
offsetStore
.saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plus(evictInterval), Map(p4 -> 1L)), p4, 1L))
.saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plusSeconds(3), Map(p4 -> 1L)), p4, 1L))
.futureValue
offsetStore
.saveOffset(
OffsetPidSeqNr(
TimestampOffset(startTime.plus(evictInterval).plus(JDuration.ofSeconds(1)), Map(p4 -> 1L)),
p4,
1L))
.saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plusSeconds(4), Map(p4 -> 1L)), p4, 1L))
.futureValue
offsetStore
.saveOffset(
OffsetPidSeqNr(
TimestampOffset(startTime.plus(evictInterval).plus(JDuration.ofSeconds(2)), Map(p5 -> 1L)),
p5,
1L))
.saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plusSeconds(5), Map(p5 -> 1L)), p5, 1L))
.futureValue
offsetStore
.saveOffset(
OffsetPidSeqNr(
TimestampOffset(startTime.plus(evictInterval).plus(JDuration.ofSeconds(3)), Map(p6 -> 1L)),
p6,
3L))
.saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plusSeconds(4), Map(p6 -> 1L)), p6, 3L))
.futureValue
offsetStore.getState().size shouldBe 6

Expand All @@ -864,81 +847,59 @@ abstract class DynamoDBTimestampOffsetStoreBaseSpec(config: Config)
offsetStore.getState().size shouldBe 7 // nothing evicted yet

offsetStore
.saveOffset(
OffsetPidSeqNr(
TimestampOffset(startTime.plus(timeWindow.plus(evictInterval).minusSeconds(3)), Map(p8 -> 1L)),
p8,
1L))
.saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plus(timeWindow.minusSeconds(1)), Map(p8 -> 1L)), p8, 1L))
.futureValue
offsetStore.getState().size shouldBe 8 // still nothing evicted yet

offsetStore
.saveOffset(
OffsetPidSeqNr(
TimestampOffset(startTime.plus(timeWindow.plus(evictInterval).plusSeconds(1)), Map(p8 -> 2L)),
p8,
2L))
.saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plus(timeWindow.plusSeconds(4)), Map(p8 -> 2L)), p8, 2L))
.futureValue
offsetStore.getState().byPid.keySet shouldBe Set(p5, p6, p7, p8)

offsetStore
.saveOffset(
OffsetPidSeqNr(
TimestampOffset(startTime.plus(timeWindow.plus(evictInterval).plusSeconds(20)), Map(p8 -> 3L)),
p8,
3L))
.saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plus(timeWindow.plusSeconds(20)), Map(p8 -> 3L)), p8, 3L))
.futureValue
offsetStore.getState().byPid.keySet shouldBe Set(p7, p8)
}

"evict old records from different slices" in {
val projectionId = genRandomProjectionId()
val evictSettings = settings.withTimeWindow(JDuration.ofSeconds(100)).withEvictInterval(JDuration.ofSeconds(10))
val evictSettings = settings.withTimeWindow(JDuration.ofSeconds(100))
import evictSettings._
val offsetStore = createOffsetStore(projectionId, evictSettings)

val startTime = TestClock.nowMicros().instant()
log.debug("Start time [{}]", startTime)

val p1 = "p500" // slice 645
val p2 = "p92" // slice 905
val p3 = "p108" // slice 905
val p4 = "p863" // slice 645
val p5 = "p984" // slice 645
val p6 = "p3080" // slice 645
val p7 = "p4290" // slice 645
val p8 = "p20180" // slice 645
// these pids have the same slice 645
val p1 = "p500"
val p2 = "p621"
val p3 = "p742"
val p4 = "p863"
val p5 = "p984"
val p6 = "p3080"
val p7 = "p4290"
val p8 = "p20180"
val p9 = "p-0960" // slice 576

offsetStore.saveOffset(OffsetPidSeqNr(TimestampOffset(startTime, Map(p1 -> 1L)), p1, 1L)).futureValue
offsetStore
.saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plus(JDuration.ofSeconds(1)), Map(p2 -> 1L)), p2, 1L))
.saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plusSeconds(1), Map(p2 -> 1L)), p2, 1L))
.futureValue
offsetStore
.saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plus(JDuration.ofSeconds(2)), Map(p3 -> 1L)), p3, 1L))
.saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plusSeconds(2), Map(p3 -> 1L)), p3, 1L))
.futureValue
offsetStore
.saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plus(evictInterval), Map(p4 -> 1L)), p4, 1L))
.saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plusSeconds(3), Map(p4 -> 1L)), p4, 1L))
.futureValue
offsetStore
.saveOffset(
OffsetPidSeqNr(
TimestampOffset(startTime.plus(evictInterval).plus(JDuration.ofSeconds(1)), Map(p4 -> 1L)),
p4,
1L))
.saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plusSeconds(4), Map(p4 -> 1L)), p4, 1L))
.futureValue
offsetStore
.saveOffset(
OffsetPidSeqNr(
TimestampOffset(startTime.plus(evictInterval).plus(JDuration.ofSeconds(2)), Map(p5 -> 1L)),
p5,
1L))
.saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plusSeconds(5), Map(p5 -> 1L)), p5, 1L))
.futureValue
offsetStore
.saveOffset(
OffsetPidSeqNr(
TimestampOffset(startTime.plus(evictInterval).plus(JDuration.ofSeconds(3)), Map(p6 -> 1L)),
p6,
1L))
.saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plusSeconds(4), Map(p6 -> 1L)), p6, 3L))
.futureValue
offsetStore.getState().size shouldBe 6

Expand All @@ -948,31 +909,46 @@ abstract class DynamoDBTimestampOffsetStoreBaseSpec(config: Config)
offsetStore.getState().size shouldBe 7 // nothing evicted yet

offsetStore
.saveOffset(
OffsetPidSeqNr(
TimestampOffset(startTime.plus(timeWindow.plus(evictInterval).minusSeconds(3)), Map(p8 -> 1L)),
p8,
1L))
.saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plus(timeWindow.minusSeconds(1)), Map(p8 -> 1L)), p8, 1L))
.futureValue
offsetStore.getState().size shouldBe 8 // still nothing evicted yet

offsetStore
.saveOffset(
OffsetPidSeqNr(
TimestampOffset(startTime.plus(timeWindow.plus(evictInterval).plusSeconds(1)), Map(p8 -> 2L)),
p8,
2L))
.saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plus(timeWindow.plusSeconds(4)), Map(p8 -> 2L)), p8, 2L))
.futureValue
offsetStore.getState().byPid.keySet shouldBe Set(p5, p6, p7, p8)

offsetStore
.saveOffset(
OffsetPidSeqNr(
TimestampOffset(startTime.plus(timeWindow.plus(evictInterval).plusSeconds(20)), Map(p8 -> 3L)),
p8,
3L))
.saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plus(timeWindow.plusSeconds(20)), Map(p8 -> 3L)), p8, 3L))
.futureValue
offsetStore.getState().byPid.keySet shouldBe Set(p7, p8)

// save same slice, but behind
offsetStore
.saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plusMillis(1001), Map(p2 -> 2L)), p2, 2L))
.futureValue
// it's evicted immediately
offsetStore.getState().byPid.keySet shouldBe Set(p7, p8)
val dao = new OffsetStoreDao(system, settings, projectionId, client)
// but still saved
dao.loadSequenceNumber(slice(p2), p2).futureValue.get.seqNr shouldBe 2
// the timestamp was earlier than the one previously used for this slice, and therefore the stored timestamp is not changed
dao.loadTimestampOffset(slice(p2)).futureValue.get.timestamp shouldBe startTime.plus(timeWindow.plusSeconds(20))

// save another slice that hasn't been used before
offsetStore
.saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plusMillis(1002), Map(p9 -> 1L)), p9, 1L))
.futureValue
offsetStore.getState().byPid.keySet shouldBe Set(p9, p7, p8)
dao.loadSequenceNumber(slice(p9), p9).futureValue.get.seqNr shouldBe 1
dao.loadTimestampOffset(slice(p9)).futureValue.get.timestamp shouldBe startTime.plusMillis(1002)
// and one more of that same slice
offsetStore
.saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plusMillis(1003), Map(p9 -> 2L)), p9, 2L))
.futureValue
offsetStore.getState().byPid.keySet shouldBe Set(p9, p7, p8)
dao.loadSequenceNumber(slice(p9), p9).futureValue.get.seqNr shouldBe 2
dao.loadTimestampOffset(slice(p9)).futureValue.get.timestamp shouldBe startTime.plusMillis(1003)
}

"start from slice offset" in {
Expand Down
8 changes: 0 additions & 8 deletions akka-projection-dynamodb/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,6 @@ akka.projection.dynamodb {
# within this time window from latest offset.
time-window = 5 minutes

# Keep this number of entries. Don't evict old entries until this threshold
# has been reached.
keep-number-of-entries = 10000

# Remove old entries outside the time-window from the offset store memory
# with this frequency.
evict-interval = 10 seconds

# Trying to batch insert offsets in batches of this size.
offset-batch-size = 20

Expand Down
Loading
Loading