
Commit c1bd94a

Fix the struct array issues
1 parent ed1fa57 commit c1bd94a

3 files changed: +207 −4 lines changed

hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieInternalRowUtils.scala

Lines changed: 3 additions & 3 deletions
@@ -210,15 +210,15 @@ object HoodieInternalRowUtils {
       case (newStructType: StructType, prevStructType: StructType) =>
         val writer = genUnsafeStructWriter(prevStructType, newStructType, renamedColumnsMap, fieldNameStack)

-        val newRow = new SpecificInternalRow(newStructType.fields.map(_.dataType))
-        val rowUpdater = new RowUpdater(newRow)
-
         (fieldUpdater, ordinal, value) => {
           // Here new row is built in 2 stages:
           //    - First, we pass mutable row (used as buffer/scratchpad) created above wrapped into [[RowUpdater]]
           //      into generated row-writer
           //    - Upon returning from row-writer, we call back into parent row's [[fieldUpdater]] to set returned
           //      row as a value in it
+          // NOTE: Create a new row for each element to avoid reusing the same row object across array elements
+          val newRow = new SpecificInternalRow(newStructType.fields.map(_.dataType))
+          val rowUpdater = new RowUpdater(newRow)
           writer(rowUpdater, value)
           fieldUpdater.set(ordinal, newRow)
         }
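
To make the NOTE above concrete, the following is a small, self-contained Scala sketch (not Hudi or Spark code; MutableRow is a hypothetical stand-in) of the buffer-reuse pitfall: when one mutable row is allocated once and then stored by reference for every array element, all elements end up aliasing the last value written, whereas allocating a fresh row per element, as the change above does, keeps each element independent.

// Hypothetical stand-in types; illustration only, not Hudi/Spark code.
object RowReusePitfallSketch extends App {
  final class MutableRow(var a: Int, var b: Int)

  val source = Seq((1, 11), (2, 22), (3, 33))

  // Buggy pattern: one buffer allocated outside the loop and stored by reference each iteration.
  val shared = new MutableRow(0, 0)
  val aliased = source.map { case (a, b) =>
    shared.a = a
    shared.b = b
    shared // every element points at the same object
  }
  println(aliased.map(r => (r.a, r.b)))     // List((3,33), (3,33), (3,33))

  // Fixed pattern (mirrors the change above): allocate a fresh row per element.
  val independent = source.map { case (a, b) => new MutableRow(a, b) }
  println(independent.map(r => (r.a, r.b))) // List((1,11), (2,22), (3,33))
}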
Lines changed: 197 additions & 0 deletions
@@ -0,0 +1,197 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hudi.functional

import org.apache.hudi.common.table.HoodieTableConfig
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.testutils.HoodieSparkClientTestBase
import org.apache.hudi.{DataSourceWriteOptions, HoodieSparkRecordMerger, ScalaAssertionSupport}

import org.apache.spark.sql.{Row, SaveMode, SparkSession}
import org.apache.spark.sql.types.{ArrayType, IntegerType, StringType, StructType}
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
import org.junit.jupiter.api.Assertions._

import scala.jdk.CollectionConverters._

/**
 * Test to verify schema evolution for array of structs with record mergers.
 * This test reproduces a bug where schema evolution of array<struct<...>> fields
 * can cause data corruption when using certain record mergers.
 */
class TestArrayStructSchemaEvolution extends HoodieSparkClientTestBase with ScalaAssertionSupport {
  var spark: SparkSession = _

  @BeforeEach override def setUp(): Unit = {
    initPath()
    initSparkContexts()
    spark = sqlContext.sparkSession
    initHoodieStorage()
  }

  @AfterEach override def tearDown(): Unit = {
    cleanupSparkContexts()
    cleanupFileSystem()
  }

  @Test
  def testArrayStructSchemaEvolutionWithRecordMerger(): Unit = {
    val tablePath = basePath + "/array_struct_bug_100"
    val tableName = "array_struct_bug_100"

    // ==========================================================
    // STEP 1: Initial schema (no evolution yet)
    // ==========================================================
    val schemaV1 = new StructType()
      .add("id", StringType, nullable = true)
      .add("items", ArrayType(new StructType()
        .add("a", IntegerType, nullable = true)
        .add("b", IntegerType, nullable = true)
        .add("c", IntegerType, nullable = true)
        .add("d", IntegerType, nullable = true)
      ), nullable = true)
    val row1Items = Seq(
      Row(1, 11, 111, 1111),
      Row(2, 22, 222, 2222),
      Row(3, 33, 333, 3333)
    )
    val row2Items = Seq(
      Row(10, 77, 777, 7777),
      Row(20, 88, 888, 8888),
      Row(30, 99, 999, 9999)
    )
    val initialData = Seq(
      Row("1", row1Items),
      Row("2", row2Items)
    )
    val dfInit = spark.createDataFrame(spark.sparkContext.parallelize(initialData), schemaV1)

    // ==========================================================
    // STEP 2: Write initial data using INSERT (not bulk-insert)
    // ==========================================================
    val hudiOpts = Map(
      HoodieTableConfig.NAME.key -> tableName,
      DataSourceWriteOptions.RECORDKEY_FIELD.key -> "id",
      DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "id",
      DataSourceWriteOptions.OPERATION.key -> DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL,
      DataSourceWriteOptions.TABLE_TYPE.key -> DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL,
      HoodieWriteConfig.RECORD_MERGER_IMPLS.key -> classOf[HoodieSparkRecordMerger].getName
    )
    dfInit.write.format("hudi").options(hudiOpts).mode(SaveMode.Overwrite).save(tablePath)
    // Verify initial data
    val dfAfterInsert = spark.read.format("hudi").load(tablePath)
    assertEquals(2, dfAfterInsert.count(), "Should have 2 records after initial insert")

    // ==========================================================
    // STEP 3: Schema evolution - Add a new field to the struct inside the array
    // ==========================================================
    val schemaV2 = new StructType()
      .add("id", StringType, nullable = true)
      .add("items", ArrayType(new StructType()
        .add("a", IntegerType, nullable = true)
        .add("b", IntegerType, nullable = true)
        .add("c", IntegerType, nullable = true)
        .add("d", IntegerType, nullable = true)
        .add("e", IntegerType, nullable = true) // <-- NEW FIELD
      ), nullable = true)
    val row1ItemsEvolved = Seq(
      Row(1, 11, 111, 1111, 11111),
      Row(2, 22, 222, 2222, 22222),
      Row(3, 33, 333, 3333, 33333)
    )
    val dfEvolved = spark.createDataFrame(
      spark.sparkContext.parallelize(Seq(Row("1", row1ItemsEvolved))),
      schemaV2
    )

    // ==========================================================
    // STEP 4: Upsert with HoodieSparkRecordMerger
    // ==========================================================
    val hudiOptsUpsert = Map(
      HoodieTableConfig.NAME.key -> tableName,
      DataSourceWriteOptions.RECORDKEY_FIELD.key -> "id",
      DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "id",
      DataSourceWriteOptions.OPERATION.key -> DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
      DataSourceWriteOptions.TABLE_TYPE.key -> DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL,
      HoodieWriteConfig.RECORD_MERGER_IMPLS.key -> "org.apache.hudi.HoodieSparkRecordMerger"
    )
    dfEvolved.write.format("hudi").options(hudiOptsUpsert).mode(SaveMode.Append).save(tablePath)

    // ==========================================================
    // STEP 5: Load after upsert and verify data integrity
    // ==========================================================
    val dfFinal = spark.read.format("hudi").load(tablePath)
    // Verify we still have 2 records
    assertEquals(2, dfFinal.count(), "Should still have 2 records after upsert")
    // Verify the schema includes the new field 'e'
    val finalSchema = dfFinal.schema
    val itemsField = finalSchema.fields.find(_.name == "items").get
    assertTrue(itemsField.dataType.isInstanceOf[ArrayType], "items should be an ArrayType")
    val arrayType = itemsField.dataType.asInstanceOf[ArrayType]
    assertTrue(arrayType.elementType.isInstanceOf[StructType], "items array element should be StructType")
    val structType = arrayType.elementType.asInstanceOf[StructType]
    val fieldNames = structType.fields.map(_.name)
    assertTrue(fieldNames.contains("e"), "Schema should include the evolved field 'e'")
    assertEquals(5, fieldNames.length, "Struct should have 5 fields (a, b, c, d, e)")
    // Verify we can read all data without errors (this would fail if data is corrupted)
    dfFinal.foreach(_ => {})

    // Verify data for id="1" (updated record)
    val record1 = dfFinal.filter("id = '1'").collect()
    assertEquals(1, record1.length, "Should have exactly one record with id='1'")
    val items1 = record1(0).getAs[java.util.List[Row]]("items").asScala.toSeq
    assertNotNull(items1, "items should not be null for id='1'")
    assertEquals(3, items1.length, "id='1' should have 3 items")
    // Verify first item of id="1" has all fields including 'e'
    val firstItem1 = items1.head
    assertEquals(1, firstItem1.getInt(0), "First item 'a' should be 1")
    assertEquals(11, firstItem1.getInt(1), "First item 'b' should be 11")
    assertEquals(111, firstItem1.getInt(2), "First item 'c' should be 111")
    assertEquals(1111, firstItem1.getInt(3), "First item 'd' should be 1111")
    assertEquals(11111, firstItem1.getInt(4), "First item 'e' should be 11111")

    // Verify second item of id="1"
    val secondItem1 = items1(1)
    assertEquals(2, secondItem1.getInt(0), "Second item 'a' should be 2")
    assertEquals(22, secondItem1.getInt(1), "Second item 'b' should be 22")
    assertEquals(222, secondItem1.getInt(2), "Second item 'c' should be 222")
    assertEquals(2222, secondItem1.getInt(3), "Second item 'd' should be 2222")
    assertEquals(22222, secondItem1.getInt(4), "Second item 'e' should be 22222")
    // Verify data for id="2" (unchanged record - should have null for 'e')
    val record2 = dfFinal.filter("id = '2'").collect()
    assertEquals(1, record2.length, "Should have exactly one record with id='2'")
    val items2 = record2(0).getAs[scala.collection.Seq[Row]]("items").toSeq
    assertNotNull(items2, "items should not be null for id='2'")
    assertEquals(3, items2.length, "id='2' should have 3 items")
    // Verify first item of id="2" - should have original values and null for 'e'
    val firstItem2 = items2(0)
    assertEquals(10, firstItem2.getInt(0), "First item 'a' should be 10")
    assertEquals(77, firstItem2.getInt(1), "First item 'b' should be 77")
    assertEquals(777, firstItem2.getInt(2), "First item 'c' should be 777")
    assertEquals(7777, firstItem2.getInt(3), "First item 'd' should be 7777")
    assertTrue(firstItem2.isNullAt(4), "First item 'e' should be null for unchanged record")
    // Verify second item of id="2"
    val secondItem2 = items2(1)
    assertEquals(20, secondItem2.getInt(0), "Second item 'a' should be 20")
    assertEquals(88, secondItem2.getInt(1), "Second item 'b' should be 88")
    assertEquals(888, secondItem2.getInt(2), "Second item 'c' should be 888")
    assertEquals(8888, secondItem2.getInt(3), "Second item 'd' should be 8888")
    assertTrue(secondItem2.isNullAt(4), "Second item 'e' should be null for unchanged record")
  }
}

hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieSpark32PlusAnalysis.scala

Lines changed: 7 additions & 1 deletion
@@ -111,7 +111,13 @@ case class HoodieSpark32PlusResolveReferences(spark: SparkSession) extends Rule[
       lazy val analyzer = spark.sessionState.analyzer
       val targetTable = if (targetTableO.resolved) targetTableO else analyzer.execute(targetTableO)
       val sourceTable = if (sourceTableO.resolved) sourceTableO else analyzer.execute(sourceTableO)
-      val m = mO.asInstanceOf[MergeIntoTable].copy(targetTable = targetTable, sourceTable = sourceTable)
+      val originalMergeInto = mO.asInstanceOf[MergeIntoTable]
+      val m = originalMergeInto.copy(
+        targetTable = targetTable,
+        sourceTable = sourceTable,
+        mergeCondition = originalMergeInto.mergeCondition,
+        matchedActions = originalMergeInto.matchedActions,
+        notMatchedActions = originalMergeInto.notMatchedActions)
       // END: custom Hudi change
       EliminateSubqueryAliases(targetTable) match {
         case r: NamedRelation if r.skipSchemaResolution =>
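
For readers less familiar with Scala case-class copy, the following is a minimal sketch of the semantics the change above relies on; the Merge class here is a simplified, hypothetical stand-in for MergeIntoTable, not the Spark type. Under plain case-class copy semantics, fields not named in copy keep their current values, so spelling out mergeCondition and the action lists yields the same result while making the carried-over fields explicit.

// Simplified stand-in for MergeIntoTable; illustration only.
case class Merge(
    targetTable: String,
    sourceTable: String,
    mergeCondition: String,
    matchedActions: Seq[String],
    notMatchedActions: Seq[String])

object CopySemanticsSketch extends App {
  val original = Merge("target", "source", "t.id = s.id", Seq("update *"), Seq("insert *"))

  // Fields not named in copy keep their current values...
  val implicitCarryOver = original.copy(targetTable = "resolvedTarget", sourceTable = "resolvedSource")

  // ...so naming them explicitly, as the change above does, produces the same value.
  val explicitCarryOver = original.copy(
    targetTable = "resolvedTarget",
    sourceTable = "resolvedSource",
    mergeCondition = original.mergeCondition,
    matchedActions = original.matchedActions,
    notMatchedActions = original.notMatchedActions)

  assert(implicitCarryOver == explicitCarryOver)
}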
