Issue #479: Error Control for Non-Deterministic Source Queries #512

Open
wants to merge 13 commits into base: main
16 changes: 16 additions & 0 deletions core/src/main/scala/io/qbeast/spark/index/OTreeDataAnalyzer.scala
@@ -53,6 +53,7 @@ trait OTreeDataAnalyzer {
*/
object DoublePassOTreeDataAnalyzer
extends OTreeDataAnalyzer
with SparkPlanAnalyzer
with SparkRevisionChangesUtils
with Serializable
with Logging {
@@ -216,6 +217,21 @@ object DoublePassOTreeDataAnalyzer
indexStatus: IndexStatus,
options: QbeastOptions): (DataFrame, TableChanges) = {
logTrace(s"Begin: Analyzing the input data with existing revision: ${indexStatus.revision}")

// Check if the DataFrame is deterministic
logDebug(s"Checking the determinism of the input data")
val isSourceDeterministic =
analyzeDataFrameDeterminism(dataFrame, indexStatus.revision)
// TODO: we need to add columnStats control before the assert
// TODO: Otherwise, the write would fail even if the user adds the correct configuration
assert(
isSourceDeterministic,
s"The source query is non-deterministic. " +
s"Due to the nature of Qbeast-Spark writes, the DataFrame is loaded twice before writing to storage. " +
s"Deterministic sources and deterministic columns to index are required " +
s"to preserve the state of the indexing pipeline. " +
s"If that is not the case, please save the DataFrame as Delta first and convert it to Qbeast in a second step.")

// Compute the changes in the space: cube size, transformers, and transformations.
val (revisionChanges, numElements) =
computeRevisionChanges(indexStatus.revision, options, dataFrame)
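For context, the two-step workaround recommended by the error message could look roughly like the sketch below. This is illustrative only and not part of this diff: it assumes df is the non-deterministic DataFrame, spark is the active session, and the ConvertToQbeastCommand signature (table identifier, columns to index, cube size) is taken from the project's conversion utilities and may differ.

import io.qbeast.spark.internal.commands.ConvertToQbeastCommand

// 1. Materialize the non-deterministic result as a plain Delta table (single read of the source).
val stablePath = "/tmp/stable_delta_table" // illustrative path
df.sample(0.5) // any non-deterministic source
  .write
  .format("delta")
  .save(stablePath)

// 2. Convert the now-stable Delta table to Qbeast in a second, deterministic step.
// Assumed signature: ConvertToQbeastCommand(tableIdentifier, columnsToIndex, cubeSize)
ConvertToQbeastCommand(s"delta.`$stablePath`", Seq("user_id", "price"), 5000).run(spark)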
117 changes: 117 additions & 0 deletions core/src/main/scala/io/qbeast/spark/index/SparkPlanAnalyzer.scala
@@ -0,0 +1,117 @@
package io.qbeast.spark.index

import io.qbeast.core.model.Revision
import io.qbeast.spark.internal.rules.QbeastRelation
import org.apache.spark.sql.catalyst.expressions.Alias
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.plans.logical.Filter
import org.apache.spark.sql.catalyst.plans.logical.LocalLimit
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.plans.logical.Project
import org.apache.spark.sql.catalyst.plans.logical.Sample
import org.apache.spark.sql.catalyst.plans.logical.Sort
import org.apache.spark.sql.DataFrame

import scala.collection.convert.ImplicitConversions.`collection asJava`

trait SparkPlanAnalyzer {

/**
* Determines whether the logical plan is deterministic by recursively checking it for
* non-deterministic operations.
*
* Operations considered non-deterministic:
* - LocalLimit without a Sort operation
* - Sample (unless applied directly over a QbeastRelation)
* - Filter with a non-deterministic condition
*
* @param plan
* the logical plan
* @return
* true if the plan and all of its children are deterministic
*/
private def isLogicalPlanDeterministic(plan: LogicalPlan): Boolean = {
// Recursively traverse the logical plan to find non-deterministic operations
val isCurrentOperationDeterministic = plan match {
case LocalLimit(_, _: Sort) => true // LocalLimit with Sort is deterministic
case LocalLimit(_, _) => false
case Sample(_, _, false, _, child) =>
child match {
case QbeastRelation(_, _) =>
true // Sample over QbeastRelation is deterministic
case _ => false
}
case Filter(condition, _) => condition.deterministic
case _ => true
}

val areChildOperationsDeterministic = plan.children.forall(isLogicalPlanDeterministic)

isCurrentOperationDeterministic && areChildOperationsDeterministic
}

/**
* Extracts the select expressions from the logical plan for the provided column name
*
* @param logicalPlan
* the logical plan
* @param columnName
* the column name
* @return
* the expressions in the plan that define or reference the column
*/
private[index] def collectSelectExpressions(
logicalPlan: LogicalPlan,
columnName: String): Seq[Expression] = {
logicalPlan match {
case Project(projectList, _) =>
projectList.collect {
case Alias(child, name) if name == columnName => child
case expr if expr.references.map(_.name).contains(columnName) => expr
}
case _ =>
logicalPlan.children.flatMap(child => collectSelectExpressions(child, columnName))
}
}
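
// Illustrative example (not part of the original file): for a plan built as
//   spark.range(10).withColumn("x2", rand())
// the top-level Project contains Alias(rand(), "x2"), so
// collectSelectExpressions(plan, "x2") returns the rand() expression, whose
// `deterministic` flag is false and drives isColumnDeterministic below.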

/**
* Checks whether a column is deterministic in the logical plan
*
* @param logicalPlan
* the logical plan
* @param columnName
* the column name
* @return
* true if every expression that produces the column is deterministic
*/
private def isColumnDeterministic(logicalPlan: LogicalPlan, columnName: String): Boolean = {
val expressionSet = collectSelectExpressions(logicalPlan, columnName)
expressionSet.forall(_.deterministic)
}

/**
* Analyzes the DataFrame to determine whether its execution is safely deterministic for indexing
*
* - The logical plan of the DataFrame is checked for determinism
* - The columns to index are checked for determinism
*
* @param dataFrame
* the DataFrame to analyze
* @param revision
* the Revision to analyze
* @return
* true if both the query plan and the columns to index are deterministic
*/
def analyzeDataFrameDeterminism(dataFrame: DataFrame, revision: Revision): Boolean = {
// Access the logical plan of the DataFrame
val logicalPlan: LogicalPlan = dataFrame.queryExecution.logical

// Check whether the DataFrame's logical plan contains only deterministic operations
val isQueryDeterministic: Boolean = isLogicalPlanDeterministic(logicalPlan)

// Check that all the columns to index in the DataFrame are deterministic
val columnsToIndex = revision.columnTransformers.map(_.columnName)
val areColumnsToIndexDeterministic: Boolean =
columnsToIndex.forall(column => isColumnDeterministic(logicalPlan, column))

// Check if the source is deterministic
isQueryDeterministic && areColumnsToIndexDeterministic
}

}
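
To make the classification concrete, the following is a minimal, hypothetical usage sketch (not part of this diff). It assumes an existing Revision whose transformers index the column "id" and an active SparkSession; the expected results follow directly from the pattern matches above.

import io.qbeast.core.model.Revision
import org.apache.spark.sql.functions.rand
import org.apache.spark.sql.SparkSession

object DeterminismExamples extends SparkPlanAnalyzer {

  // Each call returns whether the DataFrame's plan is considered safe to index.
  def examples(spark: SparkSession, revision: Revision): Unit = {
    val df = spark.range(10).toDF("id")

    analyzeDataFrameDeterminism(df, revision) // true: plain deterministic source
    analyzeDataFrameDeterminism(df.filter("id > 5"), revision) // true: deterministic filter
    analyzeDataFrameDeterminism(df.orderBy("id").limit(5), revision) // true: LocalLimit over a Sort
    analyzeDataFrameDeterminism(df.limit(5), revision) // false: LocalLimit without a Sort
    analyzeDataFrameDeterminism(df.sample(0.5), revision) // false: Sample over a non-Qbeast source
    analyzeDataFrameDeterminism(df.filter(rand() > 0.5), revision) // false: non-deterministic filter condition
  }

}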
@@ -0,0 +1,28 @@
package io.qbeast.spark.internal.rules

import io.qbeast.core.model.QbeastOptions
import io.qbeast.spark.index.DefaultFileIndex
import io.qbeast.IndexedColumns
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.datasources.HadoopFsRelation
import org.apache.spark.sql.execution.datasources.LogicalRelation

/**
* QbeastRelation matching pattern
*/
object QbeastRelation {

def unapply(plan: LogicalPlan): Option[(LogicalRelation, IndexedColumns)] = plan match {

case l @ LogicalRelation(
q @ HadoopFsRelation(o: DefaultFileIndex, _, _, _, _, parameters),
_,
_,
_) =>
val qbeastOptions = QbeastOptions(parameters)
val columnsToIndex = qbeastOptions.columnsToIndexParsed.map(_.columnName)
Some((l, columnsToIndex))
case _ => None
}

}
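
As a point of reference, the extractor is meant to be used in logical-plan pattern matches (both SampleRule and SparkPlanAnalyzer rely on it). A minimal, hypothetical sketch, assuming IndexedColumns is a sequence of indexed column names as the extractor's construction suggests:

import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

object QbeastRelationExample {

  // Illustrative only: report whether a plan node is a Qbeast relation and which columns it indexes.
  def describeRelation(plan: LogicalPlan): String = plan match {
    case QbeastRelation(_, indexedColumns) =>
      s"Qbeast relation indexed by: ${indexedColumns.mkString(", ")}"
    case _ => "not a Qbeast relation"
  }

}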
24 changes: 1 addition & 23 deletions src/main/scala/io/qbeast/internal/rules/SampleRule.scala
@@ -15,11 +15,10 @@
*/
package io.qbeast.internal.rules

import io.qbeast.core.model.QbeastOptions
import io.qbeast.core.model.Weight
import io.qbeast.core.model.WeightRange
import io.qbeast.spark.index.DefaultFileIndex
import io.qbeast.spark.internal.expressions.QbeastMurmur3Hash
import io.qbeast.spark.internal.rules.QbeastRelation
import io.qbeast.IndexedColumns
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.expressions.And
@@ -31,7 +30,6 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.plans.logical.Project
import org.apache.spark.sql.catalyst.plans.logical.Sample
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.datasources.HadoopFsRelation
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.SparkSession

@@ -107,23 +105,3 @@ class SampleRule(spark: SparkSession) extends Rule[LogicalPlan] with Logging {
}

}

/**
* QbeastRelation matching pattern
*/
object QbeastRelation {

def unapply(plan: LogicalPlan): Option[(LogicalRelation, IndexedColumns)] = plan match {

case l @ LogicalRelation(
q @ HadoopFsRelation(o: DefaultFileIndex, _, _, _, _, parameters),
_,
_,
_) =>
val qbeastOptions = QbeastOptions(parameters)
val columnsToIndex = qbeastOptions.columnsToIndexParsed.map(_.columnName)
Some((l, columnsToIndex))
case _ => None
}

}
@@ -130,9 +130,7 @@ class ConvertToQbeastDeltaTest

it should "not create new revisions for a qbeast table" in withSparkAndTmpDir(
(spark, tmpDir) => {
loadTestData(spark)
.limit(dataSize)
.write
loadTestData(spark).write
.format("qbeast")
.option("columnsToIndex", columnsToIndex.mkString(","))
.option("cubeSize", dcs)
@@ -189,9 +187,7 @@ class ConvertToQbeastDeltaTest
convertFromFormat(spark, "parquet", tmpDir)

// Append qbeast data
loadTestData(spark)
.limit(dataSize)
.write
loadTestData(spark).write
.mode("append")
.format("qbeast")
.save(tmpDir)
@@ -67,10 +67,10 @@ class QbeastSnapshotDeltaTest extends QbeastIntegrationTestSpec {
"Appends" should "only update metadata when needed" in
withQbeastContextSparkAndTmpDir { (spark, tmpDir) =>
{
val df = loadTestData(spark).limit(5000)
val df = createDF(100)
df.write
.format("qbeast")
.option("columnsToIndex", "user_id,price")
.option("columnsToIndex", "age,val2")
.save(tmpDir)
df.write.mode("append").format("qbeast").save(tmpDir)

@@ -451,11 +451,11 @@ class TransformerIndexingTest

it should "be able to change Transformer type at will" in withQbeastContextSparkAndTmpDir {
(spark, tmpDir) =>
val data = loadTestData(spark).sample(0.1)
val dataFirst = spark.range(1).toDF("price")
val data = spark.range(2, 10).toDF("price")

// Creates a LinearTransformer with IdentityTransformation
data
.limit(1)
.write
dataFirst.write
.mode("append")
.format("qbeast")
.option("columnsToIndex", "price")
@@ -21,9 +21,8 @@ import io.qbeast.TestUtils._
import org.apache.spark.sql.execution.FileSourceScanExec
import org.apache.spark.sql.functions.avg
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.functions.rand
import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.functions.regexp_replace
import org.apache.spark.sql.functions.when

class QbeastFilterPushdownTest extends QbeastIntegrationTestSpec {

@@ -133,18 +132,20 @@ class QbeastFilterPushdownTest extends QbeastIntegrationTestSpec {
"for null value columns" in withSparkAndTmpDir { (spark, tmpDir) =>
{
val data = loadTestData(spark)
val dataWithNulls =
data.withColumn(
"null_product_id",
when(rand() > 0.5, null).otherwise(col("product_id")))
val dataWithoutNulls =
data.filter("product_id < 4000000").withColumn("null_product_id", col("product_id"))
val dataWithNulls = data
.filter("product_id >= 4000000")
.withColumn("null_product_id", lit(null))
val dataToIndex = dataWithoutNulls.union(dataWithNulls)

writeTestData(dataWithNulls, Seq("brand", "null_product_id"), 10000, tmpDir)
writeTestData(dataToIndex, Seq("brand", "null_product_id"), 10000, tmpDir)

val df = spark.read.format("qbeast").load(tmpDir)
val filter = "(`null_product_id` is null)"

val qbeastQuery = df.filter(filter)
val normalQuery = dataWithNulls.filter(filter)
val normalQuery = dataToIndex.filter(filter)

checkFiltersArePushedDown(qbeastQuery)
qbeastQuery.count() shouldBe normalQuery.count()
81 changes: 81 additions & 0 deletions src/test/scala/io/qbeast/spark/utils/QbeastInputSourcesTest.scala
@@ -0,0 +1,81 @@
package io.qbeast.spark.utils

import io.qbeast.QbeastIntegrationTestSpec

class QbeastInputSourcesTest extends QbeastIntegrationTestSpec {

private val nonDeterministicColumns =
Seq("rand()", "uuid()")

"Qbeast" should "throw an error when indexing non-deterministic query columns" in withSparkAndTmpDir {
(spark, tmpDir) =>
nonDeterministicColumns.foreach(column => {
val location = tmpDir + "/" + column
val df = spark
.range(10)
.withColumn("non_deterministic_col", org.apache.spark.sql.functions.expr(column))
val e = intercept[AssertionError] {
df.write
.format("qbeast")
.option("columnsToIndex", "non_deterministic_col")
.save(location)
}
assert(e.getMessage.contains("assertion failed: The source query is non-deterministic."))
})
}

it should "allow to write non-deterministic columns when they are not being indexed" in withSparkAndTmpDir {
(spark, tmpDir) =>
nonDeterministicColumns.foreach(column => {
val location = tmpDir + "/" + column
val df = spark
.range(10)
.toDF("id")
.withColumn("non_deterministic_col", org.apache.spark.sql.functions.expr(column))
df.write.format("qbeast").option("columnsToIndex", "id").save(location)
assertSmallDatasetEquality(
df,
spark.read.format("qbeast").load(location),
ignoreNullable = true)
})
}

it should "throw an error when indexing non-deterministic LIMIT" in withSparkAndTmpDir {
(spark, tmpDir) =>
// Index a deterministic column over a non-deterministic LIMIT
val df = spark.range(10).toDF("id")
val e = intercept[AssertionError] {
df.limit(5).write.format("qbeast").option("columnsToIndex", "id").save(tmpDir)
}
assert(e.getMessage.contains("assertion failed: The source query is non-deterministic."))
}

it should "throw an error when indexing non-deterministic SAMPLE" in withSparkAndTmpDir {
(spark, tmpDir) =>
// Index a deterministic column over a non-deterministic SAMPLE
val df = spark.range(10).toDF("id")
val e = intercept[AssertionError] {
df.sample(0.5).write.format("qbeast").option("columnsToIndex", "id").save(tmpDir)
}
assert(e.getMessage.contains("assertion failed: The source query is non-deterministic."))
}

it should "throw an error with undeterministic filter query" in withSparkAndTmpDir {
(spark, tmpDir) =>
val df = spark.range(10).toDF("id")
val e = intercept[AssertionError] {
df.filter("rand() > 0.5")
.write
.format("qbeast")
.option("columnsToIndex", "id")
.save(tmpDir)
}
assert(e.getMessage.contains("assertion failed: The source query is non-deterministic."))
}

it should "allow indexing deterministic filters" in withSparkAndTmpDir { (spark, tmpDir) =>
val df = spark.range(10).toDF("id")
df.filter("id > 5").write.format("qbeast").option("columnsToIndex", "id").save(tmpDir)
}

}
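
Since SparkPlanAnalyzer treats a LocalLimit whose child is a Sort as deterministic, a natural companion test would assert that an ordered LIMIT is accepted. The following is a hedged sketch in the same style as the tests above, not part of this diff:

it should "allow indexing a LIMIT preceded by an ORDER BY" in withSparkAndTmpDir {
  (spark, tmpDir) =>
    // A sorted limit yields a LocalLimit over a Sort, which the analyzer accepts as deterministic.
    val df = spark.range(10).toDF("id")
    df.orderBy("id")
      .limit(5)
      .write
      .format("qbeast")
      .option("columnsToIndex", "id")
      .save(tmpDir)
}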