Skip to content

[WIP] Tile Layering #920

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 12 commits into
base: main
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,22 @@ trait Resolution extends Serializable {
}

object FiveMinuteResolution extends Resolution {
def calculateTailHop(window: Window): Long =
window.millis match {
case x if x >= new Window(12, TimeUnit.DAYS).millis => WindowUtils.Day.millis
case x if x >= new Window(12, TimeUnit.HOURS).millis => WindowUtils.Hour.millis
case _ => WindowUtils.FiveMinutes
private val millisIn12Days = new Window(12, TimeUnit.DAYS).millis
private val millisIn12Hours = new Window(12, TimeUnit.HOURS).millis

/**
* Given a GroupBy Window size, calculate the resolution of the window as defined by Chronon.
*/
def getWindowResolutionMillis(windowSizeMillis: Long): Long =
windowSizeMillis match {
case x if x >= millisIn12Days => WindowUtils.Day.millis
case x if x >= millisIn12Hours => WindowUtils.Hour.millis
case _ => WindowUtils.FiveMinutes
}

def calculateTailHop(window: Window): Long =
getWindowResolutionMillis(window.millis)

val hopSizes: Array[Long] =
Array(WindowUtils.Day.millis, WindowUtils.Hour.millis, WindowUtils.FiveMinutes)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,84 @@ class SawtoothMutationAggregator(aggregations: Seq[Aggregation],
}
}

def updateIrTiledWithTileLayering(ir: Array[Any],
headStreamingTiledIrs: Seq[TiledIr],
queryTs: Long,
batchEndTs: Long) = {
// This works as a sliding window over the sorted list of tiles, where the sort is done by
// timestamp in ascending fashion (oldest -> newest tiles), and then secondarily done by
// tile size. So for example this becomes:
// [ [12:00, 1:00), [12:00, 12:05), [12:05, 12:10), ... , [12:55, 1:00), [1:00, 2:00), [1:00, 1:05), ... ]
// And when a tile is selected, we increment our index pointer to the first tile where the start is greater
// than or equal to the end of the tile we just selected. So if we select the [12:00, 1:00) tile, then the pointer
// will move to the [1:00, 2:00) tile.

// fill 2D array where each entry corresponds to the windowedAggregator column aggregators, and contains an array of
// the streaming tiles we need to aggregate
// ex) mergeBuffers = [ [1,2,3], [2,4] ]
// windowedAggregators.columnAggregators = [ Sum, Average ]
// resultIr = [ 6, 3 ]

val sortedStreamingTiles = headStreamingTiledIrs.sortBy { tile => (tile.ts, -tile.size) }
val mergeBuffers = Array.fill(windowedAggregator.length)(mutable.ArrayBuffer.empty[Any])

var i: Int = 0
while (i < windowedAggregator.length) {
val window = windowMappings(i).aggregationPart.window
// tailHopIndices is the list of indices for the hopSizes array for which hopSize a given aggregator is using.
// I.e., At tailHopIndices(i) we will see a number that is the index in hopSizes for the aggregator's hop size.
// For example, if we are using a Five Minute Resolution window then for this instance of SawtoothAggregator,
// the hopSizes(tailHopIndices(i)), where i = 0 is one day, i = 1 is one hour, and i = 2 is five minutes.
// In practice, when we have a window with 5 minute tile sizes, then this is always 5 minutes.
// If we have a window with 1 hour tile sizes, then this is always 1 hour. And if we have a window with 1 day
// tile sizes, then this is always 1 day.
// However, we still need to use this hop size instead of the tile size because we want to correctly exclude 1 hour tiles
// that we may have in addition to 5 minute tiles.
// For example, if we have a window with 5 minute resolution that starts at 12:06pm, then when considering
// the 5 minute tiles, we round down to 12:05pm and will correctly include the 12:05pm 5 minute tile. And when
// considering the 1 hour tiles, we round down to 12:05pm and will correctly exclude the 12:00pm 1 hour tile if it exists.
val hopIndex = tailHopIndices(i)

var tilePtr: Int = 0
while (tilePtr < sortedStreamingTiles.length) {
val tile = sortedStreamingTiles(tilePtr)
val tileTs = tile.ts

if (tileTs >= batchEndTs) {
// If the tile's timestamp is after the start of the window rounded to the nearest tile border (e.g., five minutes, one hour, one day),
// and if the tile's timestamp is before the query timestamp, we should include this tile in this aggregator.
// Or if the aggregator has no window (infinite), we should include this tile.
val irInWindow = (tileTs >= TsUtils.round(queryTs - window.millis, hopSizes(hopIndex)) && tileTs < queryTs)
if (window == null || irInWindow) {
mergeBuffers(i) += tile.ir(i)

// Adjust tilePtr to next window. E.g., if we just selected a tile covering [12:00, 1:00), the next tile we
// select will start at 1:00 and we will not consider any other tiles between [12:00, 1:00).
while (tilePtr < sortedStreamingTiles.length && sortedStreamingTiles(tilePtr).ts < (tileTs + tile.size)) {
tilePtr += 1
}
} else {
// If the current tile was not in the window for the aggregator, just continue to the next tile.
tilePtr += 1
}
} else {
// If the current tile came before batchEndTs, just continue to the next tile.
tilePtr += 1
}
}

i += 1
}

var idx: Int = 0
while (idx < mergeBuffers.length) {
// include collapsed batchIr in bulkMerge computation
mergeBuffers(idx) += ir(idx)
ir(idx) = windowedAggregator(idx).bulkMerge(mergeBuffers(idx).iterator)
idx += 1
}
}

/**
* Update the intermediate results with tail hops data from a FinalBatchIr.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,11 @@ import ai.chronon.api.Extensions.{AggregationPartOps, WindowOps}
import ai.chronon.api._

// Wrapper class for handling Irs in the tiled chronon use case
case class TiledIr(ts: Long, ir: Array[Any])
// ts: timestamp in millis
// ir: IR of the streaming data for the tile
// size: size in millis of the time range for the IR
// Based on the above, tile start = ts, tile end = ts + size
case class TiledIr(ts: Long, ir: Array[Any], size: Long)

// batchEndTs = upload time of the batch data as derived from GroupByServingInfo & Cached
// cache = Jul-22 / latest = Jul-23, streaming data = 22 - now (filter < jul 23)
Expand Down Expand Up @@ -126,12 +130,13 @@ class SawtoothOnlineAggregator(val batchEndTs: Long,
}

def lambdaAggregateIrTiled(finalBatchIr: FinalBatchIr,
streamingTiledIrs: Iterator[TiledIr],
queryTs: Long): Array[Any] = {
streamingTiledIrs: Seq[TiledIr],
queryTs: Long,
useTileLayering: Boolean): Array[Any] = {
// null handling
if (finalBatchIr == null && streamingTiledIrs == null) return null
val batchIr = Option(finalBatchIr).getOrElse(normalizeBatchIr(init))
val tiledIrs = Option(streamingTiledIrs).getOrElse(Array.empty[TiledIr].iterator)
val headStreamingTiledIrs = Option(streamingTiledIrs).getOrElse(Seq.empty[TiledIr])

if (batchEndTs > queryTs) {
throw new IllegalArgumentException(s"Request time of $queryTs is less than batch time $batchEndTs")
Expand All @@ -140,14 +145,19 @@ class SawtoothOnlineAggregator(val batchEndTs: Long,
// initialize with collapsed
val resultIr = windowedAggregator.clone(batchIr.collapsed)

// add streaming tiled irs
while (tiledIrs.hasNext) {
val tiledIr = tiledIrs.next()
val tiledIrTs = tiledIr.ts // unbox long only once
if (queryTs > tiledIrTs && tiledIrTs >= batchEndTs) {
updateIrTiled(resultIr, tiledIr, queryTs)
// add head events
if (useTileLayering) {
updateIrTiledWithTileLayering(resultIr, headStreamingTiledIrs, queryTs, batchEndTs)
} else {
// add streaming tiled irs
headStreamingTiledIrs.foreach { tiledIr =>
val tiledIrTs = tiledIr.ts // unbox long only once
if (queryTs > tiledIrTs && tiledIrTs >= batchEndTs) {
updateIrTiled(resultIr, tiledIr, queryTs)
}
}
}

mergeTailHops(resultIr, queryTs, batchEndTs, batchIr)
resultIr
}
Expand All @@ -161,10 +171,11 @@ class SawtoothOnlineAggregator(val batchEndTs: Long,
}

def lambdaAggregateFinalizedTiled(finalBatchIr: FinalBatchIr,
streamingTiledIrs: Iterator[TiledIr],
ts: Long): Array[Any] = {
streamingTiledIrs: Seq[TiledIr],
ts: Long,
useTileLayering: Boolean): Array[Any] = {
// TODO: Add support for mutations / hasReversal to the tiled implementation
windowedAggregator.finalize(lambdaAggregateIrTiled(finalBatchIr, streamingTiledIrs, ts))
windowedAggregator.finalize(lambdaAggregateIrTiled(finalBatchIr, streamingTiledIrs, ts, useTileLayering))
}

}
Loading