@@ -484,12 +484,50 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) extends sql.DataFram
serde = None,
external = false,
constraints = Seq.empty)
val writeOptions = if (source == "delta") {
Member commented:
Has the Apache Spark source code had this kind of delta-specific logic before, @juliuszsompolski?

This looks like the first proposal to have a 3rd-party company data source in the Apache Spark source code. At first glance, this string match looks a little fragile to me.

// This is a very special workaround for Delta Lake.
// Spark's SaveMode.Overwrite is documented as:
// * if data/table already exists, existing data is expected to be overwritten
// * by the contents of the DataFrame.
// It does not define the behaviour of overwriting the table metadata (schema, etc).
// The Delta datasource's interpretation of this DataFrameWriter V1 API documentation is
// to not replace the table schema, unless the Delta-specific option "overwriteSchema" is
// set to true.
//
// However, DataFrameWriter V1 creates a ReplaceTableAsSelect plan, which is the same as
// the plan of DataFrameWriterV2 createOrReplace API, which is documented as:
// * The output table's schema, partition layout, properties, and other configuration
// * will be based on the contents of the data frame and the configuration set on this
// * writer. If the table exists, its configuration and data will be replaced.
// Therefore, for calls via DataFrameWriter V2 createOrReplace, the metadata always needs
// to be replaced, and Delta datasource doesn't use the overwriteSchema option.
//
// Since the created plan is exactly the same, Delta had used a very ugly hack to detect
// where the API call is coming from based on the stack trace of the call.
//
// In Spark 4.1 in connect mode, this stopped working because planning and execution of
// the commands are decoupled, and the stack trace no longer contains the point where the
// plan was created.
//
// To retain compatibility of the Delta datasource with Spark 4.1 in connect mode, Spark
// provides this explicit storage option to indicate to Delta datasource that this call
// is coming from DataFrameWriter V1.
//
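For context (not part of the diff): these are the two caller-facing invocations the comment above distinguishes. Per that comment, both end up as the same ReplaceTableAsSelect plan even though their documented overwrite semantics differ. A minimal sketch; the table name "t", the df definition, and the use of the "delta" source are placeholder assumptions.

import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder().getOrCreate()
val df = spark.range(10).toDF("id")

// DataFrameWriter V1: SaveMode.Overwrite is documented as overwriting existing data;
// whether the table schema is replaced is left to the datasource.
df.write.format("delta").mode(SaveMode.Overwrite).saveAsTable("t")

// DataFrameWriterV2: createOrReplace is documented as replacing the table's schema,
// partitioning, and configuration along with the data.
df.writeTo("t").using("delta").createOrReplace()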
Member commented:
Per:

          // FIXME: Since the details of the documented semantics of Spark's DataFrameWriter V1
          //  saveAsTable API differs from that of CREATE/REPLACE TABLE AS SELECT, Spark should
          //  not be reusing the exact same logical plan for these APIs.
          //  Existing Datasources which have been implemented following Spark's documentation of
          //  these APIs should have a way to differentiate between these APIs.

Why don't we just always append the option? The downstream datasources that care about this behaviour will make the change accordingly.

// FIXME: Since the details of the documented semantics of Spark's DataFrameWriter V1
// saveAsTable API differs from that of CREATE/REPLACE TABLE AS SELECT, Spark should
// not be reusing the exact same logical plan for these APIs.
// Existing Datasources which have been implemented following Spark's documentation of
// these APIs should have a way to differentiate between these APIs.
extraOptions + ("isDataFrameWriterV1" -> "true")
} else {
extraOptions
}
ReplaceTableAsSelect(
UnresolvedIdentifier(nameParts),
partitioningAsV2,
df.queryExecution.analyzed,
tableSpec,
writeOptions = extraOptions.toMap,
writeOptions = writeOptions.toMap,
orCreate = true) // Create the table if it doesn't exist

case (other, _) =>
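To illustrate how a downstream datasource could consume this signal, here is a hypothetical sketch (not Delta's actual implementation): given the writeOptions map carried by the ReplaceTableAsSelect plan above, the schema-replacement decision might look like this. The object and method names are made up for illustration.

object WriterApiDetection {
  // `options` stands for the write options a V2 datasource receives for the replace.
  def shouldReplaceSchema(options: Map[String, String]): Boolean = {
    val fromWriterV1 = options.get("isDataFrameWriterV1").exists(_.equalsIgnoreCase("true"))
    val overwriteSchema = options.get("overwriteSchema").exists(_.equalsIgnoreCase("true"))
    if (fromWriterV1) {
      // DataFrameWriter V1 overwrite: keep the existing schema unless the
      // datasource-specific "overwriteSchema" option asks for replacement.
      overwriteSchema
    } else {
      // DataFrameWriterV2 createOrReplace: always replace the table definition.
      true
    }
  }
}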