[SPARK-54462][SQL] Add isDataFrameWriterV1 option for Delta datasource compatibility #53173
Open · juliuszsompolski wants to merge 1 commit into apache:master from juliuszsompolski:isdataframewriterv1
+39 −1
```diff
@@ -484,12 +484,50 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) extends sql.DataFrameWriter[T]
           serde = None,
           external = false,
           constraints = Seq.empty)
+        val writeOptions = if (source == "delta") {
+          // This is a very special workaround for Delta Lake.
+          // Spark's SaveMode.Overwrite is documented as:
+          //  * if data/table already exists, existing data is expected to be overwritten
+          //  * by the contents of the DataFrame.
+          // It does not define the behaviour of overwriting the table metadata (schema, etc.).
+          // The Delta datasource's interpretation of this DataFrameWriter V1 API documentation
+          // is to not replace the table schema, unless the Delta-specific option
+          // "overwriteSchema" is set to true.
+          //
+          // However, DataFrameWriter V1 creates a ReplaceTableAsSelect plan, which is the same as
+          // the plan of the DataFrameWriterV2 createOrReplace API, which is documented as:
+          //  * The output table's schema, partition layout, properties, and other configuration
+          //  * will be based on the contents of the data frame and the configuration set on this
+          //  * writer. If the table exists, its configuration and data will be replaced.
+          // Therefore, for calls via DataFrameWriterV2 createOrReplace, the metadata always needs
+          // to be replaced, and the Delta datasource doesn't use the overwriteSchema option.
+          //
+          // Since the created plan is exactly the same, Delta had used a very ugly hack to detect
+          // where the API call came from, based on the stack trace of the call.
+          //
+          // In Spark 4.1 in Connect mode, this stopped working because planning and execution of
+          // the commands became decoupled, and the stack trace no longer contains the point where
+          // the plan was created.
+          //
+          // To retain compatibility of the Delta datasource with Spark 4.1 in Connect mode, Spark
+          // provides this explicit storage option to indicate to the Delta datasource that this
+          // call is coming from DataFrameWriter V1.
+          //
+          // FIXME: Since the documented semantics of Spark's DataFrameWriter V1 saveAsTable API
+          // differ from those of CREATE/REPLACE TABLE AS SELECT, Spark should not be reusing
+          // the exact same logical plan for these APIs.
+          // Existing datasources which have been implemented following Spark's documentation of
+          // these APIs should have a way to differentiate between these APIs.
+          extraOptions + ("isDataFrameWriterV1" -> "true")
+        } else {
+          extraOptions
+        }
         ReplaceTableAsSelect(
           UnresolvedIdentifier(nameParts),
           partitioningAsV2,
           df.queryExecution.analyzed,
           tableSpec,
-          writeOptions = extraOptions.toMap,
+          writeOptions = writeOptions.toMap,
           orCreate = true) // Create the table if it doesn't exist

       case (other, _) =>
```

Member review comment on the new `writeOptions` block:
Per: Why don't we just always append the option? The downstream datasources that care about this behaviour will make the change accordingly.
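For context on the two APIs the diff comment contrasts, here is a minimal sketch of the two user-facing calls that, before this change, compiled to indistinguishable ReplaceTableAsSelect plans. The table name "events" and the toy DataFrame are made up for illustration, and actually running it requires the delta-spark package and its catalog configuration on the session:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

// Toy input; the table name "events" is illustrative only.
val df = Seq((1, "a"), (2, "b")).toDF("id", "value")

// DataFrameWriter V1: SaveMode.Overwrite is documented to overwrite the
// existing *data*; Delta replaces the schema only when the Delta-specific
// "overwriteSchema" option is set to true.
df.write
  .format("delta")
  .mode("overwrite")
  .saveAsTable("events")

// DataFrameWriterV2 createOrReplace: documented to replace the table's
// schema, partition layout, properties, and data unconditionally.
df.writeTo("events")
  .using("delta")
  .createOrReplace()
```

With this PR, the first call reaches the datasource with "isDataFrameWriterV1" -> "true" in its write options, while the second does not, so the two paths can be told apart without inspecting stack traces.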
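On the consuming side, a downstream datasource could branch on the new option instead of the stack-trace hack the diff comment describes. The sketch below is hypothetical (WriterV1Detection and shouldOverwriteSchema are illustrative names, not Delta's actual code); it assumes only the option string added by this PR and Spark's CaseInsensitiveStringMap:

```scala
import org.apache.spark.sql.util.CaseInsensitiveStringMap

// Hypothetical helper on the datasource side; not Delta's actual code.
object WriterV1Detection {
  // Option appended by Spark's DataFrameWriter V1 in this PR.
  private val IsDataFrameWriterV1 = "isDataFrameWriterV1"

  // Decides whether a ReplaceTableAsSelect may rewrite the table schema,
  // replacing the old detection based on inspecting the call stack.
  def shouldOverwriteSchema(options: CaseInsensitiveStringMap): Boolean = {
    if (options.getBoolean(IsDataFrameWriterV1, false)) {
      // V1 saveAsTable with SaveMode.Overwrite: keep the existing schema
      // unless the user explicitly opted in via the Delta-specific option.
      options.getBoolean("overwriteSchema", false)
    } else {
      // V2 createOrReplace: the documented contract replaces metadata too.
      true
    }
  }
}
```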
Review comment:
Does the Apache Spark source code have this kind of Delta-specific logic anywhere else, @juliuszsompolski? This looks like the first proposal to put a third-party company's data source in the Apache Spark source code. At first glance, this string match looks a little fragile to me.