Issue #455: Next in line Rollup (rebased) #514

Open · wants to merge 4 commits into main
Changes from 3 commits
core/src/main/scala/io/qbeast/spark/writer/Rollup.scala (40 additions, 6 deletions)
@@ -54,18 +54,21 @@ private[writer] class Rollup(limit: Double) {
* the rollup result
*/
def compute(): Map[CubeId, CubeId] = {
- val queue = new mutable.PriorityQueue()(Ordering.by[CubeId, Int](_.depth))
+ val queue = new mutable.PriorityQueue()(CubeIdOrdering)
groups.keys.foreach(queue.enqueue(_))
while (queue.nonEmpty) {
val cubeId = queue.dequeue()
val group = groups(cubeId)
if (group.size < limit && !cubeId.isRoot) {
- val Some(parentCubeId) = cubeId.parent
- if (groups.contains(parentCubeId)) {
- groups(parentCubeId).add(group)
+ val nextInLine = queue.headOption match {
+ case Some(cube) if areSiblings(cube, cubeId) => cube
+ case _ => cubeId.parent.get
+ }
+ if (groups.contains(nextInLine)) {
+ groups(nextInLine).add(group)
} else {
- groups.put(parentCubeId, group)
- queue.enqueue(parentCubeId)
+ groups.put(nextInLine, group)
+ queue.enqueue(nextInLine)
}
groups.remove(cubeId)
}
@@ -75,6 +78,37 @@
}.toMap
}

private def areSiblings(cube_a: CubeId, cube_b: CubeId): Boolean = {
val sameParent = cube_a.parent == cube_b.parent
val differentCube = cube_a != cube_b
sameParent && differentCube
}

/*
* Ordering for cube identifiers. Cube identifiers are ordered by their depth in ascending
* order. If the depths are equal, the cube identifiers are compared in reverse of their
* natural order. This ordering is used in the priority queue to process the cube identifiers
* in the correct order, i.e., from the deepest to the shallowest and, within a depth, from
* the leftmost to the rightmost:
* 0 root
* 1 c0 c1
* 2 c00 c01 c10 c11
* The priority queue processes the cube identifiers in the following order:
* c00, c01, c10, c11, c0, c1, root.
* If every group stays under the limit, the resulting rollup chains are:
* c00 -> c01 -> c0, c10 -> c11 -> c1, c0 -> c1 -> root
*/
private object CubeIdOrdering extends Ordering[CubeId] {

override def compare(x: CubeId, y: CubeId): Int = {
val depthComparison = x.depth.compareTo(y.depth)
if (depthComparison == 0) {
y.compare(x)
} else {
depthComparison
}
}

}

private class Group(val cubeIds: mutable.Set[CubeId], var size: Long) {

def add(cubeId: CubeId, size: Long): Unit = {
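To make the new behaviour easier to follow outside the diff, here is a minimal, self-contained sketch of the next-in-line idea. It does not use the real qbeast CubeId or Rollup API: the Cube type below is a hypothetical stand-in (a path of child indices from the root), and the group bookkeeping is reduced to a set of cubes plus a running size. Undersized cubes merge into the next cube in the queue when that cube is a sibling, and into the parent otherwise.

import scala.collection.mutable

// Hypothetical stand-in for CubeId, used only in this sketch: a cube is the
// path of child indices leading to it from the root, so depth == path length.
final case class Cube(path: List[Int]) {
  def depth: Int = path.length
  def isRoot: Boolean = path.isEmpty
  def parent: Option[Cube] = if (isRoot) None else Some(Cube(path.init))
}

object NextInLineRollupSketch {

  // Deepest cubes first; within the same depth, leftmost first. Scala's
  // PriorityQueue dequeues the maximum, so "deeper" and "further left" must
  // compare as larger, hence the reversed path comparison on equal depths.
  val cubeOrdering: Ordering[Cube] = new Ordering[Cube] {
    override def compare(x: Cube, y: Cube): Int = {
      val byDepth = x.depth.compareTo(y.depth)
      if (byDepth != 0) byDepth else comparePaths(y.path, x.path)
    }
  }

  // Lexicographic comparison of two paths.
  @annotation.tailrec
  private def comparePaths(a: List[Int], b: List[Int]): Int = (a, b) match {
    case (Nil, Nil)         => 0
    case (Nil, _)           => -1
    case (_, Nil)           => 1
    case (x :: xs, y :: ys) => if (x != y) x.compareTo(y) else comparePaths(xs, ys)
  }

  private def areSiblings(a: Cube, b: Cube): Boolean =
    a != b && a.parent == b.parent

  // Merges undersized cubes into the next sibling waiting in the queue, or
  // into the parent when the next cube to be processed is not a sibling.
  // Returns, for every populated cube, the cube it ends up rolled into.
  def rollup(sizes: Map[Cube, Long], limit: Long): Map[Cube, Cube] = {
    val groups = mutable.Map.empty[Cube, (mutable.Set[Cube], Long)]
    sizes.foreach { case (cube, size) => groups(cube) = (mutable.Set(cube), size) }

    val queue = new mutable.PriorityQueue[Cube]()(cubeOrdering)
    groups.keys.foreach(queue.enqueue(_))

    while (queue.nonEmpty) {
      val cubeId = queue.dequeue()
      val (members, size) = groups(cubeId)
      if (size < limit && !cubeId.isRoot) {
        // Prefer the next cube in the queue if it is a sibling, else the parent.
        val nextInLine = queue.headOption match {
          case Some(cube) if areSiblings(cube, cubeId) => cube
          case _                                       => cubeId.parent.get
        }
        groups.get(nextInLine) match {
          case Some((otherMembers, otherSize)) =>
            groups(nextInLine) = (otherMembers ++= members, otherSize + size)
          case None =>
            groups(nextInLine) = (members, size)
            queue.enqueue(nextInLine)
        }
        groups.remove(cubeId)
      }
    }

    groups.flatMap { case (target, (members, _)) =>
      members.map(_ -> target)
    }.toMap
  }
}

With limit = 100, a root of size 100 and four root children of size 20 each, every child ends up mapped to the root, which is the shape exercised by one of the new RollupTest cases further down.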
@@ -71,7 +71,7 @@ class DeltaRollupDataWriterTest extends QbeastIntegrationTestSpec {
}

it should "compute rollup correctly when optimizing" in
- withSparkAndTmpDir { (spark, tmpDir) =>
+ withSparkAndTmpDir { (_, tmpDir) =>
val revision =
Revision(1L, 0, QTableID(tmpDir), 20, Vector(EmptyTransformer("col_1")), Vector.empty)

@@ -86,7 +86,7 @@
Map(root -> 20L, c1 -> 1L, c2 -> 20L))

val rollup = DeltaRollupDataWriter.computeRollup(tc)
- rollup shouldBe Map(root -> root, c1 -> root, c2 -> c2)
+ rollup shouldBe Map(root -> root, c1 -> c2, c2 -> c2)
}

}
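The updated expectation follows from the next-in-line rule: c1 holds a single element and c2, the next cube in the queue, is its sibling, so c1 now rolls into c2 instead of climbing straight to the root. The same shape can be reproduced with the sketch above; the path-based cubes and the limit of 20 are assumptions chosen only to mirror this test, not values taken from DeltaRollupDataWriter.

val root = Cube(Nil)
val c1 = Cube(List(1))
val c2 = Cube(List(2))
// c1 is undersized and its sibling c2 is next in line, so c1 merges into c2.
NextInLineRollupSketch.rollup(Map(root -> 20L, c1 -> 1L, c2 -> 20L), limit = 20L)
// Map(root -> root, c1 -> c2, c2 -> c2)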
src/test/scala/io/qbeast/spark/writer/RollupTest.scala (50 additions, 16 deletions)
@@ -24,28 +24,62 @@ import org.scalatest.matchers.should.Matchers
*/
class RollupTest extends AnyFlatSpec with Matchers {

"Rollup" should "work correctly" in {
val root = CubeId.root(1)
"Rollup" should "work correctly with basic cube structure" in {
// root(100)
// |
// c0(100)
// / | \ \
// c00(50) c01(50) c02(50) c03(50)
val root = CubeId.root(2)
val c0 = root.firstChild
- val c1 = c0.nextSibling.get
- val c00 = c0.firstChild
- val c01 = c00.nextSibling.get
- val c10 = c1.firstChild
- val c11 = c10.nextSibling.get
+ val Seq(c00, c01, c02, c03) = c0.children.toSeq

val rollup = new Rollup(100)
rollup.populate(root, 100)
rollup.populate(c0, 100)
rollup.populate(c00, 50)
rollup.populate(c01, 50)
rollup.populate(c02, 50)
rollup.populate(c03, 50)

rollup.compute() shouldBe Map(
root -> root,
c0 -> c0,
c00 -> c01,
c01 -> c01,
c02 -> c03,
c03 -> c03)
}

it should "rollup a cube up to the parent after checking all sibling cubes" in {
// root(100)
// / | \ \
// c0(20) c1(20) c2(20) c3(20)
val root = CubeId.root(2)
val Seq(c0, c1, c2, c3) = root.children.toSeq

val rollup = new Rollup(100)
rollup.populate(root, 100)
rollup.populate(c0, 20)
rollup.populate(c1, 20)
rollup.populate(c2, 20)
rollup.populate(c3, 20)

rollup.compute() shouldBe Map(root -> root, c0 -> root, c1 -> root, c2 -> root, c3 -> root)
}

it should "handle empty rollup" in {
val result = new Rollup(3).compute()
result shouldBe empty
}

it should "handle single cube" in {
val root = CubeId.root(1)
val result = new Rollup(3)
- .populate(root, 1)
- .populate(c00, 1)
- .populate(c01, 2)
- .populate(c10, 2)
- .populate(c11, 3)
+ .populate(root, 2)
.compute()

result(root) shouldBe root
- result(c00) shouldBe c0
- result(c01) shouldBe c0
- result(c10) shouldBe root
- result(c11) shouldBe c11
}

}
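As a final check, the processing order documented in the CubeIdOrdering comment (c00, c01, c10, c11, c0, c1, root) can be reproduced with the sketch ordering above, again using the hypothetical path-based cubes rather than real CubeIds:

import scala.collection.mutable

val root = Cube(Nil)
val c0 = Cube(List(0))
val c1 = Cube(List(1))
val c00 = Cube(List(0, 0))
val c01 = Cube(List(0, 1))
val c10 = Cube(List(1, 0))
val c11 = Cube(List(1, 1))

val queue = new mutable.PriorityQueue[Cube]()(NextInLineRollupSketch.cubeOrdering)
List(root, c0, c1, c00, c01, c10, c11).foreach(queue.enqueue(_))

// Dequeues deepest first and, within a depth, leftmost first.
val order = List.newBuilder[Cube]
while (queue.nonEmpty) order += queue.dequeue()
order.result() // List(c00, c01, c10, c11, c0, c1, root)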