Skip to content

Commit e13fb11

Browse files
sryzagengliangwang
authored andcommitted
[SPARK-54803] Support BY NAME with INSERT INTO ... REPLACE WHERE
### What changes were proposed in this pull request? Adds support for `BY NAME` to `INSERT INTO ... REPLACE WHERE` statements. ### Why are the changes needed? Other `INSERT INTO` statements support a `BY NAME` parameter. ### Does this PR introduce _any_ user-facing change? - Allows providing `BY NAME` in an `INSERT INTO ... REPLACE WHERE` statement. - If `BY NAME` is provided, then the `INSERT` will be by name instead of by position. ### How was this patch tested? Added unit tests. ### Was this patch authored or co-authored using generative AI tooling? Closes apache#53567 from sryza/replace-where-by-name. Lead-authored-by: Sandy Ryza <[email protected]> Co-authored-by: Sandy Ryza <[email protected]> Signed-off-by: Gengliang Wang <[email protected]>
1 parent c76eeeb commit e13fb11

File tree

4 files changed

+137
-6
lines changed

4 files changed

+137
-6
lines changed

sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -562,7 +562,7 @@ query
562562
insertInto
563563
: INSERT OVERWRITE TABLE? identifierReference optionsClause? (partitionSpec (IF errorCapturingNot EXISTS)?)? ((BY NAME) | identifierList)? #insertOverwriteTable
564564
| INSERT INTO TABLE? identifierReference optionsClause? partitionSpec? (IF errorCapturingNot EXISTS)? ((BY NAME) | identifierList)? #insertIntoTable
565-
| INSERT INTO TABLE? identifierReference optionsClause? REPLACE whereClause #insertIntoReplaceWhere
565+
| INSERT INTO TABLE? identifierReference optionsClause? (BY NAME)? REPLACE whereClause #insertIntoReplaceWhere
566566
| INSERT OVERWRITE LOCAL? DIRECTORY path=stringLit rowFormat? createFileFormat? #insertOverwriteHiveDir
567567
| INSERT OVERWRITE LOCAL? DIRECTORY (path=stringLit)? tableProvider (OPTIONS options=propertyList)? #insertOverwriteDir
568568
;

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -928,11 +928,15 @@ class AstBuilder extends DataTypeAstBuilder
928928
case ctx: InsertIntoReplaceWhereContext =>
929929
val options = Option(ctx.optionsClause())
930930
withIdentClause(ctx.identifierReference, Seq(query), (ident, otherPlans) => {
931-
OverwriteByExpression.byPosition(
932-
createUnresolvedRelation(ctx.identifierReference, ident, options,
933-
Seq(TableWritePrivilege.INSERT, TableWritePrivilege.DELETE), isStreaming = false),
934-
otherPlans.head,
935-
expression(ctx.whereClause().booleanExpression()))
931+
val table = createUnresolvedRelation(ctx.identifierReference, ident, options,
932+
Seq(TableWritePrivilege.INSERT, TableWritePrivilege.DELETE), isStreaming = false)
933+
val deleteExpr = expression(ctx.whereClause().booleanExpression())
934+
val isByName = ctx.NAME() != null
935+
if (isByName) {
936+
OverwriteByExpression.byName(table, otherPlans.head, deleteExpr)
937+
} else {
938+
OverwriteByExpression.byPosition(table, otherPlans.head, deleteExpr)
939+
}
936940
})
937941
case dir: InsertOverwriteDirContext =>
938942
val (isLocal, storage, provider) = visitInsertOverwriteDir(dir)

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,11 @@ class DDLParserSuite extends AnalysisTest {
4949
case u: UnresolvedRelation => i.copy(table = u.clearWritePrivileges)
5050
case _ => i
5151
}
52+
case o: OverwriteByExpression =>
53+
o.table match {
54+
case u: UnresolvedRelation => o.copy(table = u.clearWritePrivileges)
55+
case _ => o
56+
}
5257
}
5358
comparePlans(parsed, expected, checkAnalysis = false)
5459
}
@@ -1763,6 +1768,28 @@ class DDLParserSuite extends AnalysisTest {
17631768
)
17641769
}
17651770

1771+
test("insert table: REPLACE WHERE with BY NAME") {
1772+
parseCompare(
1773+
"INSERT INTO testcat.ns1.ns2.tbl BY NAME REPLACE WHERE a > 5 SELECT * FROM source",
1774+
OverwriteByExpression.byName(
1775+
UnresolvedRelation(Seq("testcat", "ns1", "ns2", "tbl")),
1776+
Project(Seq(UnresolvedStar(None)), UnresolvedRelation(Seq("source"))),
1777+
org.apache.spark.sql.catalyst.expressions.GreaterThan(
1778+
UnresolvedAttribute("a"),
1779+
Literal(5))))
1780+
}
1781+
1782+
test("insert table: REPLACE WHERE without BY NAME") {
1783+
parseCompare(
1784+
"INSERT INTO testcat.ns1.ns2.tbl REPLACE WHERE a > 5 SELECT * FROM source",
1785+
OverwriteByExpression.byPosition(
1786+
UnresolvedRelation(Seq("testcat", "ns1", "ns2", "tbl")),
1787+
Project(Seq(UnresolvedStar(None)), UnresolvedRelation(Seq("source"))),
1788+
org.apache.spark.sql.catalyst.expressions.GreaterThan(
1789+
UnresolvedAttribute("a"),
1790+
Literal(5))))
1791+
}
1792+
17661793
test("delete from table: delete all") {
17671794
parseCompare("DELETE FROM testcat.ns1.ns2.tbl",
17681795
DeleteFromTable(

sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala

Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3534,6 +3534,106 @@ class DataSourceV2SQLSuiteV1Filter
35343534
}
35353535
}
35363536

3537+
test("Selective Overwrite: REPLACE WHERE with BY NAME - column reordering") {
3538+
val df = spark.createDataFrame(Seq((1L, "a"), (2L, "b"), (3L, "c"))).toDF("id", "data")
3539+
df.createOrReplaceTempView("source")
3540+
val df2 = spark.createDataFrame(Seq(("d", 4L), ("e", 5L), ("f", 6L))).toDF("data", "id")
3541+
df2.createOrReplaceTempView("source2_reordered")
3542+
3543+
val t = "testcat.tbl"
3544+
withTable(t) {
3545+
spark.sql(
3546+
s"CREATE TABLE $t (id bigint, data string) USING foo PARTITIONED BY (id)")
3547+
spark.sql(s"INSERT INTO TABLE $t SELECT * FROM source")
3548+
3549+
checkAnswer(
3550+
spark.table(s"$t"),
3551+
Seq(Row(1L, "a"), Row(2L, "b"), Row(3L, "c")))
3552+
3553+
spark.sql(s"INSERT INTO $t BY NAME REPLACE WHERE id = 3 SELECT * FROM source2_reordered")
3554+
checkAnswer(
3555+
spark.table(s"$t"),
3556+
Seq(Row(1L, "a"), Row(2L, "b"), Row(4L, "d"), Row(5L, "e"), Row(6L, "f")))
3557+
}
3558+
}
3559+
3560+
test("Overwrite: REPLACE WHERE without BY NAME - positional matching") {
3561+
val df = spark.createDataFrame(Seq((1L, "a"), (2L, "b"), (3L, "c"))).toDF("id", "data")
3562+
df.createOrReplaceTempView("source")
3563+
val df2 = spark.createDataFrame(Seq((4L, "d"), (5L, "e"), (6L, "f"))).toDF("data", "id")
3564+
df2.createOrReplaceTempView("source2_names_reordered")
3565+
3566+
val t = "testcat.tbl"
3567+
withTable(t) {
3568+
spark.sql(
3569+
s"CREATE TABLE $t (id bigint, data string) USING foo PARTITIONED BY (id)")
3570+
spark.sql(s"INSERT INTO TABLE $t SELECT * FROM source")
3571+
3572+
checkAnswer(
3573+
spark.table(s"$t"),
3574+
Seq(Row(1L, "a"), Row(2L, "b"), Row(3L, "c")))
3575+
3576+
spark.sql(s"INSERT INTO $t REPLACE WHERE id = 3 SELECT * FROM source2_names_reordered")
3577+
checkAnswer(
3578+
spark.table(s"$t"),
3579+
Seq(Row(1L, "a"), Row(2L, "b"), Row(4L, "d"), Row(5L, "e"), Row(6L, "f")))
3580+
}
3581+
}
3582+
3583+
test("Overwrite: REPLACE WHERE without BY NAME - different column order between 2 tables, " +
3584+
"compatible types") {
3585+
val df = spark.createDataFrame(Seq((1L, 11L), (2L, 12L), (3L, 13L))).toDF("id", "data")
3586+
df.createOrReplaceTempView("source")
3587+
val df2 = spark.createDataFrame(Seq((14L, 4L), (15L, 5L), (16L, 6L))).toDF("data", "id")
3588+
df2.createOrReplaceTempView("source2_reordered")
3589+
3590+
val t = "testcat.tbl"
3591+
withTable(t) {
3592+
spark.sql(
3593+
s"CREATE TABLE $t (id bigint, data bigint) USING foo PARTITIONED BY (id)")
3594+
spark.sql(s"INSERT INTO TABLE $t SELECT * FROM source")
3595+
3596+
checkAnswer(
3597+
spark.table(s"$t"),
3598+
Seq(Row(1L, 11L), Row(2L, 12L), Row(3L, 13L)))
3599+
3600+
spark.sql(s"INSERT INTO $t REPLACE WHERE id = 3 SELECT * FROM source2_reordered")
3601+
checkAnswer(
3602+
spark.table(s"$t"),
3603+
Seq(Row(1L, 11L), Row(2L, 12L), Row(14L, 4L), Row(15L, 5L), Row(16L, 6L)))
3604+
}
3605+
}
3606+
3607+
test("Overwrite: REPLACE WHERE without BY NAME - incompatible types") {
3608+
val df = spark.createDataFrame(Seq((1L, "a"), (2L, "b"), (3L, "c"))).toDF("id", "data")
3609+
df.createOrReplaceTempView("source")
3610+
val df2 = spark.createDataFrame(Seq(("d", 4L), ("e", 5L), ("f", 6L))).toDF("data", "id")
3611+
df2.createOrReplaceTempView("source2_reordered")
3612+
3613+
val t = "testcat.tbl"
3614+
withTable(t) {
3615+
spark.sql(
3616+
s"CREATE TABLE $t (id bigint, data string) USING foo PARTITIONED BY (id)")
3617+
spark.sql(s"INSERT INTO TABLE $t SELECT * FROM source")
3618+
3619+
checkAnswer(
3620+
spark.table(s"$t"),
3621+
Seq(Row(1L, "a"), Row(2L, "b"), Row(3L, "c")))
3622+
3623+
checkError(
3624+
exception = intercept[AnalysisException] {
3625+
spark.sql(s"INSERT INTO $t REPLACE WHERE id = 3 SELECT * FROM source2_reordered")
3626+
},
3627+
condition = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_SAFELY_CAST",
3628+
parameters = Map(
3629+
"tableName" -> "`testcat`.`tbl`",
3630+
"colName" -> "`id`",
3631+
"srcType" -> "\"STRING\"",
3632+
"targetType" -> "\"BIGINT\"")
3633+
)
3634+
}
3635+
}
3636+
35373637
test("SPARK-46144: Fail overwrite statement if the condition contains subquery") {
35383638
val df = spark.createDataFrame(Seq((1L, "a"), (2L, "b"), (3L, "c"))).toDF("id", "data")
35393639
df.createOrReplaceTempView("source")

0 commit comments

Comments
 (0)