Issue #479: Error Control for Non-Deterministic Source Queries #512

Open
wants to merge 13 commits into main

Conversation

osopardo1 (Member) commented Dec 11, 2024

Description

Problem

An issue identified a month ago (#414 ) revealed a significant limitation in the implementation of the Qbeast writing process for Spark. The current approach requires multiple traversals over the DataFrame, covering the following steps (sketched after the list):

  1. Collecting statistics from the data: Calculating metrics such as min/max values for the columns to index and the count of elements.
  2. Estimating the index: Determining how the data should be indexed.
  3. Indexing rows and compacting data: Assigning index values to each row and organizing the information into files.
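
For illustration, here is a minimal sketch of those traversals, assuming col_1 is a numeric (Long) column to index; writeQbeastSketch, the bucket arithmetic, and the column name are hypothetical stand-ins, not the actual qbeast-spark internals:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, count, max, min}

def writeQbeastSketch(df: DataFrame, path: String): Unit = {
  // Step 1 (first traversal): collect statistics for the columns to index
  val stats = df.agg(min("col_1"), max("col_1"), count("*")).first()
  val (lo, hi) = (stats.getLong(0), stats.getLong(1))
  // Step 2: estimate the index from those statistics (stand-in for Revision estimation)
  val buckets = 100
  // Step 3 (second traversal): assign an index value to each row and write the files.
  // If df changed since step 1, rows can fall outside [lo, hi].
  df.withColumn("bucket", (col("col_1") - lo) * buckets / (hi - lo + 1))
    .write.parquet(path)
}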

This repeated loading of the DataFrame introduces potential inconsistencies because both the source data and the query results can change between these steps:

  • Source Modifications: Concurrent changes to the underlying data source.
  • Non-Deterministic Queries: Queries that produce different results across executions.

If either of these situations occurs, discrepancies between the statistics computed in step 1 and the final min/max values used in step 2 can lead to mismatches in the indexing process.
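
For instance, a query that projects rand() is flagged non-deterministic precisely because Spark does not guarantee that recomputing the plan reproduces the same values:

import org.apache.spark.sql.functions.{max, rand}

val df = spark.range(100).withColumn("r", rand())
val statsMax = df.agg(max("r")).first().getDouble(0) // step 1: statistics collection
val writeMax = df.agg(max("r")).first().getDouble(0) // step 3: recomputation; may differ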

Proposed Solution

To address part of this issue, we propose introducing a new agent, the SparkPlanAnalyzer, to improve error handling for non-deterministic queries before processing the data.

Unsupported Non-Deterministic Queries

To properly define which queries are no longer supported, let's list the non-deterministic query plans that Spark can encounter (with examples after the list):

  • LIMIT -> Applying a LIMIT clause twice would yield different results if the source is not sorted.
  • SAMPLE -> The Spark Sample command uses random sampling to extract the percentage of rows specified in the operation. Unless the source is a Qbeast table, we cannot guarantee deterministic results.
  • FILTERING with a Non-Deterministic Column -> Using rand() or other non-deterministic predicates yields different results on each execution.
  • Indexing a Non-Deterministic Column -> Computing statistics over a non-deterministic column would cause a mismatch between the Transformations computed in Step 1 and the CubeID creation made in Step 2. No error is raised if the non-deterministic columns are not marked for indexing.
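
Illustrative examples of source queries the analyzer would reject (df is any non-Qbeast DataFrame; the column names are placeholders):

import org.apache.spark.sql.functions.{expr, rand}

df.limit(100)                       // LIMIT without a sort: the selected rows can change
df.sample(0.3)                      // SAMPLE on a non-Qbeast source
df.filter(rand() > 0.5)             // non-deterministic filter predicate
df.withColumn("u", expr("uuid()"))  // only an error if "u" is among the columns to index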

User Workaround

This approach provides users with two options:

  • Modify the query: Ensure that the query is deterministic to avoid inconsistencies.
  • Add columnStats using the .option method: By providing column statistics directly, users can mitigate mismatches between steps 1 and 2 (see the sketch after this list). However, this does not guarantee that the final written values will match those produced by the initial query.
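
A sketch of the second option; the columnStats JSON keys follow the <column>_min / <column>_max convention, though the exact option format should be checked against the qbeast-spark docs:

df.sample(0.1) // non-deterministic source query
  .write
  .format("qbeast")
  .mode("append")
  .option("columnsToIndex", "col_1")
  .option("columnStats", """{"col_1_min": 0, "col_1_max": 1000000}""")
  .save(outputPath)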

Type of change

New feature.

Checklist:

Here is the list of things you should do before submitting this pull request:

  • New feature / bug fix has been committed following the Contribution guide.
  • Add logging to the code following the Contribution guide.
  • Add comments to the code (make it easier for the community!).
  • Change the documentation.
  • Add tests.
  • Your branch is updated to the main branch (dependent changes have been merged).

How Has This Been Tested? (Optional)

This is tested with QbeastInputSourcesTest. Added tests for error control when using:

  • LIMIT
  • FILTER BY Non-Deterministic Columns
  • SAMPLE
  • Use of non-deterministic expressions in projections (rand, uuid, ...)

osopardo1 (Member Author) commented:

This PR is blocked until we complete #520. Since the determinism of the DataFrame is part of the Revision Flow/DataFrame Analysis, we aim to have a proper logic component in place before introducing new changes.

val isCurrentOperationDeterministic = plan match {
  case LocalLimit(_, _: Sort) => true // LocalLimit with Sort is deterministic
  case LocalLimit(_, _) => false
  case Sample(_, _, _, _, _) => false
  case _ => true // other operators: deterministic by default (completing the excerpt)
}
Member:

Doesn't this prevent a qbeast sample from being indexed with qbeast?

val qbeastDf = spark.read.format("qbeast").load(tablePath)
(qbeastDf
  .sample(0.1)
  .write
  .mode("append")
  .format("qbeast")
  .option("columnsToIndex", "col_1,col_2")
  .save(outputPath)
)

Member Author:

You are right, although I had my concerns at first.

Currently, the sample with Qbeast is deterministic, because we use the hash of the columns to reproduce the same weight. But will that premise hold in the future? Or should Sample with Qbeast be deterministic by all means?

Member:

Can you detect a qbeast sample?

Member Author:

Yep, that is possible for sure
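
For illustration only, one way such a check could look; this assumes the qbeast source surfaces as a DataSourceV2Relation and uses a class-name heuristic, which is an assumption rather than the PR's implementation:

import org.apache.spark.sql.catalyst.plans.logical.Sample
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation

// Treat Sample as deterministic only when every leaf relation is a qbeast table
def isDeterministicSample(sample: Sample): Boolean =
  sample.child.collectLeaves().forall {
    case r: DataSourceV2Relation =>
      r.table.getClass.getName.toLowerCase.contains("qbeast") // assumed heuristic
    case _ => false
  }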

osopardo1 marked this pull request as ready for review on December 20, 2024.