[SPARK-53332][SS] Enable StateDataSource with state checkpoint v2 (only snapshotStartBatchId option) #52202
+463
−139
What changes were proposed in this pull request?
This is the last PR in the series (#52047, #52148, #52202) for adding State Data Source support with checkpoint v2.
The key enhancement is the ability to use snapshotStartBatchId and related options when the state uses checkpoint v2.
NOTE: To read checkpoint v2 state data sources, "spark.sql.streaming.stateStore.checkpointFormatVersion" -> 2 must be set. It is possible to allow reading state data sources arbitrarily, based on what is in the CommitLog, by relaxing assertion checks, but this is left as a future change.
Why are the changes needed?
State checkpoint v2 ("spark.sql.streaming.stateStore.checkpointFormatVersion") introduces a new format for storing state metadata that includes unique identifiers in the file path for each state store. The existing StateDataSource implementation only worked with the checkpoint v1 format, making it incompatible with streaming queries that use the newer checkpoint format. Only batchId was implemented in #52047 and only readChangeFeed was implemented in #52148.
Does this PR introduce any user-facing change?
Yes.
State Data Source will work when checkpoint v2 is used together with snapshotStartBatchId and related options.
How was this patch tested?
In the previous PRs test suites were added to parameterize the current tests with checkpoint v2. All of these tests are now added back. All tests that previously intentionally tested some feature of the State Data Source Reader with checkpoint v1 should now be parameterized with checkpoint v2 (including python tests).
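For orientation, the kind of read these suites exercise can be sketched in PySpark roughly as follows. This is a minimal sketch, not code from this PR: the helper names (snapshot_read_options, read_state_from_snapshot) are hypothetical, the input checks are local sanity checks rather than a statement of what Spark itself validates, and it assumes the checkpoint format conf can be set on the session at read time.

```python
from typing import Dict, Optional

# Conf key quoted in this PR description; the value "2" selects checkpoint v2.
CHECKPOINT_FORMAT_CONF = "spark.sql.streaming.stateStore.checkpointFormatVersion"


def snapshot_read_options(snapshot_start_batch_id: int,
                          snapshot_partition_id: int,
                          batch_id: Optional[int] = None) -> Dict[str, str]:
    """Build State Data Source reader options for a snapshot-based read.

    Hypothetical helper: the checks below are local sanity checks for this
    sketch, not a description of Spark's own validation.
    """
    if snapshot_start_batch_id < 0 or snapshot_partition_id < 0:
        raise ValueError("batch and partition ids must be non-negative")
    options = {
        "snapshotStartBatchId": str(snapshot_start_batch_id),
        # snapshotStartBatchId is used together with snapshotPartitionId.
        "snapshotPartitionId": str(snapshot_partition_id),
    }
    if batch_id is not None:
        if batch_id < snapshot_start_batch_id:
            raise ValueError("batch_id must not precede snapshot_start_batch_id")
        options["batchId"] = str(batch_id)
    return options


def read_state_from_snapshot(spark, checkpoint_path: str,
                             snapshot_start_batch_id: int,
                             snapshot_partition_id: int,
                             batch_id: Optional[int] = None):
    """Read state from a checkpoint v2 query, starting at a given snapshot.

    `spark` is an existing SparkSession. Setting the conf here is an
    assumption of this sketch; the PR only states that it is required.
    """
    spark.conf.set(CHECKPOINT_FORMAT_CONF, "2")
    opts = snapshot_read_options(snapshot_start_batch_id,
                                 snapshot_partition_id,
                                 batch_id)
    return spark.read.format("statestore").options(**opts).load(checkpoint_path)
```

A call such as read_state_from_snapshot(spark, "/tmp/checkpoint", 3, 0, batch_id=5), with a placeholder checkpoint path, would ask the reader to load the snapshot for batch 3 of partition 0 and replay changes up to batch 5.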
RocksDBWithCheckpointV2StateDataSourceReaderSnapshotSuite is added, which uses the golden file approach similar to #46944, where snapshotStartBatchId was first added.
Was this patch authored or co-authored using generative AI tooling?
No