/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hudi.functional

import org.apache.hudi.common.table.HoodieTableConfig
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.testutils.HoodieSparkClientTestBase
import org.apache.hudi.{DataSourceWriteOptions, HoodieSparkRecordMerger, ScalaAssertionSupport}
import org.apache.spark.sql.{Row, SaveMode, SparkSession}
import org.apache.spark.sql.types.{ArrayType, IntegerType, StringType, StructField, StructType}
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
import org.junit.jupiter.api.Assertions._

/**
 * Test to verify schema evolution for array of structs with record mergers.
 * This test reproduces a bug where schema evolution of array<struct<...>> fields
 * can cause data corruption when using certain record mergers.
 */
class TestArrayStructSchemaEvolution extends HoodieSparkClientTestBase with ScalaAssertionSupport {
  var spark: SparkSession = _

  @BeforeEach override def setUp(): Unit = {
    initPath()
    initSparkContexts()
    spark = sqlContext.sparkSession
    initHoodieStorage()
  }

  @AfterEach override def tearDown(): Unit = {
    cleanupSparkContexts()
    cleanupFileSystem()
  }

  @Test
  def testArrayStructSchemaEvolutionWithRecordMerger(): Unit = {
    val tablePath = basePath + "/array_struct_bug_100"
    val tableName = "array_struct_bug_100"

    // ==========================================================
    // STEP 1: Initial schema (no evolution yet)
    // ==========================================================
    val schemaV1 = new StructType()
      .add("id", StringType, nullable = true)
      .add("items", ArrayType(new StructType()
        .add("a", IntegerType, nullable = true)
        .add("b", IntegerType, nullable = true)
        .add("c", IntegerType, nullable = true)
        .add("d", IntegerType, nullable = true)
      ), nullable = true)
    val row1Items = Seq(
      Row(1, 11, 111, 1111),
      Row(2, 22, 222, 2222),
      Row(3, 33, 333, 3333)
    )
    val row2Items = Seq(
      Row(10, 77, 777, 7777),
      Row(20, 88, 888, 8888),
      Row(30, 99, 999, 9999)
    )
    val initialData = Seq(
      Row("1", row1Items),
      Row("2", row2Items)
    )
    val dfInit = spark.createDataFrame(spark.sparkContext.parallelize(initialData), schemaV1)

    // ==========================================================
    // STEP 2: Write initial data using INSERT (not bulk-insert)
    // ==========================================================
    val hudiOpts = Map(
      HoodieTableConfig.NAME.key -> tableName,
      DataSourceWriteOptions.RECORDKEY_FIELD.key -> "id",
      DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "id",
      DataSourceWriteOptions.OPERATION.key -> DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL,
      DataSourceWriteOptions.TABLE_TYPE.key -> DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL,
      HoodieWriteConfig.RECORD_MERGER_IMPLS.key -> classOf[HoodieSparkRecordMerger].getName
    )
    dfInit.write.format("hudi").options(hudiOpts).mode(SaveMode.Overwrite).save(tablePath)
    // Verify initial data
    val dfAfterInsert = spark.read.format("hudi").load(tablePath)
    assertEquals(2, dfAfterInsert.count(), "Should have 2 records after initial insert")

    // ==========================================================
    // STEP 3: Schema evolution - Add a new field to the struct inside the array
    // ==========================================================
    val schemaV2 = new StructType()
      .add("id", StringType, nullable = true)
      .add("items", ArrayType(new StructType()
        .add("a", IntegerType, nullable = true)
        .add("b", IntegerType, nullable = true)
        .add("c", IntegerType, nullable = true)
        .add("d", IntegerType, nullable = true)
        .add("e", IntegerType, nullable = true) // <-- NEW FIELD
      ), nullable = true)
    val row1ItemsEvolved = Seq(
      Row(1, 11, 111, 1111, 11111),
      Row(2, 22, 222, 2222, 22222),
      Row(3, 33, 333, 3333, 33333)
    )
    val dfEvolved = spark.createDataFrame(
      spark.sparkContext.parallelize(Seq(Row("1", row1ItemsEvolved))),
      schemaV2
    )

    // ==========================================================
    // STEP 4: Upsert with HoodieSparkRecordMerger
    // ==========================================================
    val hudiOptsUpsert = Map(
      DataSourceWriteOptions.TABLE_NAME.key -> tableName,
      DataSourceWriteOptions.RECORDKEY_FIELD.key -> "id",
      DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "id",
      DataSourceWriteOptions.OPERATION.key -> DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
      DataSourceWriteOptions.TABLE_TYPE.key -> DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL,
      HoodieWriteConfig.RECORD_MERGER_IMPLS.key -> "org.apache.hudi.HoodieSparkRecordMerger"
    )
    dfEvolved.write.format("hudi").options(hudiOptsUpsert).mode(SaveMode.Append).save(tablePath)

    // ==========================================================
    // STEP 5: Load after upsert and verify data integrity
    // ==========================================================
    val dfFinal = spark.read.format("hudi").load(tablePath)
    // Verify we still have 2 records
    assertEquals(2, dfFinal.count(), "Should still have 2 records after upsert")
    // Verify the schema includes the new field 'e'
    val finalSchema = dfFinal.schema
    val itemsField = finalSchema.fields.find(_.name == "items").get
    assertTrue(itemsField.dataType.isInstanceOf[ArrayType], "items should be an ArrayType")
    val arrayType = itemsField.dataType.asInstanceOf[ArrayType]
    assertTrue(arrayType.elementType.isInstanceOf[StructType], "items array element should be StructType")
    val structType = arrayType.elementType.asInstanceOf[StructType]
    val fieldNames = structType.fields.map(_.name)
    assertTrue(fieldNames.contains("e"), "Schema should include the evolved field 'e'")
    assertEquals(5, fieldNames.length, "Struct should have 5 fields (a, b, c, d, e)")
    // Verify we can read all data without errors (this would fail if data is corrupted)
    dfFinal.foreach(_ => {})

    // Verify data for id="1" (updated record)
    val record1Rows = dfFinal.filter("id = '1'").select("items").collect()
    assertEquals(1, record1Rows.length, "Should have exactly one record with id='1'")
    val items1 = record1Rows(0).getAs[Seq[Row]]("items")
    assertNotNull(items1, "items should not be null for id='1'")
    assertEquals(3, items1.length, "id='1' should have 3 items")
    // Verify first item of id="1" has all fields including 'e'
    val firstItem1 = items1(0)
    assertEquals(1, firstItem1.getInt(0), "First item 'a' should be 1")
    assertEquals(11, firstItem1.getInt(1), "First item 'b' should be 11")
    assertEquals(111, firstItem1.getInt(2), "First item 'c' should be 111")
    assertEquals(1111, firstItem1.getInt(3), "First item 'd' should be 1111")
    assertEquals(11111, firstItem1.getInt(4), "First item 'e' should be 11111")
    // Verify second item of id="1"
    val secondItem1 = items1(1)
    assertEquals(2, secondItem1.getInt(0), "Second item 'a' should be 2")
    assertEquals(22, secondItem1.getInt(1), "Second item 'b' should be 22")
    assertEquals(222, secondItem1.getInt(2), "Second item 'c' should be 222")
    assertEquals(2222, secondItem1.getInt(3), "Second item 'd' should be 2222")
    assertEquals(22222, secondItem1.getInt(4), "Second item 'e' should be 22222")

    // Verify data for id="2" (unchanged record - should have null for 'e')
    val record2Rows = dfFinal.filter("id = '2'").select("items").collect()
    assertEquals(1, record2Rows.length, "Should have exactly one record with id='2'")
    val items2 = record2Rows(0).getAs[Seq[Row]]("items")
    assertNotNull(items2, "items should not be null for id='2'")
    assertEquals(3, items2.length, "id='2' should have 3 items")
    // Verify first item of id="2" - should have original values and null for 'e'
    val firstItem2 = items2(0)
    assertEquals(10, firstItem2.getInt(0), "First item 'a' should be 10")
    assertEquals(77, firstItem2.getInt(1), "First item 'b' should be 77")
    assertEquals(777, firstItem2.getInt(2), "First item 'c' should be 777")
    assertEquals(7777, firstItem2.getInt(3), "First item 'd' should be 7777")
    assertTrue(firstItem2.isNullAt(4), "First item 'e' should be null for unchanged record")
    // Verify second item of id="2"
    val secondItem2 = items2(1)
    assertEquals(20, secondItem2.getInt(0), "Second item 'a' should be 20")
    assertEquals(88, secondItem2.getInt(1), "Second item 'b' should be 88")
    assertEquals(888, secondItem2.getInt(2), "Second item 'c' should be 888")
    assertEquals(8888, secondItem2.getInt(3), "Second item 'd' should be 8888")
    assertTrue(secondItem2.isNullAt(4), "Second item 'e' should be null for unchanged record")
  }
}