
Commit e13a87f

Spark 3.4: Backport writing DVs to Spark 3.4 (apache#12019)
1 parent 6eef780 commit e13a87f

14 files changed: +395 -54 lines


spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/SparkRowLevelOperationsTestBase.java

Lines changed: 70 additions & 9 deletions
@@ -23,12 +23,15 @@
 import static org.apache.iceberg.PlanningMode.DISTRIBUTED;
 import static org.apache.iceberg.PlanningMode.LOCAL;
 import static org.apache.iceberg.SnapshotSummary.ADDED_DELETE_FILES_PROP;
+import static org.apache.iceberg.SnapshotSummary.ADDED_DVS_PROP;
 import static org.apache.iceberg.SnapshotSummary.ADDED_FILES_PROP;
+import static org.apache.iceberg.SnapshotSummary.ADD_POS_DELETE_FILES_PROP;
 import static org.apache.iceberg.SnapshotSummary.CHANGED_PARTITION_COUNT_PROP;
 import static org.apache.iceberg.SnapshotSummary.DELETED_FILES_PROP;
 import static org.apache.iceberg.TableProperties.DATA_PLANNING_MODE;
 import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
 import static org.apache.iceberg.TableProperties.DELETE_PLANNING_MODE;
+import static org.apache.iceberg.TableProperties.FORMAT_VERSION;
 import static org.apache.iceberg.TableProperties.ORC_VECTORIZATION_ENABLED;
 import static org.apache.iceberg.TableProperties.PARQUET_VECTORIZATION_ENABLED;
 import static org.apache.iceberg.TableProperties.SPARK_WRITE_PARTITIONED_FANOUT_ENABLED;
@@ -55,8 +58,10 @@
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.SnapshotRef;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.data.GenericRecord;
 import org.apache.iceberg.data.parquet.GenericParquetWriter;
+import org.apache.iceberg.deletes.DeleteGranularity;
 import org.apache.iceberg.io.DataWriter;
 import org.apache.iceberg.io.OutputFile;
 import org.apache.iceberg.parquet.Parquet;
@@ -85,6 +90,7 @@ public abstract class SparkRowLevelOperationsTestBase extends SparkExtensionsTes
   protected final boolean fanoutEnabled;
   protected final String branch;
   protected final PlanningMode planningMode;
+  protected final int formatVersion;
 
   public SparkRowLevelOperationsTestBase(
       String catalogName,
@@ -95,21 +101,23 @@ public SparkRowLevelOperationsTestBase(
       String distributionMode,
       boolean fanoutEnabled,
       String branch,
-      PlanningMode planningMode) {
+      PlanningMode planningMode,
+      int formatVersion) {
     super(catalogName, implementation, config);
     this.fileFormat = fileFormat;
     this.vectorized = vectorized;
     this.distributionMode = distributionMode;
     this.fanoutEnabled = fanoutEnabled;
     this.branch = branch;
     this.planningMode = planningMode;
+    this.formatVersion = formatVersion;
   }
 
   @Parameters(
       name =
           "catalogName = {0}, implementation = {1}, config = {2},"
               + " format = {3}, vectorized = {4}, distributionMode = {5},"
-              + " fanout = {6}, branch = {7}, planningMode = {8}")
+              + " fanout = {6}, branch = {7}, planningMode = {8}, formatVersion = {9}")
   public static Object[][] parameters() {
     return new Object[][] {
       {
@@ -123,7 +131,8 @@ public static Object[][] parameters() {
         WRITE_DISTRIBUTION_MODE_NONE,
         true,
         SnapshotRef.MAIN_BRANCH,
-        LOCAL
+        LOCAL,
+        2
       },
       {
         "testhive",
@@ -136,7 +145,8 @@ public static Object[][] parameters() {
         WRITE_DISTRIBUTION_MODE_NONE,
         false,
         "test",
-        DISTRIBUTED
+        DISTRIBUTED,
+        2
       },
       {
         "testhadoop",
@@ -147,7 +157,8 @@ public static Object[][] parameters() {
         WRITE_DISTRIBUTION_MODE_HASH,
         true,
         null,
-        LOCAL
+        LOCAL,
+        2
       },
       {
         "spark_catalog",
@@ -165,16 +176,52 @@ public static Object[][] parameters() {
         WRITE_DISTRIBUTION_MODE_RANGE,
         false,
         "test",
-        DISTRIBUTED
-      }
+        DISTRIBUTED,
+        2
+      },
+      {
+        "testhadoop",
+        SparkCatalog.class.getName(),
+        ImmutableMap.of("type", "hadoop"),
+        "parquet",
+        RANDOM.nextBoolean(),
+        WRITE_DISTRIBUTION_MODE_HASH,
+        true,
+        null,
+        LOCAL,
+        3
+      },
+      {
+        "spark_catalog",
+        SparkSessionCatalog.class.getName(),
+        ImmutableMap.of(
+            "type",
+            "hive",
+            "default-namespace",
+            "default",
+            "clients",
+            "1",
+            "parquet-enabled",
+            "false",
+            "cache-enabled",
+            "false" // Spark will delete tables using v1, leaving the cache out of sync
+            ),
+        "avro",
+        false,
+        WRITE_DISTRIBUTION_MODE_RANGE,
+        false,
+        "test",
+        DISTRIBUTED,
+        3
+      },
     };
   }
 
   protected abstract Map<String, String> extraTableProperties();
 
   protected void initTable() {
     sql(
-        "ALTER TABLE %s SET TBLPROPERTIES('%s' '%s', '%s' '%s', '%s' '%s', '%s' '%s', '%s' '%s')",
+        "ALTER TABLE %s SET TBLPROPERTIES('%s' '%s', '%s' '%s', '%s' '%s', '%s' '%s', '%s' '%s', '%s' '%s')",
         tableName,
         DEFAULT_FILE_FORMAT,
         fileFormat,
@@ -185,7 +232,9 @@ protected void initTable() {
         DATA_PLANNING_MODE,
         planningMode.modeName(),
         DELETE_PLANNING_MODE,
-        planningMode.modeName());
+        planningMode.modeName(),
+        FORMAT_VERSION,
+        formatVersion);
 
     switch (fileFormat) {
       case "parquet":
@@ -310,6 +359,10 @@ protected void validateSnapshot(
     validateProperty(snapshot, DELETED_FILES_PROP, deletedDataFiles);
     validateProperty(snapshot, ADDED_DELETE_FILES_PROP, addedDeleteFiles);
     validateProperty(snapshot, ADDED_FILES_PROP, addedDataFiles);
+    if (formatVersion >= 3) {
+      validateProperty(snapshot, ADDED_DVS_PROP, addedDeleteFiles);
+      assertThat(snapshot.summary()).doesNotContainKey(ADD_POS_DELETE_FILES_PROP);
+    }
   }
 
   protected void validateProperty(Snapshot snapshot, String property, Set<String> expectedValues) {
@@ -401,4 +454,12 @@ protected void assertAllBatchScansVectorized(SparkPlan plan) {
     List<SparkPlan> batchScans = SparkPlanUtil.collectBatchScans(plan);
     assertThat(batchScans).hasSizeGreaterThan(0).allMatch(SparkPlan::supportsColumnar);
   }
+
+  protected void createTableWithDeleteGranularity(
+      String schema, String partitionedBy, DeleteGranularity deleteGranularity) {
+    createAndInitTable(schema, partitionedBy, null /* empty */);
+    sql(
+        "ALTER TABLE %s SET TBLPROPERTIES ('%s' '%s')",
+        tableName, TableProperties.DELETE_GRANULARITY, deleteGranularity);
+  }
 }
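
The createTableWithDeleteGranularity helper added at the end of this file is not exercised by the hunks shown above. A minimal sketch of how a subclass test might call it; the schema, partition clause, and granularity value below are illustrative, not taken from this commit:

    // hypothetical caller in a row-level operation test;
    // DeleteGranularity.FILE scopes each position delete file (or DV) to a single data file
    createTableWithDeleteGranularity(
        "id INT, dep STRING", "PARTITIONED BY (dep)", DeleteGranularity.FILE);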

spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCopyOnWriteDelete.java

Lines changed: 4 additions & 2 deletions
@@ -64,7 +64,8 @@ public TestCopyOnWriteDelete(
       String distributionMode,
       boolean fanoutEnabled,
       String branch,
-      PlanningMode planningMode) {
+      PlanningMode planningMode,
+      int formatVersion) {
     super(
         catalogName,
         implementation,
@@ -74,7 +75,8 @@ public TestCopyOnWriteDelete(
         distributionMode,
         fanoutEnabled,
         branch,
-        planningMode);
+        planningMode,
+        formatVersion);
   }
 
   @Override

spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCopyOnWriteMerge.java

Lines changed: 4 additions & 2 deletions
@@ -62,7 +62,8 @@ public TestCopyOnWriteMerge(
       String distributionMode,
       boolean fanoutEnabled,
       String branch,
-      PlanningMode planningMode) {
+      PlanningMode planningMode,
+      int formatVersion) {
     super(
         catalogName,
         implementation,
@@ -72,7 +73,8 @@ public TestCopyOnWriteMerge(
         distributionMode,
         fanoutEnabled,
         branch,
-        planningMode);
+        planningMode,
+        formatVersion);
   }
 
   @Override

spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCopyOnWriteUpdate.java

Lines changed: 4 additions & 2 deletions
@@ -61,7 +61,8 @@ public TestCopyOnWriteUpdate(
       String distributionMode,
       boolean fanoutEnabled,
       String branch,
-      PlanningMode planningMode) {
+      PlanningMode planningMode,
+      int formatVersion) {
     super(
         catalogName,
         implementation,
@@ -71,7 +72,8 @@ public TestCopyOnWriteUpdate(
         distributionMode,
         fanoutEnabled,
         branch,
-        planningMode);
+        planningMode,
+        formatVersion);
   }
 
   @Override

spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java

Lines changed: 13 additions & 3 deletions
@@ -20,6 +20,8 @@
 
 import static org.apache.iceberg.DataOperations.DELETE;
 import static org.apache.iceberg.RowLevelOperationMode.COPY_ON_WRITE;
+import static org.apache.iceberg.RowLevelOperationMode.MERGE_ON_READ;
+import static org.apache.iceberg.SnapshotSummary.ADDED_DVS_PROP;
 import static org.apache.iceberg.SnapshotSummary.ADD_POS_DELETE_FILES_PROP;
 import static org.apache.iceberg.TableProperties.DELETE_DISTRIBUTION_MODE;
 import static org.apache.iceberg.TableProperties.DELETE_ISOLATION_LEVEL;
@@ -97,7 +99,8 @@ public TestDelete(
       String distributionMode,
       boolean fanoutEnabled,
       String branch,
-      PlanningMode planningMode) {
+      PlanningMode planningMode,
+      int formatVersion) {
     super(
         catalogName,
         implementation,
@@ -107,7 +110,8 @@ public TestDelete(
         distributionMode,
         fanoutEnabled,
         branch,
-        planningMode);
+        planningMode,
+        formatVersion);
   }
 
   @BeforeClass
@@ -201,6 +205,9 @@ public void testCoalesceDelete() throws Exception {
       // AQE detects that all shuffle blocks are small and processes them in 1 task
       // otherwise, there would be 200 tasks writing to the table
       validateProperty(snapshot, SnapshotSummary.ADDED_FILES_PROP, "1");
+    } else if (mode(table) == MERGE_ON_READ && formatVersion >= 3) {
+      validateProperty(snapshot, SnapshotSummary.ADDED_DELETE_FILES_PROP, "4");
+      validateProperty(snapshot, ADDED_DVS_PROP, "4");
     } else {
       // MoR DELETE requests the deleted records to be range distributed by partition and `_file`
       // each task contains only 1 file and therefore writes only 1 shuffle block
@@ -523,7 +530,8 @@ public void deleteSingleRecordProducesDeleteOperation() throws NoSuchTableExcept
     } else {
       // this is a RowDelta that produces a "delete" instead of "overwrite"
       validateMergeOnRead(currentSnapshot, "1", "1", null);
-      validateProperty(currentSnapshot, ADD_POS_DELETE_FILES_PROP, "1");
+      String property = formatVersion >= 3 ? ADDED_DVS_PROP : ADD_POS_DELETE_FILES_PROP;
+      validateProperty(currentSnapshot, property, "1");
     }
 
     assertThat(sql("SELECT * FROM %s", tableName))
@@ -1287,6 +1295,8 @@ public void testDeleteWithMultipleSpecs() {
     Snapshot currentSnapshot = SnapshotUtil.latestSnapshot(table, branch);
     if (mode(table) == COPY_ON_WRITE) {
       validateCopyOnWrite(currentSnapshot, "3", "4", "1");
+    } else if (mode(table) == MERGE_ON_READ && formatVersion >= 3) {
+      validateMergeOnRead(currentSnapshot, "3", "4", null);
     } else {
       validateMergeOnRead(currentSnapshot, "3", "3", null);
     }
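
For context on the new formatVersion >= 3 branches: on a format-version-3 table, a merge-on-read DELETE writes its position deletes as deletion vectors, one per touched data file, and each DV is also counted as an added delete file, which is why testCoalesceDelete expects "4" for both counters. A hedged sketch of the equivalent snapshot-summary check (obtaining the snapshot is elided):

    // assuming a v3 table after a MoR DELETE that touched four data files
    assertThat(snapshot.summary())
        .containsEntry(SnapshotSummary.ADDED_DVS_PROP, "4")
        .doesNotContainKey(SnapshotSummary.ADD_POS_DELETE_FILES_PROP);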

spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMerge.java

Lines changed: 8 additions & 2 deletions
@@ -19,6 +19,7 @@
 package org.apache.iceberg.spark.extensions;
 
 import static org.apache.iceberg.RowLevelOperationMode.COPY_ON_WRITE;
+import static org.apache.iceberg.RowLevelOperationMode.MERGE_ON_READ;
 import static org.apache.iceberg.TableProperties.MERGE_DISTRIBUTION_MODE;
 import static org.apache.iceberg.TableProperties.MERGE_ISOLATION_LEVEL;
 import static org.apache.iceberg.TableProperties.MERGE_MODE;
@@ -86,7 +87,8 @@ public TestMerge(
       String distributionMode,
       boolean fanoutEnabled,
       String branch,
-      PlanningMode planningMode) {
+      PlanningMode planningMode,
+      int formatVersion) {
     super(
         catalogName,
         implementation,
@@ -96,7 +98,8 @@ public TestMerge(
         distributionMode,
         fanoutEnabled,
         branch,
-        planningMode);
+        planningMode,
+        formatVersion);
   }
 
   @BeforeClass
@@ -207,6 +210,9 @@ public void testCoalesceMerge() {
       // AQE detects that all shuffle blocks are small and processes them in 1 task
       // otherwise, there would be 200 tasks writing to the table
       validateProperty(currentSnapshot, SnapshotSummary.ADDED_FILES_PROP, "1");
+    } else if (mode(table) == MERGE_ON_READ && formatVersion >= 3) {
+      validateProperty(currentSnapshot, SnapshotSummary.ADDED_DELETE_FILES_PROP, "4");
+      validateProperty(currentSnapshot, SnapshotSummary.ADDED_DVS_PROP, "4");
     } else {
       // MoR MERGE would perform a join on `id`
       // every task has data for each of 200 reducers
