Commit d96901b

Spark 3.4: Backport rewriting historical file-scoped deletes (apache#11273) to 3.4 (apache#11975)
1 parent a0a1c00 commit d96901b
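
For context: a position delete file is "file-scoped" when it references a single data file, which is what merge-on-read DELETE and UPDATE write under the write.delete.granularity=file table property. With this backport, a later row-level operation that touches the same data files rewrites those historical file-scoped delete files together with its new deletes instead of letting them accumulate; the tests below assert this by checking Snapshot#removedDeleteFiles. A minimal sketch of the table configuration involved, assuming an illustrative table name and an existing SparkSession (neither is part of the commit):

    // Sketch only: the table setup the new tests exercise, with an assumed identifier.
    spark.sql(
        "CREATE TABLE db.tbl (id INT, data STRING) USING iceberg TBLPROPERTIES ("
            + "'format-version'='2', "
            + "'write.delete.mode'='merge-on-read', "
            + "'write.delete.granularity'='file')");
    spark.sql("DELETE FROM db.tbl WHERE id = 1"); // writes file-scoped position deletes
    spark.sql("DELETE FROM db.tbl WHERE id = 2"); // may rewrite the earlier delete file if it hits the same data file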

File tree

4 files changed (+289, -25 lines)

spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadDelete.java

Lines changed: 83 additions & 0 deletions
@@ -39,9 +39,11 @@
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.spark.SparkSQLProperties;
+import org.apache.iceberg.spark.source.SimpleRecord;
 import org.apache.iceberg.spark.source.SparkTable;
 import org.apache.iceberg.spark.source.TestSparkCatalog;
 import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
 import org.apache.spark.sql.connector.catalog.Identifier;
 import org.junit.Assert;
@@ -120,6 +122,87 @@ public void testDeletePartitionGranularity() throws NoSuchTableException {
     checkDeleteFileGranularity(DeleteGranularity.PARTITION);
   }
 
+  @Test
+  public void testPositionDeletesAreMaintainedDuringDelete() throws NoSuchTableException {
+    sql(
+        "CREATE TABLE %s (id int, data string) USING iceberg PARTITIONED BY (id) TBLPROPERTIES"
+            + "('%s'='%s', '%s'='%s', '%s'='%s')",
+        tableName,
+        TableProperties.FORMAT_VERSION,
+        2,
+        TableProperties.DELETE_MODE,
+        "merge-on-read",
+        TableProperties.DELETE_GRANULARITY,
+        "file");
+    createBranchIfNeeded();
+
+    List<SimpleRecord> records =
+        Lists.newArrayList(
+            new SimpleRecord(1, "a"),
+            new SimpleRecord(1, "b"),
+            new SimpleRecord(1, "c"),
+            new SimpleRecord(2, "d"),
+            new SimpleRecord(2, "e"));
+    spark
+        .createDataset(records, Encoders.bean(SimpleRecord.class))
+        .coalesce(1)
+        .writeTo(commitTarget())
+        .append();
+
+    sql("DELETE FROM %s WHERE id = 1 and data='a'", commitTarget());
+    sql("DELETE FROM %s WHERE id = 2 and data='d'", commitTarget());
+    sql("DELETE FROM %s WHERE id = 1 and data='c'", commitTarget());
+
+    Table table = validationCatalog.loadTable(tableIdent);
+    Snapshot latest = SnapshotUtil.latestSnapshot(table, branch);
+    assertThat(latest.removedDeleteFiles(table.io())).hasSize(1);
+    assertEquals(
+        "Should have expected rows",
+        ImmutableList.of(row(1, "b"), row(2, "e")),
+        sql("SELECT * FROM %s ORDER BY id ASC", selectTarget()));
+  }
+
+  @Test
+  public void testUnpartitionedPositionDeletesAreMaintainedDuringDelete()
+      throws NoSuchTableException {
+    sql(
+        "CREATE TABLE %s (id int, data string) USING iceberg TBLPROPERTIES"
+            + "('%s'='%s', '%s'='%s', '%s'='%s')",
+        tableName,
+        TableProperties.FORMAT_VERSION,
+        2,
+        TableProperties.DELETE_MODE,
+        "merge-on-read",
+        TableProperties.DELETE_GRANULARITY,
+        "file");
+    createBranchIfNeeded();
+
+    List<SimpleRecord> records =
+        Lists.newArrayList(
+            new SimpleRecord(1, "a"),
+            new SimpleRecord(1, "b"),
+            new SimpleRecord(1, "c"),
+            new SimpleRecord(2, "d"),
+            new SimpleRecord(2, "e"));
+    spark
+        .createDataset(records, Encoders.bean(SimpleRecord.class))
+        .coalesce(1)
+        .writeTo(commitTarget())
+        .append();
+
+    sql("DELETE FROM %s WHERE id = 1 and data='a'", commitTarget());
+    sql("DELETE FROM %s WHERE id = 2 and data='d'", commitTarget());
+    sql("DELETE FROM %s WHERE id = 1 and data='c'", commitTarget());
+
+    Table table = validationCatalog.loadTable(tableIdent);
+    Snapshot latest = SnapshotUtil.latestSnapshot(table, branch);
+    assertThat(latest.removedDeleteFiles(table.io())).hasSize(1);
+    assertEquals(
+        "Should have expected rows",
+        ImmutableList.of(row(1, "b"), row(2, "e")),
+        sql("SELECT * FROM %s ORDER BY id ASC", selectTarget()));
+  }
+
   private void checkDeleteFileGranularity(DeleteGranularity deleteGranularity)
       throws NoSuchTableException {
     createAndInitPartitionedTable();
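
Both tests assert on Snapshot#removedDeleteFiles to show that the third DELETE replaced an earlier file-scoped delete file rather than only adding another one. The same churn is visible in the snapshot summary; a small sketch, assuming an already-loaded org.apache.iceberg.Table with at least one snapshot (the class below is illustrative, while the summary keys are standard Iceberg snapshot properties):

    // Sketch: report delete-file churn for the latest snapshot of a table.
    import org.apache.iceberg.Snapshot;
    import org.apache.iceberg.Table;

    public final class DeleteFileChurn {
      private DeleteFileChurn() {}

      // Prints how many delete files the current snapshot added and removed.
      public static void print(Table table) {
        Snapshot current = table.currentSnapshot(); // assumes the table is not empty
        String added = current.summary().getOrDefault("added-delete-files", "0");
        String removed = current.summary().getOrDefault("removed-delete-files", "0");
        System.out.printf(
            "snapshot %d: +%s / -%s delete files%n", current.snapshotId(), added, removed);
      }
    }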

spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadUpdate.java

Lines changed: 89 additions & 10 deletions
@@ -19,6 +19,7 @@
 package org.apache.iceberg.spark.extensions;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assumptions.assumeThat;
 
 import java.util.Map;
 import org.apache.iceberg.PlanningMode;
@@ -75,19 +76,82 @@ public void testUpdatePartitionGranularity() {
     checkUpdateFileGranularity(DeleteGranularity.PARTITION);
   }
 
-  private void checkUpdateFileGranularity(DeleteGranularity deleteGranularity) {
-    createAndInitTable("id INT, dep STRING", "PARTITIONED BY (dep)", null /* empty */);
+  @Test
+  public void testUpdateFileGranularityMergesDeleteFiles() {
+    // Range distribution will produce partition scoped deletes which will not be cleaned up
+    assumeThat(distributionMode).isNotEqualToIgnoringCase("range");
 
-    sql(
-        "ALTER TABLE %s SET TBLPROPERTIES ('%s' '%s')",
-        tableName, TableProperties.DELETE_GRANULARITY, deleteGranularity);
+    checkUpdateFileGranularity(DeleteGranularity.FILE);
+    sql("UPDATE %s SET id = id + 1 WHERE id = 4", commitTarget());
+    Table table = validationCatalog.loadTable(tableIdent);
+    Snapshot currentSnapshot = SnapshotUtil.latestSnapshot(table, branch);
+    String expectedDeleteFilesCount = "2";
+    validateMergeOnRead(currentSnapshot, "2", expectedDeleteFilesCount, "2");
 
-    append(tableName, "{ \"id\": 1, \"dep\": \"hr\" }\n" + "{ \"id\": 2, \"dep\": \"hr\" }");
-    append(tableName, "{ \"id\": 3, \"dep\": \"hr\" }\n" + "{ \"id\": 4, \"dep\": \"hr\" }");
-    append(tableName, "{ \"id\": 1, \"dep\": \"it\" }\n" + "{ \"id\": 2, \"dep\": \"it\" }");
-    append(tableName, "{ \"id\": 3, \"dep\": \"it\" }\n" + "{ \"id\": 4, \"dep\": \"it\" }");
+    assertThat(currentSnapshot.removedDeleteFiles(table.io())).hasSize(2);
+    assertEquals(
+        "Should have expected rows",
+        ImmutableList.of(
+            row(0, "hr"),
+            row(2, "hr"),
+            row(2, "hr"),
+            row(5, "hr"),
+            row(0, "it"),
+            row(2, "it"),
+            row(2, "it"),
+            row(5, "it")),
+        sql("SELECT * FROM %s ORDER BY dep ASC, id ASC", selectTarget()));
+  }
 
-    createBranchIfNeeded();
+  @Test
+  public void testUpdateUnpartitionedFileGranularityMergesDeleteFiles() {
+    // Range distribution will produce partition scoped deletes which will not be cleaned up
+    assumeThat(distributionMode).isNotEqualToIgnoringCase("range");
+    initTable("", DeleteGranularity.FILE);
+
+    sql("UPDATE %s SET id = id - 1 WHERE id = 1 OR id = 3", commitTarget());
+
+    Table table = validationCatalog.loadTable(tableIdent);
+    assertThat(table.snapshots()).hasSize(5);
+    Snapshot currentSnapshot = SnapshotUtil.latestSnapshot(table, branch);
+    String expectedDeleteFilesCount = "4";
+    validateMergeOnRead(currentSnapshot, "1", expectedDeleteFilesCount, "1");
+    assertEquals(
+        "Should have expected rows",
+        ImmutableList.of(
+            row(0, "hr"),
+            row(2, "hr"),
+            row(2, "hr"),
+            row(4, "hr"),
+            row(0, "it"),
+            row(2, "it"),
+            row(2, "it"),
+            row(4, "it")),
+        sql("SELECT * FROM %s ORDER BY dep ASC, id ASC", selectTarget()));
+
+    sql("UPDATE %s SET id = id + 1 WHERE id = 4", commitTarget());
+    table.refresh();
+    currentSnapshot = SnapshotUtil.latestSnapshot(table, branch);
+    expectedDeleteFilesCount = "2";
+
+    validateMergeOnRead(currentSnapshot, "1", expectedDeleteFilesCount, "1");
+    assertThat(currentSnapshot.removedDeleteFiles(table.io())).hasSize(2);
+    assertEquals(
+        "Should have expected rows",
+        ImmutableList.of(
+            row(0, "hr"),
+            row(2, "hr"),
+            row(2, "hr"),
+            row(5, "hr"),
+            row(0, "it"),
+            row(2, "it"),
+            row(2, "it"),
+            row(5, "it")),
+        sql("SELECT * FROM %s ORDER BY dep ASC, id ASC", selectTarget()));
+  }
+
+  private void checkUpdateFileGranularity(DeleteGranularity deleteGranularity) {
+    initTable("PARTITIONED BY (dep)", deleteGranularity);
 
     sql("UPDATE %s SET id = id - 1 WHERE id = 1 OR id = 3", commitTarget());
 
@@ -111,4 +175,19 @@ private void checkUpdateFileGranularity(DeleteGranularity deleteGranularity) {
            row(4, "it")),
        sql("SELECT * FROM %s ORDER BY dep ASC, id ASC", selectTarget()));
   }
+
+  private void initTable(String partitionedBy, DeleteGranularity deleteGranularity) {
+    createAndInitTable("id INT, dep STRING", partitionedBy, null /* empty */);
+
+    sql(
+        "ALTER TABLE %s SET TBLPROPERTIES ('%s' '%s')",
+        tableName, TableProperties.DELETE_GRANULARITY, deleteGranularity);
+
+    append(tableName, "{ \"id\": 1, \"dep\": \"hr\" }\n" + "{ \"id\": 2, \"dep\": \"hr\" }");
+    append(tableName, "{ \"id\": 3, \"dep\": \"hr\" }\n" + "{ \"id\": 4, \"dep\": \"hr\" }");
+    append(tableName, "{ \"id\": 1, \"dep\": \"it\" }\n" + "{ \"id\": 2, \"dep\": \"it\" }");
+    append(tableName, "{ \"id\": 3, \"dep\": \"it\" }\n" + "{ \"id\": 4, \"dep\": \"it\" }");
+
+    createBranchIfNeeded();
+  }
 }
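
The assumeThat guards skip these checks under the range distribution mode because, as the inline comment says, range-distributed writes produce partition-scoped deletes, which this change does not rewrite. When debugging which granularity a table actually produced, the written delete files can be listed through the delete_files metadata table; a sketch, with the table identifier assumed:

    // Sketch: list a table's delete files (the identifier is illustrative).
    // Under 'write.delete.granularity'='file', each delete file targets one data file,
    // so file-scoped deletes appear as many small entries here.
    spark.sql("SELECT file_path, partition, record_count FROM db.tbl.delete_files").show(false);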

spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java

Lines changed: 21 additions & 0 deletions
@@ -24,6 +24,8 @@
 import java.util.Set;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.PartitionField;
 import org.apache.iceberg.PartitionScanTask;
 import org.apache.iceberg.PartitionSpec;
@@ -48,6 +50,8 @@
 import org.apache.iceberg.spark.SparkReadConf;
 import org.apache.iceberg.spark.SparkSchemaUtil;
 import org.apache.iceberg.spark.SparkV2Filters;
+import org.apache.iceberg.util.ContentFileUtil;
+import org.apache.iceberg.util.DeleteFileSet;
 import org.apache.iceberg.util.SnapshotUtil;
 import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.connector.expressions.NamedReference;
@@ -158,6 +162,23 @@ public void filter(Predicate[] predicates) {
     }
   }
 
+  protected Map<String, DeleteFileSet> rewritableDeletes() {
+    Map<String, DeleteFileSet> rewritableDeletes = Maps.newHashMap();
+
+    for (ScanTask task : tasks()) {
+      FileScanTask fileScanTask = task.asFileScanTask();
+      for (DeleteFile deleteFile : fileScanTask.deletes()) {
+        if (ContentFileUtil.isFileScoped(deleteFile)) {
+          rewritableDeletes
+              .computeIfAbsent(fileScanTask.file().location(), ignored -> DeleteFileSet.create())
+              .add(deleteFile);
+        }
+      }
+    }
+
+    return rewritableDeletes;
+  }
+
   // at this moment, Spark can only pass IN filters for a single attribute
   // if there are multiple filter attributes, Spark will pass two separate IN filters
   private Expression convertRuntimeFilters(Predicate[] predicates) {
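
The new rewritableDeletes() helper walks the scan tasks and maps each data file location to the set of file-scoped delete files currently applied to it, skipping partition-scoped deletes via ContentFileUtil.isFileScoped. The write path can then replace those files when it commits new deletes for the same data files. A rough sketch of how such a map could be consumed in a RowDelta commit; this is illustrative only, not the exact wiring of this backport, and it assumes the RowDelta#removeDeletes API plus a hypothetical newDeleteFiles collection of freshly written delete files:

    // Sketch: drop replaced file-scoped delete files while committing the new ones.
    RowDelta rowDelta = table.newRowDelta();
    for (DeleteFileSet replaced : rewritableDeletes().values()) {
      replaced.forEach(rowDelta::removeDeletes); // mark each rewritten delete file for removal
    }
    newDeleteFiles.forEach(rowDelta::addDeletes); // newDeleteFiles: hypothetical List<DeleteFile>
    rowDelta.commit();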
