diff --git a/aggregator/src/main/scala/ai/chronon/aggregator/windowing/Resolution.scala b/aggregator/src/main/scala/ai/chronon/aggregator/windowing/Resolution.scala index 5db1fd5a9..a12d81d02 100644 --- a/aggregator/src/main/scala/ai/chronon/aggregator/windowing/Resolution.scala +++ b/aggregator/src/main/scala/ai/chronon/aggregator/windowing/Resolution.scala @@ -35,13 +35,22 @@ trait Resolution extends Serializable { } object FiveMinuteResolution extends Resolution { - def calculateTailHop(window: Window): Long = - window.millis match { - case x if x >= new Window(12, TimeUnit.DAYS).millis => WindowUtils.Day.millis - case x if x >= new Window(12, TimeUnit.HOURS).millis => WindowUtils.Hour.millis - case _ => WindowUtils.FiveMinutes + private val millisIn12Days = new Window(12, TimeUnit.DAYS).millis + private val millisIn12Hours = new Window(12, TimeUnit.HOURS).millis + + /** + * Given a GroupBy Window size, calculate the resolution of the window as defined by Chronon. + */ + def getWindowResolutionMillis(windowSizeMillis: Long): Long = + windowSizeMillis match { + case x if x >= millisIn12Days => WindowUtils.Day.millis + case x if x >= millisIn12Hours => WindowUtils.Hour.millis + case _ => WindowUtils.FiveMinutes } + def calculateTailHop(window: Window): Long = + getWindowResolutionMillis(window.millis) + val hopSizes: Array[Long] = Array(WindowUtils.Day.millis, WindowUtils.Hour.millis, WindowUtils.FiveMinutes) } diff --git a/aggregator/src/main/scala/ai/chronon/aggregator/windowing/SawtoothMutationAggregator.scala b/aggregator/src/main/scala/ai/chronon/aggregator/windowing/SawtoothMutationAggregator.scala index db313590d..2adf0e34c 100644 --- a/aggregator/src/main/scala/ai/chronon/aggregator/windowing/SawtoothMutationAggregator.scala +++ b/aggregator/src/main/scala/ai/chronon/aggregator/windowing/SawtoothMutationAggregator.scala @@ -141,6 +141,84 @@ class SawtoothMutationAggregator(aggregations: Seq[Aggregation], } } + def updateIrTiledWithTileLayering(ir: Array[Any], + headStreamingTiledIrs: Seq[TiledIr], + queryTs: Long, + batchEndTs: Long) = { + // This works as a sliding window over the sorted list of tiles, where the sort is done by + // timestamp in ascending fashion (oldest -> newest tiles), and then secondarily done by + // tile size. So for example this becomes: + // [ [12:00, 1:00), [12:00, 12:05), [12:05, 12:10), ... , [12:55, 1:00), [1:00, 2:00), [1:00, 1:05), ... ] + // And when a tile is selected, we increment our index pointer to the first tile where the start is greater + // than or equal to the end of the tile we just selected. So if we select the [12:00, 1:00) tile, then the pointer + // will move to the [1:00, 2:00) tile. + + // fill 2D array where each entry corresponds to the windowedAggregator column aggregators, and contains an array of + // the streaming tiles we need to aggregate + // ex) mergeBuffers = [ [1,2,3], [2,4] ] + // windowedAggregators.columnAggregators = [ Sum, Average ] + // resultIr = [ 6, 3 ] + + val sortedStreamingTiles = headStreamingTiledIrs.sortBy { tile => (tile.ts, -tile.size) } + val mergeBuffers = Array.fill(windowedAggregator.length)(mutable.ArrayBuffer.empty[Any]) + + var i: Int = 0 + while (i < windowedAggregator.length) { + val window = windowMappings(i).aggregationPart.window + // tailHopIndices is the list of indices for the hopSizes array for which hopSize a given aggregator is using. + // I.e., At tailHopIndices(i) we will see a number that is the index in hopSizes for the aggregator's hop size. + // For example, if we are using a Five Minute Resolution window then for this instance of SawtoothAggregator, + // the hopSizes(tailHopIndices(i)), where i = 0 is one day, i = 1 is one hour, and i = 2 is five minutes. + // In practice, when we have a window with 5 minute tile sizes, then this is always 5 minutes. + // If we have a window with 1 hour tile sizes, then this is always 1 hour. And if we have a window with 1 day + // tile sizes, then this is always 1 day. + // However, we still need to use this hop size instead of the tile size because we want to correctly exclude 1 hour tiles + // that we may have in addition to 5 minute tiles. + // For example, if we have a window with 5 minute resolution that starts at 12:06pm, then when considering + // the 5 minute tiles, we round down to 12:05pm and will correctly include the 12:05pm 5 minute tile. And when + // considering the 1 hour tiles, we round down to 12:05pm and will correctly exclude the 12:00pm 1 hour tile if it exists. + val hopIndex = tailHopIndices(i) + + var tilePtr: Int = 0 + while (tilePtr < sortedStreamingTiles.length) { + val tile = sortedStreamingTiles(tilePtr) + val tileTs = tile.ts + + if (tileTs >= batchEndTs) { + // If the tile's timestamp is after the start of the window rounded to the nearest tile border (e.g., five minutes, one hour, one day), + // and if the tile's timestamp is before the query timestamp, we should include this tile in this aggregator. + // Or if the aggregator has no window (infinite), we should include this tile. + val irInWindow = (tileTs >= TsUtils.round(queryTs - window.millis, hopSizes(hopIndex)) && tileTs < queryTs) + if (window == null || irInWindow) { + mergeBuffers(i) += tile.ir(i) + + // Adjust tilePtr to next window. E.g., if we just selected a tile covering [12:00, 1:00), the next tile we + // select will start at 1:00 and we will not consider any other tiles between [12:00, 1:00). + while (tilePtr < sortedStreamingTiles.length && sortedStreamingTiles(tilePtr).ts < (tileTs + tile.size)) { + tilePtr += 1 + } + } else { + // If the current tile was not in the window for the aggregator, just continue to the next tile. + tilePtr += 1 + } + } else { + // If the current tile came before batchEndTs, just continue to the next tile. + tilePtr += 1 + } + } + + i += 1 + } + + var idx: Int = 0 + while (idx < mergeBuffers.length) { + // include collapsed batchIr in bulkMerge computation + mergeBuffers(idx) += ir(idx) + ir(idx) = windowedAggregator(idx).bulkMerge(mergeBuffers(idx).iterator) + idx += 1 + } + } + /** * Update the intermediate results with tail hops data from a FinalBatchIr. */ diff --git a/aggregator/src/main/scala/ai/chronon/aggregator/windowing/SawtoothOnlineAggregator.scala b/aggregator/src/main/scala/ai/chronon/aggregator/windowing/SawtoothOnlineAggregator.scala index 3dfa15c3d..dc34a53fc 100644 --- a/aggregator/src/main/scala/ai/chronon/aggregator/windowing/SawtoothOnlineAggregator.scala +++ b/aggregator/src/main/scala/ai/chronon/aggregator/windowing/SawtoothOnlineAggregator.scala @@ -22,7 +22,11 @@ import ai.chronon.api.Extensions.{AggregationPartOps, WindowOps} import ai.chronon.api._ // Wrapper class for handling Irs in the tiled chronon use case -case class TiledIr(ts: Long, ir: Array[Any]) +// ts: timestamp in millis +// ir: IR of the streaming data for the tile +// size: size in millis of the time range for the IR +// Based on the above, tile start = ts, tile end = ts + size +case class TiledIr(ts: Long, ir: Array[Any], size: Long) // batchEndTs = upload time of the batch data as derived from GroupByServingInfo & Cached // cache = Jul-22 / latest = Jul-23, streaming data = 22 - now (filter < jul 23) @@ -126,12 +130,13 @@ class SawtoothOnlineAggregator(val batchEndTs: Long, } def lambdaAggregateIrTiled(finalBatchIr: FinalBatchIr, - streamingTiledIrs: Iterator[TiledIr], - queryTs: Long): Array[Any] = { + streamingTiledIrs: Seq[TiledIr], + queryTs: Long, + useTileLayering: Boolean): Array[Any] = { // null handling if (finalBatchIr == null && streamingTiledIrs == null) return null val batchIr = Option(finalBatchIr).getOrElse(normalizeBatchIr(init)) - val tiledIrs = Option(streamingTiledIrs).getOrElse(Array.empty[TiledIr].iterator) + val headStreamingTiledIrs = Option(streamingTiledIrs).getOrElse(Seq.empty[TiledIr]) if (batchEndTs > queryTs) { throw new IllegalArgumentException(s"Request time of $queryTs is less than batch time $batchEndTs") @@ -140,14 +145,19 @@ class SawtoothOnlineAggregator(val batchEndTs: Long, // initialize with collapsed val resultIr = windowedAggregator.clone(batchIr.collapsed) - // add streaming tiled irs - while (tiledIrs.hasNext) { - val tiledIr = tiledIrs.next() - val tiledIrTs = tiledIr.ts // unbox long only once - if (queryTs > tiledIrTs && tiledIrTs >= batchEndTs) { - updateIrTiled(resultIr, tiledIr, queryTs) + // add head events + if (useTileLayering) { + updateIrTiledWithTileLayering(resultIr, headStreamingTiledIrs, queryTs, batchEndTs) + } else { + // add streaming tiled irs + headStreamingTiledIrs.foreach { tiledIr => + val tiledIrTs = tiledIr.ts // unbox long only once + if (queryTs > tiledIrTs && tiledIrTs >= batchEndTs) { + updateIrTiled(resultIr, tiledIr, queryTs) + } } } + mergeTailHops(resultIr, queryTs, batchEndTs, batchIr) resultIr } @@ -161,10 +171,11 @@ class SawtoothOnlineAggregator(val batchEndTs: Long, } def lambdaAggregateFinalizedTiled(finalBatchIr: FinalBatchIr, - streamingTiledIrs: Iterator[TiledIr], - ts: Long): Array[Any] = { + streamingTiledIrs: Seq[TiledIr], + ts: Long, + useTileLayering: Boolean): Array[Any] = { // TODO: Add support for mutations / hasReversal to the tiled implementation - windowedAggregator.finalize(lambdaAggregateIrTiled(finalBatchIr, streamingTiledIrs, ts)) + windowedAggregator.finalize(lambdaAggregateIrTiled(finalBatchIr, streamingTiledIrs, ts, useTileLayering)) } } diff --git a/aggregator/src/test/scala/ai/chronon/aggregator/test/SawtoothMutationAggregatorTest.scala b/aggregator/src/test/scala/ai/chronon/aggregator/test/SawtoothMutationAggregatorTest.scala new file mode 100644 index 000000000..713f3f489 --- /dev/null +++ b/aggregator/src/test/scala/ai/chronon/aggregator/test/SawtoothMutationAggregatorTest.scala @@ -0,0 +1,671 @@ +package ai.chronon.aggregator.test + +import ai.chronon.aggregator.row.RowAggregator +import ai.chronon.aggregator.windowing.{SawtoothMutationAggregator, TiledIr} +import ai.chronon.api.Extensions.AggregationOps +import ai.chronon.api.{Aggregation, Builders, DataType, IntType, LongType, Operation, StringType, TimeUnit, Window} +import junit.framework.TestCase +import org.junit.Assert.{assertEquals, assertNull} + +class SawtoothMutationAggregatorTest extends TestCase { + + val FiveMinuteTileSize = 5 * 60 * 1000L + val OneHourTileSize = 60 * 60 * 1000L + val OneDayTileSize = 24 * 60 * 60 * 1000L + + def constructTileIr(aggregations: Seq[Aggregation], + inputSchema: Seq[(String, DataType)], + events: Seq[TestRow]): Array[Any] = { + val aggregator = new RowAggregator(inputSchema, aggregations.flatMap(_.unpack)) + val aggIr = aggregator.init + + events.map { e => + aggregator.update(aggIr, e) + } + + aggIr + } + + def testUpdateIrTiled(): Unit = { + val aggregations = Seq( + Builders.Aggregation( + operation = Operation.SUM, + inputColumn = "rating", + windows = Seq(new Window(6, TimeUnit.HOURS)) + ), + Builders.Aggregation( + operation = Operation.AVERAGE, + inputColumn = "rating", + windows = Seq(new Window(1, TimeUnit.DAYS)) + ) + ) + val inputSchema: Seq[(String, DataType)] = Seq( + ("ts_millis", LongType), + ("listing_id", StringType), + ("rating", IntType) + ) + + val queryTs = 1707167971000L // Monday, February 5, 2024 9:19:31 PM + val batchEndTs = 1707091200000L // Monday, February 5, 2024 12:00:00 AM + + // The aggregator should include tile1 and tile4 and skip tiles 2 and 3 since + // they fall into the same hour block as tile 1. Tile0 should be ignored as it + // comes before batchEndTs. + + val tile0 = TiledIr( + 1707087600000L, // [11:00 PM , 12:00 AM) + constructTileIr( + aggregations, + inputSchema, + Seq( + TestRow(1707087600000L, "listing_1", 10) + ) + ), + OneHourTileSize + ) + + val tile1 = TiledIr( + 1707091200000L, // [12:00 AM , 1:00 AM) + constructTileIr( + aggregations, + inputSchema, + Seq( + TestRow(1707091210000L, "listing_1", 4), + TestRow(1707091220000L, "listing_1", 5), + TestRow(1707091510000L, "listing_1", 6) + ) + ), + OneHourTileSize + ) + + val tile2 = TiledIr( + 1707091200000L, // [12:00 AM, 12:05 AM) + constructTileIr( + aggregations, + inputSchema, + Seq( + TestRow(1707091210000L, "listing_1", 4), + TestRow(1707091220000L, "listing_1", 5) + ) + ), + FiveMinuteTileSize + ) + + val tile3 = TiledIr( + 1707091500000L, // [12:05 AM, 12:10 AM) + constructTileIr( + aggregations, + inputSchema, + Seq( + TestRow(1707091510000L, "listing_1", 6) + ) + ), + FiveMinuteTileSize + ) + + val tile4 = TiledIr( + 1707094800000L, // [1:00 AM, 1:05 AM) + constructTileIr( + aggregations, + inputSchema, + Seq( + TestRow(1707094800000L, "listing_1", 10) + ) + ), + FiveMinuteTileSize + ) + + val streamingTiledIrs: Seq[TiledIr] = Seq( + tile0, + tile1, + tile2, + tile3, + tile4 + ) + + val sawtoothMutationAggregator = new SawtoothMutationAggregator(aggregations, inputSchema) + val ir = sawtoothMutationAggregator.init + + sawtoothMutationAggregator.updateIrTiledWithTileLayering( + ir.collapsed, + streamingTiledIrs, + queryTs, + batchEndTs + ) + + // assert ir now contains the right values + assertEquals(2, ir.collapsed.length) + + // 6 hour window is null since there were no events within the time frame + assertNull(ir.collapsed(0)) + // 1 day window has items from tile1 and tile4 + assertEquals(25.0, ir.collapsed(1).asInstanceOf[Array[Any]](0)) + assertEquals(4, ir.collapsed(1).asInstanceOf[Array[Any]](1)) + } + + def testUpdateIrTiledMidHourWindowStart(): Unit = { + val aggregations = Seq( + Builders.Aggregation( + operation = Operation.AVERAGE, + inputColumn = "rating", + windows = Seq(new Window(6, TimeUnit.HOURS)) + ) + ) + val inputSchema: Seq[(String, DataType)] = Seq( + ("ts_millis", LongType), + ("listing_id", StringType), + ("rating", IntType) + ) + + val queryTs = 1707167971000L // Monday, February 5, 2024 9:19:31 PM + val batchEndTs = 1707091200000L // Monday, February 5, 2024 12:00:00 AM + + // Window start will be 3:19 PM. + // The aggregator should include tile3 and tile4 and skip tiles 1 and 2 since + // they are timestamped before the start of the window rounded down to the hop size for the aggregator. + + val tile1 = TiledIr( + 1707145200000L, // [3:00 PM , 4:00 PM) + constructTileIr( + aggregations, + inputSchema, + Seq( + TestRow(1707145210000L, "listing_1", 4), + TestRow(1707145220000L, "listing_1", 5), + TestRow(1707146100000L, "listing_1", 6), + TestRow(1707146400000L, "listing_1", 10) + ) + ), + OneHourTileSize + ) + + val tile2 = TiledIr( + 1707145200000L, // [3:00 PM, 3:05 PM) + constructTileIr( + aggregations, + inputSchema, + Seq( + TestRow(1707145210000L, "listing_1", 4), + TestRow(1707145220000L, "listing_1", 5) + ) + ), + FiveMinuteTileSize + ) + + val tile3 = TiledIr( + 1707146100000L, // [3:15 PM, 3:20 PM) + constructTileIr( + aggregations, + inputSchema, + Seq( + TestRow(1707146100000L, "listing_1", 6) + ) + ), + FiveMinuteTileSize + ) + + val tile4 = TiledIr( + 1707146400000L, // [3:20 PM, 3:25 PM) + constructTileIr( + aggregations, + inputSchema, + Seq( + TestRow(1707146400000L, "listing_1", 10) + ) + ), + FiveMinuteTileSize + ) + + val streamingTiledIrs: Seq[TiledIr] = Seq( + tile1, + tile2, + tile3, + tile4 + ) + + val sawtoothMutationAggregator = new SawtoothMutationAggregator(aggregations, inputSchema) + val ir = sawtoothMutationAggregator.init + + sawtoothMutationAggregator.updateIrTiledWithTileLayering( + ir.collapsed, + streamingTiledIrs, + queryTs, + batchEndTs + ) + + assertEquals(1, ir.collapsed.length) + + assertEquals(16.0, ir.collapsed(0).asInstanceOf[Array[Any]](0)) + assertEquals(2, ir.collapsed(0).asInstanceOf[Array[Any]](1)) + } + + def testUpdateIrTiledHandlesUnorderedTiles(): Unit = { + val aggregations = Seq( + Builders.Aggregation( + operation = Operation.SUM, + inputColumn = "rating", + windows = Seq(new Window(6, TimeUnit.HOURS)) + ), + Builders.Aggregation( + operation = Operation.AVERAGE, + inputColumn = "rating", + windows = Seq(new Window(1, TimeUnit.DAYS)) + ) + ) + val inputSchema: Seq[(String, DataType)] = Seq( + ("ts_millis", LongType), + ("listing_id", StringType), + ("rating", IntType) + ) + + val queryTs = 1707167971000L // Monday, February 5, 2024 9:19:31 PM + val batchEndTs = 1707091200000L // Monday, February 5, 2024 12:00:00 AM + + // The aggregator should include tile1 and tile4 and skip tiles 2 and 3 since + // they fall into the same hour block as tile 1. + + val tile1 = TiledIr( + 1707091200000L, // [12:00 AM , 1:00 AM) + constructTileIr( + aggregations, + inputSchema, + Seq( + TestRow(1707091210000L, "listing_1", 4), + TestRow(1707091220000L, "listing_1", 5), + TestRow(1707091510000L, "listing_1", 6) + ) + ), + OneHourTileSize + ) + + val tile2 = TiledIr( + 1707091200000L, // [12:00 AM, 12:05 AM) + constructTileIr( + aggregations, + inputSchema, + Seq( + TestRow(1707091210000L, "listing_1", 4), + TestRow(1707091220000L, "listing_1", 5) + ) + ), + FiveMinuteTileSize + ) + + val tile3 = TiledIr( + 1707091500000L, // [12:05 AM, 12:10 AM) + constructTileIr( + aggregations, + inputSchema, + Seq( + TestRow(1707091510000L, "listing_1", 6) + ) + ), + FiveMinuteTileSize + ) + + val tile4 = TiledIr( + 1707094800000L, // [1:00 AM, 1:05 AM) + constructTileIr( + aggregations, + inputSchema, + Seq( + TestRow(1707094800000L, "listing_1", 10) + ) + ), + FiveMinuteTileSize + ) + + // The tiles are out of order, with two five minute tiles before the one hour tile. + // Tiles 1 and 2 have the same start time but tile 1 is larger so it should be taken first + // and tile 2 should be ignored. + val streamingTiledIrs: Seq[TiledIr] = Seq( + tile3, + tile2, + tile1, + tile4 + ) + + val sawtoothMutationAggregator = new SawtoothMutationAggregator(aggregations, inputSchema) + val ir = sawtoothMutationAggregator.init + + sawtoothMutationAggregator.updateIrTiledWithTileLayering( + ir.collapsed, + streamingTiledIrs, + queryTs, + batchEndTs + ) + + // assert ir now contains the right values + assertEquals(2, ir.collapsed.length) + + // 6 hour window is null since there were no events within the time frame + assertNull(ir.collapsed(0)) + // 1 day window has items from tile1 and tile4 + assertEquals(25.0, ir.collapsed(1).asInstanceOf[Array[Any]](0)) + assertEquals(4, ir.collapsed(1).asInstanceOf[Array[Any]](1)) + } + + def testUpdateIrTiledMultipleShortWindows(): Unit = { + val aggregations = Seq( + Builders.Aggregation( + operation = Operation.AVERAGE, + inputColumn = "rating", + windows = Seq(new Window(6, TimeUnit.HOURS), new Window(12, TimeUnit.HOURS)) + ) + ) + val inputSchema: Seq[(String, DataType)] = Seq( + ("ts_millis", LongType), + ("listing_id", StringType), + ("rating", IntType) + ) + + val queryTs = 1707167971000L // Monday, February 5, 2024 9:19:31 PM + val batchEndTs = 1707091200000L // Monday, February 5, 2024 12:00:00 AM + + // Window start will be 3:19 PM for the 6 hour window and 9:19 AM for the 12 hour window. + // The aggregator should include tiles 1, 2, and 3 for the 12 hour window and tiles + // 5, 6 for the 6 hour window. + + val tile1 = TiledIr( + 1707124500000L, // [9:15 AM, 9:20 AM) + constructTileIr( + aggregations, + inputSchema, + Seq( + TestRow(1707124500000L, "listing_1", 1), + TestRow(1707124500000L, "listing_1", 2) + ) + ), + FiveMinuteTileSize + ) + + val tile2 = TiledIr( + 1707124800000L, // [9:20 AM, 9:25 AM) + constructTileIr( + aggregations, + inputSchema, + Seq( + TestRow(1707124800000L, "listing_1", 3) + ) + ), + FiveMinuteTileSize + ) + + val tile3 = TiledIr( + 1707145200000L, // [3:00 PM , 4:00 PM) + constructTileIr( + aggregations, + inputSchema, + Seq( + TestRow(1707145210000L, "listing_1", 4), + TestRow(1707145220000L, "listing_1", 5), + TestRow(1707146100000L, "listing_1", 6), + TestRow(1707146400000L, "listing_1", 10) + ) + ), + OneHourTileSize + ) + + val tile4 = TiledIr( + 1707145200000L, // [3:00 PM, 3:05 PM) + constructTileIr( + aggregations, + inputSchema, + Seq( + TestRow(1707145210000L, "listing_1", 4), + TestRow(1707145220000L, "listing_1", 5) + ) + ), + FiveMinuteTileSize + ) + + val tile5 = TiledIr( + 1707146100000L, // [3:15 PM, 3:20 PM) + constructTileIr( + aggregations, + inputSchema, + Seq( + TestRow(1707146100000L, "listing_1", 6) + ) + ), + FiveMinuteTileSize + ) + + val tile6 = TiledIr( + 1707146400000L, // [3:20 PM, 3:25 PM) + constructTileIr( + aggregations, + inputSchema, + Seq( + TestRow(1707146400000L, "listing_1", 10) + ) + ), + FiveMinuteTileSize + ) + + val streamingTiledIrs: Seq[TiledIr] = Seq( + tile1, + tile2, + tile3, + tile4, + tile5, + tile6 + ) + + val sawtoothMutationAggregator = new SawtoothMutationAggregator(aggregations, inputSchema) + val ir = sawtoothMutationAggregator.init + + sawtoothMutationAggregator.updateIrTiledWithTileLayering( + ir.collapsed, + streamingTiledIrs, + queryTs, + batchEndTs + ) + + assertEquals(2, ir.collapsed.length) + + assertEquals(16.0, ir.collapsed(0).asInstanceOf[Array[Any]](0)) + assertEquals(2, ir.collapsed(0).asInstanceOf[Array[Any]](1)) + + assertEquals(31.0, ir.collapsed(1).asInstanceOf[Array[Any]](0)) + assertEquals(7, ir.collapsed(1).asInstanceOf[Array[Any]](1)) + } + + def testUpdateIrTiledArbitraryTileSize(): Unit = { + val aggregations = Seq( + Builders.Aggregation( + operation = Operation.AVERAGE, + inputColumn = "rating", + windows = Seq(new Window(3, TimeUnit.HOURS)) + ) + ) + val inputSchema: Seq[(String, DataType)] = Seq( + ("ts_millis", LongType), + ("listing_id", StringType), + ("rating", IntType) + ) + + val queryTs = 1707103140000L // Monday, February 5, 2024 3:19:00 AM + val batchEndTs = 1707091200000L // Monday, February 5, 2024 12:00:00 AM + + // Window has a 12:19 start. + // The aggregator should include tile3 and ignore tiles 1 and 2. + + val tile1 = TiledIr( + 1707091200000L, // [12:00 AM, 12:08 AM) + constructTileIr( + aggregations, + inputSchema, + Seq( + TestRow(1707091210000L, "listing_1", 4), + TestRow(1707091220000L, "listing_1", 5) + ) + ), + 8 * 60 * 1000L // 8 minutes + ) + + val tile2 = TiledIr( + 1707091680000L, // [12:08 AM, 12:16 AM) + constructTileIr( + aggregations, + inputSchema, + Seq( + TestRow(1707092160000L, "listing_1", 6) + ) + ), + 8 * 60 * 1000L // 8 minutes + ) + + val tile3 = TiledIr( + 1707092160000L, // [12:16 AM, 12:24 AM) + constructTileIr( + aggregations, + inputSchema, + Seq( + TestRow(1707092160000L, "listing_1", 10) + ) + ), + 8 * 60 * 1000L // 8 minutes + ) + + val streamingTiledIrs: Seq[TiledIr] = Seq( + tile1, + tile2, + tile3 + ) + + val sawtoothMutationAggregator = new SawtoothMutationAggregator(aggregations, inputSchema) + val ir = sawtoothMutationAggregator.init + + sawtoothMutationAggregator.updateIrTiledWithTileLayering( + ir.collapsed, + streamingTiledIrs, + queryTs, + batchEndTs + ) + + assertEquals(1, ir.collapsed.length) + + assertEquals(10.0, ir.collapsed(0).asInstanceOf[Array[Any]](0)) + assertEquals(1, ir.collapsed(0).asInstanceOf[Array[Any]](1)) + } + + def testUpdateIrTiledDayTiles(): Unit = { + val aggregations = Seq( + Builders.Aggregation( + operation = Operation.AVERAGE, + inputColumn = "rating", + windows = Seq(new Window(12, TimeUnit.DAYS)) + ) + ) + val inputSchema: Seq[(String, DataType)] = Seq( + ("ts_millis", LongType), + ("listing_id", StringType), + ("rating", IntType) + ) + + val queryTs = 1707167971000L // Monday, February 5, 2024 9:19:31 PM + val batchEndTs = 1707004800000L // Monday, February 4, 2024 12:00:00 AM + + // The aggregator should include tile0 and tile1 and exclude the rest. + val tile0 = TiledIr( + 1707004800000L, // [12:00 AM , 12:00 AM) + constructTileIr( + aggregations, + inputSchema, + Seq( + TestRow(1707004800000L, "listing_1", 1) + ) + ), + OneDayTileSize + ) + + val tile1 = TiledIr( + 1707087600000L, // [11:00 PM , 12:00 AM) + constructTileIr( + aggregations, + inputSchema, + Seq( + TestRow(1707087600000L, "listing_1", 10) + ) + ), + OneHourTileSize + ) + + val tile2 = TiledIr( + 1707091200000L, // [12:00 AM , 12:00 AM) + constructTileIr( + aggregations, + inputSchema, + Seq( + TestRow(1707091210000L, "listing_1", 4), + TestRow(1707091220000L, "listing_1", 5), + TestRow(1707091510000L, "listing_1", 6), + TestRow(1707094800000L, "listing_1", 10) + ) + ), + OneDayTileSize + ) + + val tile3 = TiledIr( + 1707091200000L, // [12:00 AM, 12:05 AM) + constructTileIr( + aggregations, + inputSchema, + Seq( + TestRow(1707091210000L, "listing_1", 4), + TestRow(1707091220000L, "listing_1", 5) + ) + ), + FiveMinuteTileSize + ) + + val tile4 = TiledIr( + 1707091500000L, // [12:05 AM, 12:10 AM) + constructTileIr( + aggregations, + inputSchema, + Seq( + TestRow(1707091510000L, "listing_1", 6) + ) + ), + FiveMinuteTileSize + ) + + val tile5 = TiledIr( + 1707094800000L, // [1:00 AM, 1:05 AM) + constructTileIr( + aggregations, + inputSchema, + Seq( + TestRow(1707094800000L, "listing_1", 10) + ) + ), + FiveMinuteTileSize + ) + + val streamingTiledIrs: Seq[TiledIr] = Seq( + tile0, + tile1, + tile2, + tile3, + tile4, + tile5 + ) + + val sawtoothMutationAggregator = new SawtoothMutationAggregator(aggregations, inputSchema) + val ir = sawtoothMutationAggregator.init + + sawtoothMutationAggregator.updateIrTiledWithTileLayering( + ir.collapsed, + streamingTiledIrs, + queryTs, + batchEndTs + ) + + assertEquals(1, ir.collapsed.length) + + assertEquals(26.0, ir.collapsed(0).asInstanceOf[Array[Any]](0)) + assertEquals(5, ir.collapsed(0).asInstanceOf[Array[Any]](1)) + } +} \ No newline at end of file diff --git a/docs/source/Tiled_Architecture.md b/docs/source/Tiled_Architecture.md index fc0ce9650..5614f1724 100644 --- a/docs/source/Tiled_Architecture.md +++ b/docs/source/Tiled_Architecture.md @@ -62,6 +62,77 @@ Organizations operating at scale with many hot-key entities (such as big merchan ## How to enable tiling To enable tiling: - 1. Implement Flink on the write path (refer to the [Chronon on Flink documentation](setup/Flink.md)). As part of this process, you may also need to modify your KV store implementation to support writing and fetching tiles. -2. Once your Flink app is writing tiles to the datastore, enable tiled reads in the Fetcher by adding `enable_tiling=true` to the [customJson](https://github.com/airbnb/chronon/blob/48b789dd2c216c62bbf1d74fbf4e779f23db541f/api/py/ai/chronon/group_by.py#L561) of any GroupBy definition. \ No newline at end of file +2. Once your Flink app is writing tiles to the datastore, enable tiled reads in the Fetcher by adding `enable_tiling=true` to the [customJson](https://github.com/airbnb/chronon/blob/48b789dd2c216c62bbf1d74fbf4e779f23db541f/api/py/ai/chronon/group_by.py#L561) of any GroupBy definition. + + +## Advanced Options + +### Tile Layering + +Tile layering is an optimization that introduces 1-hour tiles alongside 5-minute tiles for GroupBys with windows shorter than 12 hours. This reduces the number of tiles fetched from the key-value store (by ~85% in practice), resulting in a substantial decrease in feature serving latency. Tile layering is enabled by default when using the tiled architecture. + +#### How It Works + +**Tile Size Selection Before Tile Layering** + +Without tile layering, tile sizes in Flink are determined by the three default resolutions used in Chronon: + +* 5-minute tiles: For GroupBys with any window shorter than 12 hours +* 1-hour tiles: For GroupBys with the shortest window between 12 hours and 12 days +* 1-day tiles: For GroupBys with the shortest window of 12 days or longer + +For instance, a GroupBy with windows of 1 hour, 12 hours, 7 days, and 30 days would use 5-minute tiles due to the 1-hour window. + +(Note: These resolutions are uses all across Chronon for sawtooth window calculation. For more details, see [Windowing](https://chronon.ai/authoring_features/GroupBy.html#windowing) and [Resolution.scala](https://github.com/airbnb/chronon/blob/main/aggregator/src/main/scala/ai/chronon/aggregator/windowing/Resolution.scala#L40-L42)) + +**Tile Size Selection With Tile Layering** + +For GroupBys with windows shorter than 12 hours: + +1. The Flink job incorporates two window operators: one for 5-minute tiles and another for 1-hour tiles. +2. Both 5-minute and 1-hour tiles are stored in the key-value store. +3. The Fetcher prioritizes larger tiles when possible, using smaller tiles only when necessary. For GroupBys with multiple window sizes, overlapping tiles may be fetched, but each window size is calculated independently and there is no double-counting. + +#### Example: Fetching a GroupBy with 6-hour and 1-day Windows at 14:34 + +(Assume batch data for the previous day has already landed, so we only need tiles starting at 00:00.) + +**Before Tile Layering** + +Without tile layering, the system fetches 175 5-minute tiles from the KV store: + +``` +[0:00 - 0:05), [0:05 - 0:10), … , [14:30 - 14:35) +``` + +To aggregate the values for the 1-day window, the Fetcher uses all these tiles. For 6-hour window, it uses only those from 8:30 onwards. + +**After Tile Layering** + +With tile layering, the system fetches: + +``` +5-minute tiles: [8:30 - 8:35) ... [8:55 - 9:00) +1-hour tiles: [0:00 - 1:00) ... [9:00 - 15:00) +``` + +In total, 21 tiles are fetched: 6 five-minute tiles (8:30 to 9:00) and 15 one-hour tiles (0:00 to 15:00). The [14:00 - 15:00) tile is partially complete, containing data from 14:00 to 14:34. + +For the 6-hour window, tiles from 8:30 to 14:34 are aggregated. For the 1-day window, one-hour tiles from 0:00 to 14:34 are used. + +### Impact + +Tile layering has several benefits: + +1. Reduced latency: Fetching fewer, larger tiles significantly decreases serving time. Users can expect improvements of at least 50% at p99. +2. Lower CPU utilization: Less computational work is required to deserialize and merge fewer tiles. +3. Decreased data store fanout: The number of requests to the store is substantially reduced. + +However, there are trade-offs to consider: + +1. Increased Flink workload: Jobs must perform more work and maintain more state. However, Stripe's experience with tile layering over more than a year suggests that state size has not become a bottleneck. +Moreover, the windowing portion of the Flink DAG (which is duplicated with Tile Layering) tends to consume less CPU compared to the initial, shared part of the DAG (responsible for event fetching, deserialization, and Spark expression evaluation). As a result, scaling up Flink jobs is often unnecessary with Tile Layering. + + +2. Higher key-value store utilization: Writing additional tiles increases storage requirements. diff --git a/flink/src/main/scala/ai/chronon/flink/FlinkJob.scala b/flink/src/main/scala/ai/chronon/flink/FlinkJob.scala index 25b7f0039..08c2b7c47 100644 --- a/flink/src/main/scala/ai/chronon/flink/FlinkJob.scala +++ b/flink/src/main/scala/ai/chronon/flink/FlinkJob.scala @@ -1,8 +1,9 @@ package ai.chronon.flink import ai.chronon.aggregator.windowing.ResolutionUtils -import ai.chronon.api.{DataType} -import ai.chronon.api.Extensions.{GroupByOps, SourceOps} +import ai.chronon.api.DataType +import ai.chronon.api.Extensions.{GroupByOps, SourceOps, WindowOps} +import ai.chronon.flink.window.WindowSizeUtils.getFlinkWindowSizesAndWindowIds import ai.chronon.flink.window.{ AlwaysFireOnElementTrigger, FlinkRowAggProcessFunction, @@ -10,9 +11,9 @@ import ai.chronon.flink.window.{ KeySelector, TimestampedTile } -import ai.chronon.online.{GroupByServingInfoParsed, SparkConversions} +import ai.chronon.online.{GroupByServingInfoParsed, SparkConversions, TileSize} import ai.chronon.online.KVStore.PutRequest -import org.apache.flink.streaming.api.scala.{DataStream, OutputTag, StreamExecutionEnvironment} +import org.apache.flink.streaming.api.scala.{DataStream, KeyedStream, OutputTag, StreamExecutionEnvironment} import org.apache.spark.sql.Encoder import org.apache.flink.api.scala._ import org.apache.flink.streaming.api.functions.async.RichAsyncFunction @@ -21,6 +22,8 @@ import org.apache.flink.streaming.api.windowing.time.Time import org.apache.flink.streaming.api.windowing.windows.TimeWindow import org.slf4j.LoggerFactory +import scala.jdk.CollectionConverters.asScalaBufferConverter + /** * Flink job that processes a single streaming GroupBy and writes out the results to the KV store. * @@ -118,9 +121,6 @@ class FlinkJob[T](eventSrc: FlinkSource[T], f"Running Flink job for featureGroupName=${featureGroupName}, kafkaTopic=${kafkaTopic}. " + f"Tiling is enabled.") - val tilingWindowSizeInMillis: Option[Long] = - ResolutionUtils.getSmallestWindowResolutionInMillis(groupByServingInfoParsed.groupBy) - val sourceStream: DataStream[T] = eventSrc .getDataStream(kafkaTopic, featureGroupName)(env, parallelism) @@ -131,64 +131,78 @@ class FlinkJob[T](eventSrc: FlinkSource[T], .name(s"Spark expression eval for $featureGroupName") .setParallelism(sourceStream.parallelism) // Use same parallelism as previous operator - val inputSchema: Seq[(String, DataType)] = - exprEval.getOutputSchema.fields - .map(field => (field.name, SparkConversions.toChrononType(field.name, field.dataType))) - .toSeq - - val window = TumblingEventTimeWindows - .of(Time.milliseconds(tilingWindowSizeInMillis.get)) - .asInstanceOf[WindowAssigner[Map[String, Any], TimeWindow]] - - // An alternative to AlwaysFireOnElementTrigger can be used: BufferedProcessingTimeTrigger. - // The latter will buffer writes so they happen at most every X milliseconds per GroupBy & key. - val trigger = new AlwaysFireOnElementTrigger() - - // We use Flink "Side Outputs" to track any late events that aren't computed. - val tilingLateEventsTag = OutputTag[Map[String, Any]]("tiling-late-events") - - // The tiling operator works the following way: - // 1. Input: Spark expression eval (previous operator) - // 2. Key by the entity key(s) defined in the groupby - // 3. Window by a tumbling window - // 4. Use our custom trigger that will "FIRE" on every element - // 5. the AggregationFunction merges each incoming element with the current IRs which are kept in state - // - Each time a "FIRE" is triggered (i.e. on every event), getResult() is called and the current IRs are emitted - // 6. A process window function does additional processing each time the AggregationFunction emits results - // - The only purpose of this window function is to mark tiles as closed so we can do client-side caching in SFS - // 7. Output: TimestampedTile, containing the current IRs (Avro encoded) and the timestamp of the current element - val tilingDS: DataStream[TimestampedTile] = - sparkExprEvalDS - .keyBy(KeySelector.getKeySelectionFunction(groupByServingInfoParsed.groupBy)) - .window(window) - .trigger(trigger) - .sideOutputLateData(tilingLateEventsTag) - .aggregate( - // See Flink's "ProcessWindowFunction with Incremental Aggregation" - preAggregator = new FlinkRowAggregationFunction(groupByServingInfoParsed.groupBy, inputSchema), - windowFunction = new FlinkRowAggProcessFunction(groupByServingInfoParsed.groupBy, inputSchema) - ) - .uid(s"tiling-01-$featureGroupName") - .name(s"Tiling for $featureGroupName") - .setParallelism(sourceStream.parallelism) - - // Track late events - val sideOutputStream: DataStream[Map[String, Any]] = - tilingDS - .getSideOutput(tilingLateEventsTag) - .flatMap(new LateEventCounter(featureGroupName)) - .uid(s"tiling-side-output-01-$featureGroupName") - .name(s"Tiling Side Output Late Data for $featureGroupName") - .setParallelism(sourceStream.parallelism) - - val putRecordDS: DataStream[PutRequest] = tilingDS - .flatMap(new TiledAvroCodecFn[T](groupByServingInfoParsed)) - .uid(s"avro-conversion-01-$featureGroupName") - .name(s"Avro conversion for $featureGroupName") - .setParallelism(sourceStream.parallelism) + val keyedSparkExprEvalStream: KeyedStream[Map[String, Any], List[Any]] = sparkExprEvalDS + .keyBy(KeySelector.getKeySelectionFunction(groupByServingInfoParsed.groupBy)) + + // Fork the stream into overlapping windows of different sizes. + val windowsAndAvroConversions = + getFlinkWindowSizesAndWindowIds( + groupByServingInfoParsed.groupBy.getAggregations.asScala + .flatMap(_.getWindows.asScala) + .map(_.millis) // All window sizes in the GroupBy, in millis + ).map { + case (windowSize: TileSize, windowId: String) => + val inputSchema: Seq[(String, DataType)] = + exprEval.getOutputSchema.fields + .map(field => (field.name, SparkConversions.toChrononType(field.name, field.dataType))) + .toSeq + + val window = TumblingEventTimeWindows + .of(Time.milliseconds(windowSize.millis)) + .asInstanceOf[WindowAssigner[Map[String, Any], TimeWindow]] + + // An alternative to AlwaysFireOnElementTrigger can be used: BufferedProcessingTimeTrigger. + // The latter will buffer writes so they happen at most every X milliseconds per GroupBy & key. + val trigger = new AlwaysFireOnElementTrigger() + + // We use Flink "Side Outputs" to track any late events that aren't computed. + val tilingLateEventsTag = OutputTag[Map[String, Any]]("tiling-late-events") + + // The tiling operator works the following way: + // 1. Input: Spark expression eval (previous operator) + // 2. Key by the entity key(s) defined in the groupby + // 3. Window by a tumbling window + // 4. Use our custom trigger that will "FIRE" on every element + // 5. the AggregationFunction merges each incoming element with the current IRs which are kept in state + // - Each time a "FIRE" is triggered (i.e. on every event), getResult() is called and the current IRs are emitted + // 6. A process window function does additional processing each time the AggregationFunction emits results + // - The only purpose of this window function is to mark tiles as closed so we can do client-side caching in SFS + // 7. Output: TimestampedTile, containing the current IRs (Avro encoded) and the timestamp of the current element + val tilingDS: DataStream[TimestampedTile] = + keyedSparkExprEvalStream + .window(window) + .trigger(trigger) + .sideOutputLateData(tilingLateEventsTag) + .aggregate( + // See Flink's "ProcessWindowFunction with Incremental Aggregation" + preAggregator = new FlinkRowAggregationFunction(groupByServingInfoParsed.groupBy, inputSchema), + windowFunction = new FlinkRowAggProcessFunction(groupByServingInfoParsed.groupBy, inputSchema) + ) + .uid(s"tiling-$windowId-$featureGroupName") + .name(s"Tiling for $featureGroupName size $windowSize") + .setParallelism(sourceStream.parallelism) + + // Track late events + val sideOutputStream: DataStream[Map[String, Any]] = + tilingDS + .getSideOutput(tilingLateEventsTag) + .flatMap(new LateEventCounter(featureGroupName)) + .uid(s"tiling-side-output-$windowId-$featureGroupName") + .name(s"Tiling Side Output Late Data for $featureGroupName size $windowSize") + .setParallelism(sourceStream.parallelism) + + val putRecordDS: DataStream[PutRequest] = tilingDS + .flatMap(new TiledAvroCodecFn[T](groupByServingInfoParsed)) + .uid(s"avro-conversion-$windowId-$featureGroupName") + .name(s"Avro conversion for $featureGroupName size $windowSize") + .setParallelism(sourceStream.parallelism) + + putRecordDS + } + .reduce(_.union(_)) AsyncKVStoreWriter.withUnorderedWaits( - putRecordDS, + windowsAndAvroConversions, sinkFn, featureGroupName ) diff --git a/flink/src/main/scala/ai/chronon/flink/window/WindowSizeUtils.scala b/flink/src/main/scala/ai/chronon/flink/window/WindowSizeUtils.scala new file mode 100644 index 000000000..2a3e581b1 --- /dev/null +++ b/flink/src/main/scala/ai/chronon/flink/window/WindowSizeUtils.scala @@ -0,0 +1,60 @@ +package ai.chronon.flink.window + +import ai.chronon.aggregator.windowing.FiveMinuteResolution.getWindowResolutionMillis +import ai.chronon.api.Extensions.{WindowOps, WindowUtils} +import ai.chronon.online.TileSize + +import scala.collection.mutable.{Map => MutableMap} + +/** + * This object contains utility methods for determining what Flink window sizes Chronon will use for a given GroupBy. + */ +object WindowSizeUtils { + + /** + * Given a list of GroupBy Windows sizes in millis, calculate the smallest window + * resolution as defined by Chronon. Returns None if the list of windows is empty. + * + * For example, if the windows are [30d, 7d, 1d, 1h], the resolution are + * [1d, 1h, 1h, 5m], and the smallest resolution is 5m. + */ + def getSmallestWindowResolutionInMillis(windowSizesMillis: Seq[Long]): Option[Long] = + Option(windowSizesMillis.map(windowSizeMillis => getWindowResolutionMillis(windowSizeMillis))) + .map(_.min) + + /** + * Given a list of GroupBy Windows, determine what window sizes should be used in Flink. + * Each window size is accompanied by a unique ID (String) that is used to build the UID + * of the Flink operators. All Flink window sizes are also TileSizes. + * + * This public method exists so different parts of Chronon can use the same logic to determine + * what tile sizes are available. The Chronon KV store can use this method to determine what tiles + * sizes it can fetch. + */ + def getFlinkWindowSizesAndWindowIds( + windowSizesMillis: Seq[Long] + ): Map[TileSize, String] = { + // Create a map of window sizes to ID. The IDs are used to build the UID of the Flink operators. + val flinkWindowSizes: MutableMap[TileSize, String] = MutableMap() + + // All GroupBys have a Flink window equal to its smallest window resolution, e.g. a GB with a + // 10-hour window will have a 5-minute Flink window/tile. + val smallestWindowResolutionInMillis: Long = + getSmallestWindowResolutionInMillis(windowSizesMillis) match { + case Some(smallestTileSize) => smallestTileSize + // Some GroupBys don't have windows defined, so we default to 1 day. + case None => + WindowUtils.Day.millis + } + val smallestTileSize: TileSize = TileSize.fromMillis(smallestWindowResolutionInMillis) + flinkWindowSizes += (smallestTileSize -> "01") // These ID strings are arbitrarily chosen. + + // Optimization (Tile Layering): all GroupBys with 5-minute tiles should also have 1-hour tiles. + val use1HourWindow = smallestWindowResolutionInMillis == WindowUtils.FiveMinutes + if (use1HourWindow) { + flinkWindowSizes += (TileSize.OneHour -> "11") + } + + flinkWindowSizes.toMap + } +} diff --git a/online/src/main/scala/ai/chronon/online/Api.scala b/online/src/main/scala/ai/chronon/online/Api.scala index 2becebce6..9e5152281 100644 --- a/online/src/main/scala/ai/chronon/online/Api.scala +++ b/online/src/main/scala/ai/chronon/online/Api.scala @@ -33,7 +33,8 @@ object KVStore { // a scan request essentially for the keyBytes // afterTsMillis - is used to limit the scan to more recent data case class GetRequest(keyBytes: Array[Byte], dataset: String, afterTsMillis: Option[Long] = None) - case class TimedValue(bytes: Array[Byte], millis: Long) + // tileSizeMillis is only set for the tiled version of chronon + case class TimedValue(bytes: Array[Byte], millis: Long, tileSizeMillis: Option[Long] = None) case class GetResponse(request: GetRequest, values: Try[Seq[TimedValue]]) { def latest: Try[TimedValue] = values.map(_.maxBy(_.millis)) } diff --git a/online/src/main/scala/ai/chronon/online/FetcherBase.scala b/online/src/main/scala/ai/chronon/online/FetcherBase.scala index abae38584..c8bccd3f1 100644 --- a/online/src/main/scala/ai/chronon/online/FetcherBase.scala +++ b/online/src/main/scala/ai/chronon/online/FetcherBase.scala @@ -141,11 +141,20 @@ class FetcherBase(kvStore: KVStore, val allStreamingIrDecodeStartTime = System.currentTimeMillis() val output: Array[Any] = if (servingInfo.isTilingEnabled) { - val streamingIrs: Iterator[TiledIr] = streamingResponses.iterator + val streamingIrs: Seq[TiledIr] = streamingResponses .filter(tVal => tVal.millis >= servingInfo.batchEndTsMillis) .flatMap { tVal => Try(servingInfo.tiledCodec.decodeTileIr(tVal.bytes)) match { - case Success((tile, _)) => Array(TiledIr(tVal.millis, tile)) + case Success((tile, _)) => + val tileSize = tVal.tileSizeMillis match { + case Some(size) => size + case None => + throw new RuntimeException( + "Encountered a TimedValue that does not have the tile size set. " + + "The size is necessary when using tiling." + + "Failing to construct TiledIr.") + } + Array(TiledIr(tVal.millis, tile, tileSize)) case Failure(_) => logger.error( s"Failed to decode tile ir for groupBy ${servingInfo.groupByOps.metaData.getName}" + @@ -163,8 +172,6 @@ class FetcherBase(kvStore: KVStore, } } } - .toArray - .iterator context.distribution("group_by.all_streamingir_decode.latency.millis", System.currentTimeMillis() - allStreamingIrDecodeStartTime) @@ -179,8 +186,9 @@ class FetcherBase(kvStore: KVStore, |""".stripMargin) } + val useTileLayering = true val aggregatorStartTime = System.currentTimeMillis() - val result = aggregator.lambdaAggregateFinalizedTiled(batchIr, streamingIrs, queryTimeMs) + val result = aggregator.lambdaAggregateFinalizedTiled(batchIr, streamingIrs, queryTimeMs, useTileLayering) context.distribution("group_by.aggregator.latency.millis", System.currentTimeMillis() - aggregatorStartTime) result } else { diff --git a/online/src/main/scala/ai/chronon/online/TileFetchingUtils.scala b/online/src/main/scala/ai/chronon/online/TileFetchingUtils.scala new file mode 100644 index 000000000..b431c7593 --- /dev/null +++ b/online/src/main/scala/ai/chronon/online/TileFetchingUtils.scala @@ -0,0 +1,253 @@ +package ai.chronon.online + +import ai.chronon.aggregator.windowing.FiveMinuteResolution.getWindowResolutionMillis + +/** + * When using the Tiled Architecture, Chronon stores streaming data in the Key-Value + * store as tiles that contain the intermediate results for a given time range. + * + * This utility contains the code that calculates what tiles are need to serve a certain + * GroupBy, at a specific time. + * + * Related: "The Tiled Architecture" https://chronon.ai/Tiled_Architecture.html + */ +object TileFetchingUtils { + + /** + * Given a GroupBy Window and the current time in millis, return the start of the saw-tooth window in millis. + * + * Example: + * current time: 14:33:24, window size: 01:00:00, resolution: 00:05:00 => window start rounded down: 13:30:00. + */ + private[online] def getSawToothWindowStartMillis( + windowSizeMillis: Long, + currentTimeMillis: Long, + resolutionMillis: Long + ) = { + // It does not make sense for any of these parameters to be zero. + require( + (windowSizeMillis > 0 && currentTimeMillis > 0 && resolutionMillis > 0), + "Window size, current time, and resolution must be non-zero. Got " + + s"windowSizeMillis: $windowSizeMillis, " + + s"currentTimeMillis: $currentTimeMillis, " + + s"resolutionMillis: $resolutionMillis)" + ) + + (currentTimeMillis - windowSizeMillis) - ((currentTimeMillis - windowSizeMillis) % resolutionMillis) + } + + /** + * Given a list of windows in a GroupBy, the current time in millis, and the tile sizes available, return the tiles + * needed to be fetched from the KV store to "fill" or compute the windows as a set of (tile start: millis, + * tile size: millis) tuples. + * + * The algorithm will only select tiles after `startTsMillis`, even if the windows provided start before that time. + */ + def getTilesForWindows( + windowSizesMillis: Seq[Long], + startTsMillis: Long, + currentTimeMillis: Long, + tileSizesAvailable: IndexedSeq[TileSize] + ): Set[(Long, Long)] = { + if (startTsMillis > currentTimeMillis) + throw new IllegalStateException( + s"Invalid range: startTsMillis ($startTsMillis) must be less than currentTimeMillis ($currentTimeMillis)." + ) + + // The algorithm works by fetching the largest tiles first, so we sort the tile sizes in descending order. + val tileSizesAvailableSorted = tileSizesAvailable.sortBy(-_.millis) + + // For each window, get the set of tiles necessary. We use a set to prevent requesting duplicate tiles. + windowSizesMillis.flatMap { windowSizeMillis => + getTilesForWindow( + windowSizeMillis, + currentTimeMillis, + startTsMillis, + tileSizesAvailableSorted + ) + }.toSet + } + + /** + * Given a GroupBy Window, the current time in millis, and the tile sizes available, return the list of tiles needed + * to "fill" or compute the Window. The tiles are returned as a set of (tile start: millis, tile size: millis) tuples. + * + * Example: We want to fetch tiles for a GroupBy Window of 6 hours. The tile sizes available are 6 hours, 1 hour, + * 20 minutes, and 5 minutes. The current time is 08:38:00 + * + * Execution: getTilesForWindow(6hr, [6hr, 1hr, 20m, 5m], 08:38:00) is invoked and calls getTilesForRangeInWindow + * - First invocation of getTilesForRangeInWindow: + * getTilesForRangeInWindow([2:35, 8:38], 6hr) + * range to fill: [2:35, 8:38] + * algorithm allocates one tile: [6:00, 12:00). + * - Second, recursive invocation: + * getTilesForRangeInWindow([2:35, 6:00], 1hr) + * range to fill: [2:35, 6:00] + * algorithm allocates three tiles: [3:00, 4:00), [4:00, 5:00), [5:00, 6:00). + * - Third, recursive invocation: + * getTilesForRangeInWindow([2:35, 3:00], 20m) + * range to fill: [2:35, 3:00] + * algorithm allocates one tile: [2:40, 3:00) + * - Fourth, recursive invocation: + * getTilesForRangeInWindow([2:35, 2:40], 5m) + * range to fill: [2:35, 2:40] + * algorithm allocates one tile: [2:35, 2:40) + * + * Result: the following tiles are needed: + * [6:00, 12:00), [3:00, 4:00), [4:00, 5:00), [5:00, 6:00), [2:40, 3:00), [2:35, 2:40) + */ + private def getTilesForWindow( + windowSizeMillis: Long, + currentTimeMillis: Long, + startTsMillis: Long, + tileSizesAvailableForThisChrononWindowSortedDescending: IndexedSeq[TileSize] + ): Set[(Long, Long)] = { + val windowResolution = getWindowResolutionMillis(windowSizeMillis) + + // We only need to fetch tiles that are after the startTsMillis. (It's assumed that the tiles before this time + // are already available in the offline/batch data.) + val sawToothWindowStartMillis = Math.max( + startTsMillis, + getSawToothWindowStartMillis(windowSizeMillis, currentTimeMillis, windowResolution) + ) + + // The tile fetching algorithm must start with the largest tile size + val largestTileSizeMillis = tileSizesAvailableForThisChrononWindowSortedDescending.head + + // Get the tiles that need to be fetched. + getTilesForRangeInWindow( + sawToothWindowStartMillis, + currentTimeMillis + 1, // The end of the range is exclusive, so we add 1 millisecond to the current time to include it + largestTileSizeMillis, + tileSizesAvailableForThisChrononWindowSortedDescending, + canExceedRangeEnd = true + ).map(tile => (tile._1, tile._2.millis)) + } + + /** + * Given a range return the Epoch-aligned tiles that need to be fetched. + * + * @param canExceedRangeEnd If true, the algorithm can fetch tiles that exceed the range end. For example, if the + * range is [14:35, 19:30) and tile size is 06:00, it's ok for us to fetch the + * partially-complete [18:00, 00:00) tile, so canExceedRangeEnd should be True. + */ + private def getTilesForRangeInWindow( + rangeStartInclusiveMillis: Long, + rangeEndExclusiveMillis: Long, + currentTileSize: TileSize, + tileSizesAvailableForThisChrononWindowSortedDescending: IndexedSeq[TileSize], + canExceedRangeEnd: Boolean = false + ): Set[(Long, TileSize)] = { + // Find the first possible tile start within the range. + // Example: range: [14:35, 16:00), tile size: 00:20 => first possible tile start = 14:40. + val firstPossibleTileStartAlignedWithEpoch = rangeStartInclusiveMillis + + ((currentTileSize.millis - (rangeStartInclusiveMillis % currentTileSize.millis)) % currentTileSize.millis) + + // If the first possible tile start is after the end of the range, we can't use the current tile size, so we try + // the next smaller one. + // Example: range: [14:35, 16:00), tile size: 06:00 => first possible tile start = 18:00. + if (firstPossibleTileStartAlignedWithEpoch >= rangeEndExclusiveMillis) { + // If tile size is the smallest available, we can't fill the window, something is very wrong with the tiling setup. + if (currentTileSize == tileSizesAvailableForThisChrononWindowSortedDescending.last) { + throw new IllegalStateException( + s"Could not fill a Chronon Window defined in a GroupBy with the tile sizes currently available from " + + s"Flink. Smallest tile size: $currentTileSize Range: [$rangeStartInclusiveMillis, $rangeEndExclusiveMillis)." + ) + } + + getTilesForRangeInWindow( + rangeStartInclusiveMillis, + rangeEndExclusiveMillis, + tileSizesAvailableForThisChrononWindowSortedDescending( + tileSizesAvailableForThisChrononWindowSortedDescending.indexOf(currentTileSize) + 1 + ), + tileSizesAvailableForThisChrononWindowSortedDescending, + canExceedRangeEnd // If firstPossibleTileStartAlignedWithEpoch is AFTER the end of the range, that means that, + // at this point in the algorithm's execution, we haven't fetched a single tile. (This is true because the + // algorithm works by fetching the largest tiles first.) So, pass canExceedRangeEnd forward. + ) + } else { + // Find the last possible tile end within the range. + val lastPossibleTileEndAlignedWithEpoch = + if (!canExceedRangeEnd) { + // This is the default case. + // Example: range: [14:35, 16:00), tile size: 00:20 => last possible tile end = 16:00. + rangeEndExclusiveMillis - (rangeEndExclusiveMillis % currentTileSize.millis) + } else { + // There's one exception. If canExceedRangeEnd is true, that means that we're at a point in the algorithm where + // we can fetch data past the end of the range an get the current, partially-complete tile. + // Example: range: [14:35, 19:30), tile size: 06:00, it's ok for us to fetch the (partially-complete) + // [18:00, 00:00) tile. So we move the end of the range to 00:00. + rangeEndExclusiveMillis - (rangeEndExclusiveMillis % currentTileSize.millis) + currentTileSize.millis + } + + // Calculate the number of tiles that can fit within the range. + // Example: range: [14:35, 16:00), tile size: 00:20, first tile start: 14:40, last tile end: 16:00 + // => number of tiles = 4. + val numTilesThatCanFitInTheRange = + ((lastPossibleTileEndAlignedWithEpoch - firstPossibleTileStartAlignedWithEpoch) / currentTileSize.millis).toInt + + // Generate the tiles that need to be fetched. + // Example: range: [14:35, 16:00), tile size: 00:20, first tile start: 14:40, last tile end: 16:00 + // => tiles = [(14:40, TileSize.TwentyMinutes), (...), (...), (15:40, TileSize.TwentyMinutes)]. + val tilesBuilder: collection.mutable.Builder[(Long, TileSize), Set[(Long, TileSize)]] = + Set.newBuilder[(Long, TileSize)] + for (i <- 0 until numTilesThatCanFitInTheRange) { + val tileStart = firstPossibleTileStartAlignedWithEpoch + i * currentTileSize.millis + tilesBuilder += ((tileStart, currentTileSize)) + } + var tiles: Set[(Long, TileSize)] = tilesBuilder.result() + + // Figure out what part of the range is not filled by the tiles on the LEFT. + // Example: range: [14:35, 16:00), tile size: 00:20, first tile start: 14:40, last tile end: 16:00 + // => unfilled range = [14:35, 14:40). + val unfilledRangeStartLeft = rangeStartInclusiveMillis + val unfilledRangeEndLeft = firstPossibleTileStartAlignedWithEpoch + val isThereUnfilledRangeOnTheLeft = unfilledRangeStartLeft < unfilledRangeEndLeft + + // Figure out what part of the range is not filled by the tiles on the RIGHT. + // Example: range: [0:30, 1:20), tile size: 00:15, first tile start: 0:30, last tile end: 1:15. + // => unfilled range = [1:15, 1:20). + val unfilledRangeStartRight = lastPossibleTileEndAlignedWithEpoch + val unfilledRangeEndRight = rangeEndExclusiveMillis + val isThereUnfilledRangeOnTheRight = unfilledRangeStartRight < unfilledRangeEndRight + + // if tile size is the smallest available, and we haven't filled the entire range, something is very wrong. + if ( + currentTileSize == tileSizesAvailableForThisChrononWindowSortedDescending.last + && (isThereUnfilledRangeOnTheLeft || isThereUnfilledRangeOnTheRight) + ) { + throw new IllegalStateException( + s"Could not fill a Chronon Window defined in a GroupBy with the tile sizes currently available from " + + s"Flink. Smallest tile size: $currentTileSize Range: [$rangeStartInclusiveMillis, $rangeEndExclusiveMillis)." + ) + } + + // If there is an unfilled range on the LEFT, we need to use smaller tiles to fill it. + if (isThereUnfilledRangeOnTheLeft) { + tiles = tiles ++ getTilesForRangeInWindow( + unfilledRangeStartLeft, + unfilledRangeEndLeft, + tileSizesAvailableForThisChrononWindowSortedDescending( + tileSizesAvailableForThisChrononWindowSortedDescending.indexOf(currentTileSize) + 1 + ), + tileSizesAvailableForThisChrononWindowSortedDescending + ) + } + + // If there is an unfilled range on the RIGHT, we need to use smaller tiles to fill it. + if (isThereUnfilledRangeOnTheRight) { + tiles = tiles ++ getTilesForRangeInWindow( + unfilledRangeStartRight, + unfilledRangeEndRight, + tileSizesAvailableForThisChrononWindowSortedDescending( + tileSizesAvailableForThisChrononWindowSortedDescending.indexOf(currentTileSize) + 1 + ), + tileSizesAvailableForThisChrononWindowSortedDescending + ) + } + + tiles + } + } +} diff --git a/online/src/main/scala/ai/chronon/online/TileSize.scala b/online/src/main/scala/ai/chronon/online/TileSize.scala new file mode 100644 index 000000000..fc3ed5ace --- /dev/null +++ b/online/src/main/scala/ai/chronon/online/TileSize.scala @@ -0,0 +1,51 @@ +package ai.chronon.online + +/** + * Chronon data from Flink is stored in the KV store as tiles. These tiles are of + * fixed sizes and contain the intermediate results for a given time range. We keep + * track of the allowed tile sizes in this file. + * + * For more info on tiling, see https://chronon.ai/Tiled_Architecture.html. + * + * Note: Because Chronon batch jobs run at 00:00 UTC, tile sizes must + * be divisible by 24 hours. + * + * @param seconds The size of the tile in seconds + * @param millis The size of the tile in milliseconds + * @param keyString The key used to store the tile in the KV store + */ +sealed trait TileSize { + val seconds: Long // The size of the tile in seconds + val keyString: String // The string representation of the tile size, used as the part of the key stored in the KV store. + lazy val millis: Long = seconds * 1000L +} + +object TileSize { + // Get TileSize from seconds + def fromSeconds(seconds: Long): TileSize = + seconds match { + case FiveMinutes.seconds => FiveMinutes + case OneHour.seconds => OneHour + case OneDay.seconds => OneDay + case _ => + throw new IllegalArgumentException(s"Invalid tile size provided: $seconds seconds") + } + + // Get TileSize from millis + def fromMillis(millis: Long): TileSize = fromSeconds(millis / 1000L) + + final case object FiveMinutes extends TileSize { + val seconds: Long = 300L + val keyString: String = "5m" + } + + final case object OneHour extends TileSize { + val seconds: Long = 3600L + val keyString: String = "1h" + } + + final case object OneDay extends TileSize { + val seconds: Long = 86400L + val keyString: String = "1d" + } +} diff --git a/online/src/test/scala/ai/chronon/online/TileFetchingUtilsTest.scala b/online/src/test/scala/ai/chronon/online/TileFetchingUtilsTest.scala new file mode 100644 index 000000000..f31a3dff5 --- /dev/null +++ b/online/src/test/scala/ai/chronon/online/TileFetchingUtilsTest.scala @@ -0,0 +1,418 @@ +package ai.chronon.online + +import org.junit.Test +import org.mockito.Mockito.{mock, when} +import org.junit.Assert._ +import ai.chronon.aggregator.windowing.FiveMinuteResolution.getWindowResolutionMillis +import ai.chronon.api.Extensions.WindowOps +import ai.chronon.api.{TimeUnit, Window} + +class TileFetchingUtilsTest { + + @Test + def testGetSawToothWindowStartMillisForWindowGreaterThan12Days(): Unit = { + val window = new Window(12, TimeUnit.DAYS) + val windowSizeMillis = window.millis + val resolutionMillis = getWindowResolutionMillis(windowSizeMillis) + val currentTimeMillis = 1719755742000L // 2024-06-30 13:55:42.000 + + val expectedSawToothWindowStartMillis = 1718668800000L // 2024-06-18 00:00:00.000 + + assertEquals( + expectedSawToothWindowStartMillis, + TileFetchingUtils.getSawToothWindowStartMillis( + windowSizeMillis, + currentTimeMillis, + resolutionMillis + ) + ) + } + + @Test + def testGetSawToothWindowStartMillisForWindowBetween12HoursAnd12Days(): Unit = { + val window = new Window(12, TimeUnit.HOURS) + val windowSizeMillis = window.millis + val resolutionMillis = getWindowResolutionMillis(windowSizeMillis) + val currentTimeMillis = 1719755742000L // 2024-06-30 13:55:42.000 + + val expectedSawToothWindowStartMillis = 1719709200000L // 2024-06-30 01:00:00.000 + + assertEquals( + expectedSawToothWindowStartMillis, + TileFetchingUtils.getSawToothWindowStartMillis( + windowSizeMillis, + currentTimeMillis, + resolutionMillis + ) + ) + } + + @Test + def testGetSawToothWindowStartMillisForWindowLessThan12Hours(): Unit = { + val window = new Window(1, TimeUnit.HOURS) + val windowSizeMillis = window.millis + val resolutionMillis = getWindowResolutionMillis(windowSizeMillis) + val currentTimeMillis = 1719755742000L // 2024-06-30 13:55:42.000 + + val expectedSawToothWindowStartMillis = 1719752100000L // 2024-06-30 12:55:00.000 + + assertEquals( + expectedSawToothWindowStartMillis, + TileFetchingUtils.getSawToothWindowStartMillis( + windowSizeMillis, + currentTimeMillis, + resolutionMillis + ) + ) + } + @Test + def testGetTilesForWindowWithSingleWindowAndSingleTileSize(): Unit = { + val windowSizeMillis = List(2 * 60 * 60 * 1000L) // 2 hours + val afterTsMillis = 1706832000000L // February 2, 2024 0:00:00 + val currentTsMillis = 1706844420000L // February 2, 2024 3:27:00 + val tileSizes = Vector(TileSize.FiveMinutes) + + val actualTiles = TileFetchingUtils.getTilesForWindows( + windowSizeMillis, + afterTsMillis, + currentTsMillis, + tileSizes + ) + + // range from February 2, 2024 1:25:00 to February 2, 2024 3:25:00 + val expectedTiles = (1706837100000L to 1706844300000L by TileSize.FiveMinutes.millis) + .map((_, TileSize.FiveMinutes.millis)) + .toSet + + assertEquals(expectedTiles, actualTiles) + } + + @Test + def testGetTilesForWindowWithSingleWindowAndTwoTileSizes(): Unit = { + val windowSizeMillis = List(2 * 60 * 60 * 1000L) // 2 hours + val afterTsMillis = 1706832000000L // Friday, February 2, 2024 0:00:00 + val currentTsMillis = 1706844420000L // Friday, February 2, 2024 3:27:00 + val tileSizes = Vector(TileSize.FiveMinutes, TileSize.OneHour) + + val actualTiles = TileFetchingUtils.getTilesForWindows( + windowSizeMillis, + afterTsMillis, + currentTsMillis, + tileSizes + ) + + val expectedTiles = Set( + (1706837100000L, TileSize.FiveMinutes.millis), // Friday, February 2, 2024 1:25:00 + (1706837400000L, TileSize.FiveMinutes.millis), // Friday, February 2, 2024 1:30:00 + (1706837700000L, TileSize.FiveMinutes.millis), // Friday, February 2, 2024 1:35:00 + (1706838000000L, TileSize.FiveMinutes.millis), // Friday, February 2, 2024 1:40:00 + (1706838300000L, TileSize.FiveMinutes.millis), // Friday, February 2, 2024 1:45:00 + (1706838600000L, TileSize.FiveMinutes.millis), // Friday, February 2, 2024 1:50:00 + (1706838900000L, TileSize.FiveMinutes.millis), // Friday, February 2, 2024 1:55:00 + (1706839200000L, TileSize.OneHour.millis), // Friday, February 2, 2024 2:00:00 + (1706842800000L, TileSize.OneHour.millis), // Friday, February 2, 2024 3:00:00 + ) + + assertEquals(expectedTiles, actualTiles) + } + + @Test + def testGetTilesForWindowWithTwoWindowsAndSingleTileSize(): Unit = { + val windowSizeMillis = List(2 * 60 * 60 * 1000L, 6 * 60 * 60 * 1000L) // 2 hours and 6 hours + val afterTsMillis = 1706832000000L // February 2, 2024 0:00:00 + val currentTsMillis = 1706844420000L // February 2, 2024 3:27:00 + val tileSizes = Vector(TileSize.FiveMinutes) + + val actualTiles = TileFetchingUtils.getTilesForWindows( + windowSizeMillis, + afterTsMillis, + currentTsMillis, + tileSizes + ) + + // range from February 2, 2024 0:00:00 to February 2, 2024 3:25:00 + val expectedTiles = (1706832000000L to 1706844300000L by TileSize.FiveMinutes.millis) + .map((_, TileSize.FiveMinutes.millis)) + .toSet + + assertEquals(expectedTiles, actualTiles) + } + + @Test + def testGetTilesForWindowWithTwoWindowsAndTwoTileSizes(): Unit = { + val windowSizeMillis = List(2 * 60 * 60 * 1000L, 6 * 60 * 60 * 1000L) // 2 hours and 6 hours + val afterTsMillis = 1706832000000L // February 2, 2024 0:00:00 + val currentTsMillis = 1706844420000L // February 2, 2024 3:27:00 + val tileSizes = Vector(TileSize.FiveMinutes, TileSize.OneHour) + + val actualTiles = TileFetchingUtils.getTilesForWindows( + windowSizeMillis, + afterTsMillis, + currentTsMillis, + tileSizes + ) + + val expectedTiles = Set( + (1706832000000L, TileSize.OneHour.millis), // Friday, February 2, 2024 0:00:00 + (1706835600000L, TileSize.OneHour.millis), // Friday, February 2, 2024 1:00:00 + (1706837100000L, TileSize.FiveMinutes.millis), // Friday, February 2, 2024 1:25:00 + (1706837400000L, TileSize.FiveMinutes.millis), // Friday, February 2, 2024 1:30:00 + (1706837700000L, TileSize.FiveMinutes.millis), // Friday, February 2, 2024 1:35:00 + (1706838000000L, TileSize.FiveMinutes.millis), // Friday, February 2, 2024 1:40:00 + (1706838300000L, TileSize.FiveMinutes.millis), // Friday, February 2, 2024 1:45:00 + (1706838600000L, TileSize.FiveMinutes.millis), // Friday, February 2, 2024 1:50:00 + (1706838900000L, TileSize.FiveMinutes.millis), // Friday, February 2, 2024 1:55:00 + (1706839200000L, TileSize.OneHour.millis), // Friday, February 2, 2024 2:00:00 + (1706842800000L, TileSize.OneHour.millis), // Friday, February 2, 2024 3:00:00 + ) + + assertEquals(expectedTiles, actualTiles) + } + + @Test + def testGetTilesForWindowWithLongWindow(): Unit = { + val windowSizeMillis = List(12 * 24 * 60 * 60 * 1000L) // 12 days + val afterTsMillis = 1706832000000L // February 2, 2024 0:00:00 + val currentTsMillis = 1706844420000L // February 2, 2024 3:27:00 + val tileSizes = Vector( + TileSize.FiveMinutes, + TileSize.OneHour, + TileSize.OneDay + ) + + val actualTiles = TileFetchingUtils.getTilesForWindows( + windowSizeMillis, + afterTsMillis, + currentTsMillis, + tileSizes + ) + + val expectedTiles = Set( + (1706832000000L, TileSize.OneDay.millis) // Friday, February 2, 2024 0:00:00 + ) + + assertEquals(expectedTiles, actualTiles) + } + + @Test + def testGetTilesForWindowWithShortWindow(): Unit = { + val windowSizeMillis = List(10 * 60 * 1000L) // 10 min + val afterTsMillis = 1706832000000L // February 2, 2024 0:00:00 + val currentTsMillis = 1706844420000L // February 2, 2024 3:27:00 + val tileSizes = Vector( + TileSize.FiveMinutes, + TileSize.OneHour, + TileSize.OneDay + ) + + val actualTiles = TileFetchingUtils.getTilesForWindows( + windowSizeMillis, + afterTsMillis, + currentTsMillis, + tileSizes + ) + + val expectedTiles = Set( + (1706843700000L, TileSize.FiveMinutes.millis), // Friday, February 2, 2024 3:15:00 + (1706844000000L, TileSize.FiveMinutes.millis), // Friday, February 2, 2024 3:20:00 + (1706844300000L, TileSize.FiveMinutes.millis), // Friday, February 2, 2024 3:25:00 + ) + + assertEquals(expectedTiles, actualTiles) + } + + @Test + def testGetTilesForWindowWithWindowStartAfterAfterTsSeconds(): Unit = { + val windowSizeMillis = List(1 * 60 * 60 * 1000L) // 1 hour + val afterTsMillis = 1706832000000L // February 2, 2024 0:00:00 + val currentTsMillis = 1706844420000L // February 2, 2024 3:27:00 + val tileSizes = Vector( + TileSize.FiveMinutes, + TileSize.OneHour, + TileSize.OneDay + ) + + val actualTiles = TileFetchingUtils.getTilesForWindows( + windowSizeMillis, + afterTsMillis, + currentTsMillis, + tileSizes + ) + + val expectedTiles = Set( + (1706840700000L, TileSize.FiveMinutes.millis), // Friday, February 2, 2024 2:25:00 + (1706841000000L, TileSize.FiveMinutes.millis), // Friday, February 2, 2024 2:30:00 + (1706841300000L, TileSize.FiveMinutes.millis), // Friday, February 2, 2024 2:35:00 + (1706841600000L, TileSize.FiveMinutes.millis), // Friday, February 2, 2024 2:40:00 + (1706841900000L, TileSize.FiveMinutes.millis), // Friday, February 2, 2024 2:45:00 + (1706842200000L, TileSize.FiveMinutes.millis), // Friday, February 2, 2024 2:50:00 + (1706842500000L, TileSize.FiveMinutes.millis), // Friday, February 2, 2024 2:55:00 + (1706842800000L, TileSize.OneHour.millis) // Friday, February 2, 2024 3:00:00 + ) + + assertEquals(expectedTiles, actualTiles) + } + + @Test + def testGetTilesForWindowWithRangeToFillOnRight(): Unit = { + val windowSizeMillis = List(2 * 60 * 60 * 1000L) // 2 hours + val afterTsMillis = 1706832000000L // February 2, 2024 0:00:00 + val currentTsMillis = 1706843400000L // February 2, 2024 3:10:00 + + // Custom tile sizes to force range filling on the right + val customEightyMinuteTileSize = mock(classOf[TileSize]) + when(customEightyMinuteTileSize.seconds).thenReturn(80 * 60) + when(customEightyMinuteTileSize.millis).thenReturn(80 * 60 * 1000) + when(customEightyMinuteTileSize.keyString).thenReturn("80m") + + val customThreeHourTileSize = mock(classOf[TileSize]) + when(customThreeHourTileSize.seconds).thenReturn(3 * 60 * 60) + when(customThreeHourTileSize.millis).thenReturn(3 * 60 * 60 * 1000) + when(customThreeHourTileSize.keyString).thenReturn("3h") + + val tileSizes = Vector( + customEightyMinuteTileSize, + customThreeHourTileSize, + TileSize.FiveMinutes + ) + + val actualTiles = TileFetchingUtils.getTilesForWindows( + windowSizeMillis, + afterTsMillis, + currentTsMillis, + tileSizes + ) + + val expectedTiles = Set( + (1706842800000L, customThreeHourTileSize.millis), // Friday, February 2, 2024 3:00:00 + (1706836800000L, customEightyMinuteTileSize.millis), // Friday, February 2, 2024 3:05:00 + (1706836200000L, TileSize.FiveMinutes.millis), // Friday, February 2, 2024 1:10:00 + (1706836500000L, TileSize.FiveMinutes.millis), // Friday, February 2, 2024 1:15:00 + (1706841600000L, TileSize.FiveMinutes.millis), // Friday, February 2, 2024 2:40:00 + (1706841900000L, TileSize.FiveMinutes.millis), // Friday, February 2, 2024 2:45:00 + (1706842200000L, TileSize.FiveMinutes.millis), // Friday, February 2, 2024 2:50:00 + (1706842500000L, TileSize.FiveMinutes.millis) // Friday, February 2, 2024 2:55:00 + ) + + assertEquals(expectedTiles, actualTiles) + } + @Test + def testGetTilesForWindowWhenAfterTsMillisEqualsCurrentTimeMillis(): Unit = { + val windowSizeMillis = List(20 * 60 * 1000L, 2 * 60 * 60 * 1000L, 6 * 60 * 60 * 1000L) + val afterTsMillis = 1706832000000L // February 2, 2024 0:00:00 + val currentTsMillis = 1706832000000L // February 2, 2024 0:00:00 + val tileSizes = Vector(TileSize.FiveMinutes) + + val actualTiles = TileFetchingUtils.getTilesForWindows( + windowSizeMillis, + afterTsMillis, + currentTsMillis, + tileSizes + ) + + val expectedTiles = Set( + (1706832000000L, TileSize.FiveMinutes.millis) // Friday, February 2, 2024 0:00:00 + ) + + assertEquals(expectedTiles, actualTiles) + } + + @Test(expected = classOf[IllegalStateException]) + def testGetTilesForWindowWhenCurrentTimeMillisBeforeAfterTsMillis(): Unit = { + val windowSizeMillis = List(60 * 60 * 1000L) + val afterTsMillis = 1706832000000L // February 2, 2024 0:00:00 + val currentTsMillis = 1706831400000L // February 1, 2024 23:50:00 + val tileSizes = Vector(TileSize.OneHour, TileSize.FiveMinutes) + + TileFetchingUtils.getTilesForWindows( + windowSizeMillis, + afterTsMillis, + currentTsMillis, + tileSizes + ) + } + + @Test + def testGetTilesForWindowWhenCurrentTimeMillisExactlyEqualsNextTileStart(): Unit = { + val windowSizeMillis = List(60 * 60 * 1000L) + val afterTsMillis = 1706832000000L // February 2, 2024 0:00:00 + val currentTsMillis = 1706835600000L // February 2, 2024 1:00:00 + val tileSizes = Vector(TileSize.OneHour, TileSize.FiveMinutes) + + val actualTiles = TileFetchingUtils.getTilesForWindows( + windowSizeMillis, + afterTsMillis, + currentTsMillis, + tileSizes + ) + + val expectedTiles = Set( + (1706832000000L, TileSize.OneHour.millis), // Friday, February 2, 2024 0:00:00 + (1706835600000L, TileSize.OneHour.millis) // Friday, February 2, 2024 1:00:00 + ) + + assertEquals(expectedTiles, actualTiles) + } + + /** + * Helper function to assert there are no gaps between tiles. + * Tests the tile fetching algorithm at different timestamps and verifies tiles returned never have gaps. + */ + private def assertThereAreNoGapsBetweenTiles( + windowSizesMillis: List[Long], + afterTsMillis: Long, + tileSizesAvailable: IndexedSeq[TileSize] + ): Unit = { + // Test all window sizes separately + windowSizesMillis.foreach { selectedWindowSizeMillis => + // Iterate through every 1-minute timestamp from February 2, 2024 0:01:00 to February 3, 2024 23:59:00 + (afterTsMillis + 60 * 1000 until afterTsMillis + 60 * 1000 + 60 * 60 * 24 * 2 * 1000 by 60 * 1000) + .map { currentTsMillis => + // Add jitter, check the ms before the minute, the exact minute itself, and the ms after + (-1 to 1).map { j => + val currentTsMillisWithJitter = currentTsMillis + j + val sortedTiles = TileFetchingUtils + .getTilesForWindows( + Seq(selectedWindowSizeMillis), + afterTsMillis, + currentTsMillisWithJitter, + tileSizesAvailable + ) + .toSeq + .sortBy(_._1) + + // First tile is at or after February 2, 2024 0:00:00 + assertTrue(sortedTiles.head._1 >= afterTsMillis) + + // Last tile is at or after current time + assertTrue(sortedTiles.last._1 + sortedTiles.last._2 >= currentTsMillisWithJitter) + + // Check if there are no gaps between tiles + if (sortedTiles.size > 1) { + sortedTiles.sliding(2).foreach { pair => + // First time start + first tile size = second time start + assertEquals(pair.head._1 + pair.head._2, pair.last._1) + } + } + } + } + } + } + + @Test + def testNoGapsBetweenTilesWithOneHourAndFiveMinuteTiles(): Unit = { + val windowSizeMillis = List( + 10 * 60 * 1000L, + 60 * 60 * 1000L, + 24 * 60 * 60 * 1000L, + 7 * 24 * 60 * 60 * 1000L + ) + val afterTsMillis = 1706832000000L // February 2, 2024 0:00:00 + val tileSizes = Vector( + TileSize.FiveMinutes, + TileSize.OneHour + ) + + assertThereAreNoGapsBetweenTiles(windowSizeMillis, afterTsMillis, tileSizes) + } +} diff --git a/spark/src/test/scala/ai/chronon/spark/test/InMemoryKvStore.scala b/spark/src/test/scala/ai/chronon/spark/test/InMemoryKvStore.scala index 4aa7c9db1..6d9c934fd 100644 --- a/spark/src/test/scala/ai/chronon/spark/test/InMemoryKvStore.scala +++ b/spark/src/test/scala/ai/chronon/spark/test/InMemoryKvStore.scala @@ -54,7 +54,7 @@ class InMemoryKvStore(tableUtils: () => TableUtils) extends KVStore with Seriali .filter { case (version, _) => req.afterTsMillis.forall(version >= _) } // filter version - .map { case (version, bytes) => TimedValue(bytes, version) } + .map { case (version, bytes) => TimedValue(bytes, version, Some(5 * 60 * 1000)) } } KVStore.GetResponse(req, values) }