Commit 5216d45

linliu-code authored and yihua committed
[HUDI-9132] Avoid empty string row key for delete and update operations
Co-authored-by: Y Ethan Guo <[email protected]>
1 parent c7c3532 commit 5216d45

File tree

3 files changed: +181 −22 lines changed


hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala

Lines changed: 29 additions & 22 deletions

@@ -44,7 +44,7 @@ import org.apache.spark.sql.execution.datasources.FileStatusCache
 import org.apache.spark.sql.hive.HiveExternalCatalog
 import org.apache.spark.sql.hudi.HoodieOptionConfig.mapSqlOptionsToDataSourceWriteConfigs
 import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.{filterHoodieConfigs, isUsingHiveCatalog}
-import org.apache.spark.sql.hudi.ProvidesHoodieConfig.{buildOverridingOpts, buildOverridingOptsForDelete, combineOptions, getPartitionPathFieldWriteConfig}
+import org.apache.spark.sql.hudi.ProvidesHoodieConfig.{buildCommonOverridingOpts, buildOverridingOptsForDelete, combineOptions, getPartitionPathFieldWriteConfig}
 import org.apache.spark.sql.hudi.command.SqlKeyGenerator
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.SQLConf.PARTITION_OVERWRITE_MODE
@@ -82,7 +82,11 @@ trait ProvidesHoodieConfig extends Logging {
       HiveSyncConfigHolder.HIVE_SUPPORT_TIMESTAMP_TYPE.key -> hiveSyncConfig.getBoolean(HiveSyncConfigHolder.HIVE_SUPPORT_TIMESTAMP_TYPE).toString
     )

-    val overridingOpts = buildOverridingOpts(hoodieCatalogTable, preCombineField)
+    val partitionPathField = getPartitionPathFieldWriteConfig(
+      tableConfig.getKeyGeneratorClassName,
+      tableConfig.getPartitionFieldProp,
+      hoodieCatalogTable)
+    val overridingOpts = buildCommonOverridingOpts(hoodieCatalogTable, partitionPathField)
     combineOptions(hoodieCatalogTable, tableConfig, sparkSession.sqlContext.conf,
       defaultOpts = defaultOpts, overridingOpts = overridingOpts)
   }
@@ -103,7 +107,7 @@ trait ProvidesHoodieConfig extends Logging {
       HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key -> "false"
     )

-    val overridingOpts = buildOverridingOpts(hoodieCatalogTable, preCombineField)
+    val overridingOpts = buildCommonOverridingOpts(hoodieCatalogTable, preCombineField)
     combineOptions(hoodieCatalogTable, tableConfig, sparkSession.sqlContext.conf,
       defaultOpts = defaultOpts, overridingOpts = overridingOpts)
   }
@@ -435,7 +439,11 @@ trait ProvidesHoodieConfig extends Logging {
       HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key -> hiveSyncConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS)
     )

-    val overridingOpts = buildOverridingOptsForDelete(hoodieCatalogTable)
+    val partitionPathField = getPartitionPathFieldWriteConfig(
+      tableConfig.getKeyGeneratorClassName,
+      tableConfig.getPartitionFieldProp,
+      hoodieCatalogTable)
+    val overridingOpts = buildOverridingOptsForDelete(hoodieCatalogTable, partitionPathField)
     combineOptions(hoodieCatalogTable, tableConfig, sparkSession.sqlContext.conf,
       defaultOpts = defaultOpts, overridingOpts = overridingOpts)
   }
@@ -556,36 +564,35 @@ object ProvidesHoodieConfig {
   private def filterNullValues(opts: Map[String, String]): Map[String, String] =
     opts.filter { case (_, v) => v != null }

-  private def buildOverridingOpts(hoodieCatalogTable: HoodieCatalogTable,
-                                  preCombineField: String): Map[String, String] = {
-    buildCommonOverridingOpts(hoodieCatalogTable) ++ Map(
-      PRECOMBINE_FIELD.key -> preCombineField
-    )
-  }

-  private def buildOverridingOptsForDelete(hoodieCatalogTable: HoodieCatalogTable): Map[String, String] = {
-    buildCommonOverridingOpts(hoodieCatalogTable) ++ Map(
+  def buildOverridingOptsForDelete(hoodieCatalogTable: HoodieCatalogTable,
+                                   partitionPathField: String): Map[String, String] = {
+    buildCommonOverridingOpts(hoodieCatalogTable, partitionPathField) ++ Map(
       OPERATION.key -> DataSourceWriteOptions.DELETE_OPERATION_OPT_VAL
     )
   }

-  private def buildCommonOverridingOpts(hoodieCatalogTable: HoodieCatalogTable): Map[String, String] = {
+  def buildCommonOverridingOpts(hoodieCatalogTable: HoodieCatalogTable,
+                                partitionPathField: String): Map[String, String] = {
     val tableConfig = hoodieCatalogTable.tableConfig
-    val baseOpts = Map(
+    var baseOpts = Map(
       "path" -> hoodieCatalogTable.tableLocation,
       TBL_NAME.key -> tableConfig.getTableName,
       DATABASE_NAME.key -> hoodieCatalogTable.table.database,
       HIVE_STYLE_PARTITIONING.key -> tableConfig.getHiveStylePartitioningEnable,
       URL_ENCODE_PARTITIONING.key -> tableConfig.getUrlEncodePartitioning,
-      PARTITIONPATH_FIELD.key -> getPartitionPathFieldWriteConfig(
-        tableConfig.getKeyGeneratorClassName,
-        tableConfig.getPartitionFieldProp,
-        hoodieCatalogTable)
+      PARTITIONPATH_FIELD.key -> partitionPathField
     )
-    if (hoodieCatalogTable.primaryKeys.isEmpty) {
-      baseOpts
-    } else {
-      baseOpts + (RECORDKEY_FIELD.key -> hoodieCatalogTable.primaryKeys.mkString(","))
+    // Add preCombineField if possible.
+    val preCombineField = tableConfig.getPreCombineField
+    if (!StringUtils.isNullOrEmpty(preCombineField)) {
+      baseOpts = baseOpts + (PRECOMBINE_FIELD.key -> preCombineField)
+    }
+    // Add recordkey field if possible.
+    if (!hoodieCatalogTable.primaryKeys.isEmpty) {
+      baseOpts = baseOpts + (RECORDKEY_FIELD.key -> hoodieCatalogTable.primaryKeys.mkString(","))
     }
+    // Return.
+    baseOpts
   }
 }
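The net effect of the ProvidesHoodieConfig change is that the optional write configs are only set when they carry real values, so an UPDATE or DELETE on a table with no primary key (or no preCombine field) no longer pushes an empty-string record key or precombine field into the write config. A minimal standalone sketch of that pattern (not the Hudi implementation; only the three "hoodie.datasource.write.*" key strings are real Hudi options, everything else here is illustrative):

object ConditionalWriteOptsSketch {
  // Mirror of the revised buildCommonOverridingOpts logic: optional entries are
  // added only when they have non-empty values.
  def buildOpts(partitionPathField: String,
                preCombineField: String,
                primaryKeys: Seq[String]): Map[String, String] = {
    var opts = Map("hoodie.datasource.write.partitionpath.field" -> partitionPathField)
    // Add the precombine field only when the table actually defines one.
    if (preCombineField != null && preCombineField.nonEmpty) {
      opts = opts + ("hoodie.datasource.write.precombine.field" -> preCombineField)
    }
    // Add the record key only when the table declares primary keys.
    if (primaryKeys.nonEmpty) {
      opts = opts + ("hoodie.datasource.write.recordkey.field" -> primaryKeys.mkString(","))
    }
    opts
  }

  def main(args: Array[String]): Unit = {
    // No primary key and no preCombine field: the map has no empty-string entries.
    println(buildOpts("ts", "", Seq.empty))
    // Both present: the corresponding keys appear with their values.
    println(buildOpts("ts", "price", Seq("id")))
  }
}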

hudi-spark-datasource/hudi-spark-common/src/test/scala/org/apache/spark/sql/hudi/TestProvidesHoodieConfig.scala

Lines changed: 102 additions & 0 deletions

@@ -20,8 +20,11 @@
 package org.apache.spark.sql.hudi

 import org.apache.hudi.DataSourceWriteOptions
+import org.apache.hudi.DataSourceWriteOptions.{HIVE_STYLE_PARTITIONING, OPERATION, PARTITIONPATH_FIELD, PRECOMBINE_FIELD, RECORDKEY_FIELD, URL_ENCODE_PARTITIONING}
 import org.apache.hudi.common.config.TypedProperties
 import org.apache.hudi.common.table.HoodieTableConfig
+import org.apache.hudi.common.table.HoodieTableConfig.DATABASE_NAME
+import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.hive.HiveSyncConfig
 import org.apache.hudi.keygen.{ComplexKeyGenerator, CustomKeyGenerator}

@@ -32,6 +35,9 @@ import org.apache.spark.sql.internal.{SessionState, SQLConf, StaticSQLConf}
 import org.apache.spark.sql.types.StructType
 import org.junit.jupiter.api.Assertions.assertEquals
 import org.junit.jupiter.api.Test
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.{Arguments, MethodSource}
+import org.junit.jupiter.params.provider.Arguments.arguments
 import org.mockito.ArgumentMatchers.any
 import org.mockito.Mockito.{mock, spy, when}

@@ -150,6 +156,60 @@ class TestProvidesHoodieConfig {
     )
   }

+  @ParameterizedTest
+  @MethodSource(Array("testBuildCommonOverridingOptsParams"))
+  def testBuildCommonOverridingOpts(primaryKeys: Array[String],
+                                    preCombineField: String,
+                                    expectedOpts: Map[String, String]): Unit = {
+    val mockTableConfig = mock(classOf[HoodieTableConfig])
+    when(mockTableConfig.getTableName).thenReturn("myTable")
+    when(mockTableConfig.getHiveStylePartitioningEnable).thenReturn("true")
+    when(mockTableConfig.getUrlEncodePartitioning).thenReturn("false")
+    when(mockTableConfig.getPreCombineField).thenReturn(preCombineField)
+
+    val mockHoodieTable = mock(classOf[HoodieCatalogTable])
+    when(mockHoodieTable.tableConfig).thenReturn(mockTableConfig)
+    when(mockHoodieTable.tableLocation).thenReturn("/dummy/path")
+    when(mockHoodieTable.primaryKeys).thenReturn(primaryKeys)
+
+    val partitionPathField = "partField"
+    val result = ProvidesHoodieConfig.buildCommonOverridingOpts(mockHoodieTable, partitionPathField)
+    assert(result == expectedOpts)
+  }
+
+  @Test
+  def testBuildOverridingOptsForDelete(): Unit = {
+    val mockTableConfig = mock(classOf[HoodieTableConfig])
+    when(mockTableConfig.getTableName).thenReturn("myTable")
+    when(mockTableConfig.getHiveStylePartitioningEnable).thenReturn("true")
+    when(mockTableConfig.getUrlEncodePartitioning).thenReturn("false")
+
+    val mockHoodieTable = mock(classOf[HoodieCatalogTable])
+    val mockCatalogTable = mock(classOf[CatalogTable])
+    when(mockHoodieTable.table).thenReturn(mockCatalogTable)
+    when(mockCatalogTable.database).thenReturn("myDatabase")
+    when(mockHoodieTable.tableConfig).thenReturn(mockTableConfig)
+    when(mockHoodieTable.tableLocation).thenReturn("/dummy/path")
+    when(mockHoodieTable.primaryKeys).thenReturn(Array("id1", "id2"))
+
+    val partitionPathField = "partField"
+
+    val expectedOpts = Map(
+      "path" -> "/dummy/path",
+      HoodieWriteConfig.TBL_NAME.key -> "myTable",
+      DATABASE_NAME.key -> "myDatabase",
+      HIVE_STYLE_PARTITIONING.key -> "true",
+      URL_ENCODE_PARTITIONING.key -> "false",
+      PARTITIONPATH_FIELD.key -> partitionPathField,
+      RECORDKEY_FIELD.key -> "id1,id2",
+      OPERATION.key -> DataSourceWriteOptions.DELETE_OPERATION_OPT_VAL
+    )
+
+    val result = ProvidesHoodieConfig.buildOverridingOptsForDelete(
+      mockHoodieTable, partitionPathField)
+    assert(result == expectedOpts)
+  }
+
   private def mockPartitionWriteConfigInCatalogProps(mockTable: HoodieCatalogTable,
                                                      value: Option[String]): Unit = {
     val props = if (value.isDefined) {
@@ -160,3 +220,45 @@
     when(mockTable.catalogProperties).thenReturn(props)
   }
 }
+
+object TestProvidesHoodieConfig {
+  def testBuildCommonOverridingOptsParams(): java.util.stream.Stream[Arguments] = {
+    java.util.stream.Stream.of(
+      arguments(
+        Array.empty[String],
+        "",
+        Map(
+          "path" -> "/dummy/path",
+          HoodieWriteConfig.TBL_NAME.key -> "myTable",
+          DATABASE_NAME.key -> "myDatabase",
+          HIVE_STYLE_PARTITIONING.key -> "true",
+          URL_ENCODE_PARTITIONING.key -> "false",
+          PARTITIONPATH_FIELD.key -> "partField"
+        )),
+      arguments(
+        Array("id1", "id2"),
+        "",
+        Map(
+          "path" -> "/dummy/path",
+          HoodieWriteConfig.TBL_NAME.key -> "myTable",
+          DATABASE_NAME.key -> "myDatabase",
+          HIVE_STYLE_PARTITIONING.key -> "true",
+          URL_ENCODE_PARTITIONING.key -> "false",
+          PARTITIONPATH_FIELD.key -> "partField",
+          RECORDKEY_FIELD.key -> "id1,id2"
+        )),
+      arguments(
+        Array("id1", "id2"),
+        "ts",
+        Map(
+          "path" -> "/dummy/path",
+          HoodieWriteConfig.TBL_NAME.key -> "myTable",
+          DATABASE_NAME.key -> "myDatabase",
+          HIVE_STYLE_PARTITIONING.key -> "true",
+          URL_ENCODE_PARTITIONING.key -> "false",
+          PARTITIONPATH_FIELD.key -> "partField",
+          RECORDKEY_FIELD.key -> "id1,id2",
+          PRECOMBINE_FIELD.key -> "ts"
+        )))
+  }
+}
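Following the same logic, a DELETE on a table with no primary key and no preCombine field should now yield overriding options that simply omit RECORDKEY_FIELD and PRECOMBINE_FIELD instead of carrying empty strings. A hypothetical expected-options snippet in the style of the tests above; the literal values ("/dummy/path", "myTable", "myDatabase", "partField") mirror the mocks used in the test, but this pk-less delete case is an assumption, not a test added by the commit:

import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.DataSourceWriteOptions.{HIVE_STYLE_PARTITIONING, OPERATION, PARTITIONPATH_FIELD, URL_ENCODE_PARTITIONING}
import org.apache.hudi.common.table.HoodieTableConfig.DATABASE_NAME
import org.apache.hudi.config.HoodieWriteConfig

object PkLessDeleteOptsSketch {
  // Expected overriding options for a DELETE on a primary-key-less table:
  // the record key and precombine entries are absent rather than empty strings.
  val expectedOpts: Map[String, String] = Map(
    "path" -> "/dummy/path",
    HoodieWriteConfig.TBL_NAME.key -> "myTable",
    DATABASE_NAME.key -> "myDatabase",
    HIVE_STYLE_PARTITIONING.key -> "true",
    URL_ENCODE_PARTITIONING.key -> "false",
    PARTITIONPATH_FIELD.key -> "partField",
    OPERATION.key -> DataSourceWriteOptions.DELETE_OPERATION_OPT_VAL
  )
}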

hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestDeleteTable.scala

Lines changed: 50 additions & 0 deletions

@@ -184,6 +184,56 @@ class TestDeleteTable extends HoodieSparkSqlTestBase {
     }
   }

+  test("Test Delete From Partitioned Table Without Primary Key") {
+    withTempDir { tmp =>
+      Seq("cow", "mor").foreach { tableType =>
+        val tableName = generateTableName
+        // create table
+        spark.sql(
+          s"""
+             |create table $tableName (
+             |  id int,
+             |  name string,
+             |  price double,
+             |  ts long
+             |) using hudi
+             | location '${tmp.getCanonicalPath}/$tableName'
+             | tblproperties (
+             |  type = '$tableType',
+             |  preCombineField = 'price'
+             |)
+             |partitioned by (ts)
+       """.stripMargin)
+
+        // test with optimized sql writes enabled.
+        spark.sql(s"set ${SPARK_SQL_OPTIMIZED_WRITES.key()}=true")
+
+        // insert data to table
+        spark.sql(
+          s"""
+             |insert into $tableName
+             |values
+             | (1, 'a1', cast(10.0 as double), 1000),
+             | (2, 'a2', cast(20.0 as double), 1000),
+             | (3, 'a2', cast(30.0 as double), 1000)
+             |""".stripMargin)
+        checkAnswer(s"select id, name, price, ts from $tableName")(
+          Seq(1, "a1", 10.0, 1000),
+          Seq(2, "a2", 20.0, 1000),
+          Seq(3, "a2", 30.0, 1000)
+        )
+
+        // delete data from table
+        spark.sql(s"delete from $tableName where id = 1")
+
+        checkAnswer(s"select id, name, price, ts from $tableName")(
+          Seq(2, "a2", 20.0, 1000),
+          Seq(3, "a2", 30.0, 1000)
+        )
+      }
+    }
+  }
+
   test("Test Delete Table On Non-PK Condition") {
     withTempDir { tmp =>
       Seq("cow", "mor").foreach {tableType =>