Commit 1e6fc7d (parent ed1fa57)

Fix the struct array issue

File tree: 2 files changed, +197 -3 lines

hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieInternalRowUtils.scala

Lines changed: 3 additions & 3 deletions
@@ -210,15 +210,15 @@ object HoodieInternalRowUtils {
       case (newStructType: StructType, prevStructType: StructType) =>
         val writer = genUnsafeStructWriter(prevStructType, newStructType, renamedColumnsMap, fieldNameStack)
 
-        val newRow = new SpecificInternalRow(newStructType.fields.map(_.dataType))
-        val rowUpdater = new RowUpdater(newRow)
-
         (fieldUpdater, ordinal, value) => {
           // Here new row is built in 2 stages:
           // - First, we pass mutable row (used as buffer/scratchpad) created above wrapped into [[RowUpdater]]
           //   into generated row-writer
           // - Upon returning from row-writer, we call back into parent row's [[fieldUpdater]] to set returned
           //   row as a value in it
+          // NOTE: Create a new row for each element to avoid reusing the same row object across array elements
+          val newRow = new SpecificInternalRow(newStructType.fields.map(_.dataType))
+          val rowUpdater = new RowUpdater(newRow)
           writer(rowUpdater, value)
           fieldUpdater.set(ordinal, newRow)
         }
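The root cause the hunk above fixes is buffer aliasing: when the SpecificInternalRow was allocated once, outside the per-element closure, every element of a struct array received a reference to the same mutable row, so the values written for the last element silently overwrote all earlier ones. Below is a minimal, self-contained Scala sketch of the pattern, with plain arrays standing in for Spark's mutable rows; all names are illustrative, not Hudi's actual API:

object RowReuseSketch {
  def main(args: Array[String]): Unit = {
    val values = Seq(1, 2, 3)

    // Buggy shape: one mutable buffer allocated outside the closure and shared
    // by every element -- each iteration overwrites the same object.
    val shared = new Array[Int](1)
    val buggy = values.map { v =>
      shared(0) = v // analogous to writer(rowUpdater, value) mutating the buffer
      shared        // analogous to fieldUpdater.set(ordinal, newRow): stores a reference, not a copy
    }
    println(buggy.map(_.toList)) // List(List(3), List(3), List(3)) -- all elements alias the last write

    // Fixed shape (what the commit does): allocate a fresh buffer per element.
    val fixed = values.map { v =>
      val fresh = new Array[Int](1)
      fresh(0) = v
      fresh
    }
    println(fixed.map(_.toList)) // List(List(1), List(2), List(3))
  }
}

This is why the commit can move the two allocation lines without changing anything else: the writer and the field updater were already correct, and only the lifetime of the scratch row was wrong.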
TestArrayStructSchemaEvolution.scala (new file)

Lines changed: 194 additions & 0 deletions
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hudi.functional

import org.apache.hudi.common.table.HoodieTableConfig
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.testutils.HoodieSparkClientTestBase
import org.apache.hudi.{DataSourceWriteOptions, HoodieSparkRecordMerger, ScalaAssertionSupport}
import org.apache.spark.sql.{Row, SaveMode, SparkSession}
import org.apache.spark.sql.types.{ArrayType, IntegerType, StringType, StructField, StructType}
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
import org.junit.jupiter.api.Assertions._

/**
 * Test to verify schema evolution for an array of structs with record mergers.
 * This test reproduces a bug where schema evolution of array<struct<...>> fields
 * can cause data corruption when using certain record mergers.
 */
class TestArrayStructSchemaEvolution extends HoodieSparkClientTestBase with ScalaAssertionSupport {
  var spark: SparkSession = _

  @BeforeEach override def setUp(): Unit = {
    initPath()
    initSparkContexts()
    spark = sqlContext.sparkSession
    initHoodieStorage()
  }

  @AfterEach override def tearDown(): Unit = {
    cleanupSparkContexts()
    cleanupFileSystem()
  }

  @Test
  def testArrayStructSchemaEvolutionWithRecordMerger(): Unit = {
    val tablePath = basePath + "/array_struct_bug_100"
    val tableName = "array_struct_bug_100"

    // ==========================================================
    // STEP 1: Initial schema (no evolution yet)
    // ==========================================================
    val schemaV1 = new StructType()
      .add("id", StringType, nullable = true)
      .add("items", ArrayType(new StructType()
        .add("a", IntegerType, nullable = true)
        .add("b", IntegerType, nullable = true)
        .add("c", IntegerType, nullable = true)
        .add("d", IntegerType, nullable = true)
      ), nullable = true)
    val row1Items = Seq(
      Row(1, 11, 111, 1111),
      Row(2, 22, 222, 2222),
      Row(3, 33, 333, 3333)
    )
    val row2Items = Seq(
      Row(10, 77, 777, 7777),
      Row(20, 88, 888, 8888),
      Row(30, 99, 999, 9999)
    )
    val initialData = Seq(
      Row("1", row1Items),
      Row("2", row2Items)
    )
    val dfInit = spark.createDataFrame(spark.sparkContext.parallelize(initialData), schemaV1)

    // ==========================================================
    // STEP 2: Write initial data using INSERT (not bulk-insert)
    // ==========================================================
    val hudiOpts = Map(
      HoodieTableConfig.NAME.key -> tableName,
      DataSourceWriteOptions.RECORDKEY_FIELD.key -> "id",
      DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "id",
      DataSourceWriteOptions.OPERATION.key -> DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL,
      DataSourceWriteOptions.TABLE_TYPE.key -> DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL,
      HoodieWriteConfig.RECORD_MERGER_IMPLS.key -> classOf[HoodieSparkRecordMerger].getName
    )
    dfInit.write.format("hudi").options(hudiOpts).mode(SaveMode.Overwrite).save(tablePath)
    // Verify initial data
    val dfAfterInsert = spark.read.format("hudi").load(tablePath)
    assertEquals(2, dfAfterInsert.count(), "Should have 2 records after initial insert")

    // ==========================================================
    // STEP 3: Schema evolution - add a new field to the struct inside the array
    // ==========================================================
    val schemaV2 = new StructType()
      .add("id", StringType, nullable = true)
      .add("items", ArrayType(new StructType()
        .add("a", IntegerType, nullable = true)
        .add("b", IntegerType, nullable = true)
        .add("c", IntegerType, nullable = true)
        .add("d", IntegerType, nullable = true)
        .add("e", IntegerType, nullable = true) // <-- NEW FIELD
      ), nullable = true)
    val row1ItemsEvolved = Seq(
      Row(1, 11, 111, 1111, 11111),
      Row(2, 22, 222, 2222, 22222),
      Row(3, 33, 333, 3333, 33333)
    )
    val dfEvolved = spark.createDataFrame(
      spark.sparkContext.parallelize(Seq(Row("1", row1ItemsEvolved))),
      schemaV2
    )

    // ==========================================================
    // STEP 4: Upsert with HoodieSparkRecordMerger
    // ==========================================================
    val hudiOptsUpsert = Map(
      DataSourceWriteOptions.TABLE_NAME.key -> tableName,
      DataSourceWriteOptions.RECORDKEY_FIELD.key -> "id",
      DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "id",
      DataSourceWriteOptions.OPERATION.key -> DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
      DataSourceWriteOptions.TABLE_TYPE.key -> DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL,
      HoodieWriteConfig.RECORD_MERGER_IMPLS.key -> "org.apache.hudi.HoodieSparkRecordMerger"
    )
    dfEvolved.write.format("hudi").options(hudiOptsUpsert).mode(SaveMode.Append).save(tablePath)

    // ==========================================================
    // STEP 5: Load after upsert and verify data integrity
    // ==========================================================
    val dfFinal = spark.read.format("hudi").load(tablePath)
    // Verify we still have 2 records
    assertEquals(2, dfFinal.count(), "Should still have 2 records after upsert")
    // Verify the schema includes the new field 'e'
    val finalSchema = dfFinal.schema
    val itemsField = finalSchema.fields.find(_.name == "items").get
    assertTrue(itemsField.dataType.isInstanceOf[ArrayType], "items should be an ArrayType")
    val arrayType = itemsField.dataType.asInstanceOf[ArrayType]
    assertTrue(arrayType.elementType.isInstanceOf[StructType], "items array element should be StructType")
    val structType = arrayType.elementType.asInstanceOf[StructType]
    val fieldNames = structType.fields.map(_.name)
    assertTrue(fieldNames.contains("e"), "Schema should include the evolved field 'e'")
    assertEquals(5, fieldNames.length, "Struct should have 5 fields (a, b, c, d, e)")
    // Verify we can read all data without errors (this would fail if data is corrupted)
    dfFinal.foreach(_ => {})

    // Verify data for id="1" (updated record)
    val record1Rows = dfFinal.filter("id = '1'").select("items").collect()
    assertEquals(1, record1Rows.length, "Should have exactly one record with id='1'")
    val items1 = record1Rows(0).getAs[Seq[Row]]("items")
    assertNotNull(items1, "items should not be null for id='1'")
    assertEquals(3, items1.length, "id='1' should have 3 items")
    // Verify first item of id="1" has all fields including 'e'
    val firstItem1 = items1(0)
    assertEquals(1, firstItem1.getInt(0), "First item 'a' should be 1")
    assertEquals(11, firstItem1.getInt(1), "First item 'b' should be 11")
    assertEquals(111, firstItem1.getInt(2), "First item 'c' should be 111")
    assertEquals(1111, firstItem1.getInt(3), "First item 'd' should be 1111")
    assertEquals(11111, firstItem1.getInt(4), "First item 'e' should be 11111")
    // Verify second item of id="1"
    val secondItem1 = items1(1)
    assertEquals(2, secondItem1.getInt(0), "Second item 'a' should be 2")
    assertEquals(22, secondItem1.getInt(1), "Second item 'b' should be 22")
    assertEquals(222, secondItem1.getInt(2), "Second item 'c' should be 222")
    assertEquals(2222, secondItem1.getInt(3), "Second item 'd' should be 2222")
    assertEquals(22222, secondItem1.getInt(4), "Second item 'e' should be 22222")

    // Verify data for id="2" (unchanged record - should have null for 'e')
    val record2Rows = dfFinal.filter("id = '2'").select("items").collect()
    assertEquals(1, record2Rows.length, "Should have exactly one record with id='2'")
    val items2 = record2Rows(0).getAs[Seq[Row]]("items")
    assertNotNull(items2, "items should not be null for id='2'")
    assertEquals(3, items2.length, "id='2' should have 3 items")
    // Verify first item of id="2" - should have original values and null for 'e'
    val firstItem2 = items2(0)
    assertEquals(10, firstItem2.getInt(0), "First item 'a' should be 10")
    assertEquals(77, firstItem2.getInt(1), "First item 'b' should be 77")
    assertEquals(777, firstItem2.getInt(2), "First item 'c' should be 777")
    assertEquals(7777, firstItem2.getInt(3), "First item 'd' should be 7777")
    assertTrue(firstItem2.isNullAt(4), "First item 'e' should be null for unchanged record")
    // Verify second item of id="2"
    val secondItem2 = items2(1)
    assertEquals(20, secondItem2.getInt(0), "Second item 'a' should be 20")
    assertEquals(88, secondItem2.getInt(1), "Second item 'b' should be 88")
    assertEquals(888, secondItem2.getInt(2), "Second item 'c' should be 888")
    assertEquals(8888, secondItem2.getInt(3), "Second item 'd' should be 8888")
    assertTrue(secondItem2.isNullAt(4), "Second item 'e' should be null for unchanged record")
  }
}
