Spark: Synchronously merge new position deletes with old deletes (apa…
amogh-jahagirdar authored Nov 5, 2024
1 parent 549674b commit ad24d4b
Showing 6 changed files with 294 additions and 30 deletions.
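
In outline, as the diff below suggests: when a new position delete is written for a data file that already carries file-scoped position deletes, the previously committed positions are folded into the replacement delete file and the old delete file is removed in the same commit (the tests assert on Snapshot.removedDeleteFiles). What follows is a minimal plain-Java sketch of that merge using hypothetical stand-in types, not Iceberg's actual writer API:

import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-ins used only to illustrate the merge; the real change works on
// Iceberg DeleteFile/PositionDeleteIndex objects inside the Spark write path.
class PositionDeleteMergeSketch {

  // A previously committed file-scoped delete: it references exactly one data file.
  record ExistingDelete(String referencedDataFile, Set<Long> positions) {}

  // Group previously committed file-scoped deletes by the data file they reference,
  // mirroring the shape of the rewritableDeletes() map added in this commit
  // (data file location -> delete files).
  static Map<String, Set<Long>> rewritableDeletes(List<ExistingDelete> committed) {
    Map<String, Set<Long>> byDataFile = new HashMap<>();
    for (ExistingDelete delete : committed) {
      byDataFile
          .computeIfAbsent(delete.referencedDataFile(), ignored -> new HashSet<>())
          .addAll(delete.positions());
    }
    return byDataFile;
  }

  // Union the old positions with the newly deleted ones so a single replacement
  // delete file can be written; the old delete file is then dropped in the same
  // commit, which is why the tests see one removed delete file per rewritten data file.
  static Set<Long> mergedPositions(
      Map<String, Set<Long>> rewritable, String dataFileLocation, Set<Long> newPositions) {
    Set<Long> merged = new HashSet<>(newPositions);
    merged.addAll(rewritable.getOrDefault(dataFileLocation, Set.of()));
    return merged;
  }

  public static void main(String[] args) {
    List<ExistingDelete> committed =
        List.of(new ExistingDelete("data-1.parquet", Set.of(0L, 7L)));
    Map<String, Set<Long>> rewritable = rewritableDeletes(committed);
    // A new DELETE removes position 3 of data-1.parquet; the writer emits one file
    // covering {0, 3, 7} and drops the old one.
    System.out.println(mergedPositions(rewritable, "data-1.parquet", Set.of(3L)));
  }
}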
@@ -167,14 +167,10 @@ private DeleteWriteResult writeDeletes(Collection<CharSequence> paths) throws IO

private void validatePreviousDeletes(PositionDeleteIndex index) {
Preconditions.checkArgument(
index.deleteFiles().stream().allMatch(this::isFileScoped),
index.deleteFiles().stream().allMatch(ContentFileUtil::isFileScoped),
"Previous deletes must be file-scoped");
}

private boolean isFileScoped(DeleteFile deleteFile) {
return ContentFileUtil.referencedDataFile(deleteFile) != null;
}

private Collection<CharSequence> sort(Collection<CharSequence> paths) {
if (paths.size() <= 1) {
return paths;
@@ -86,6 +86,10 @@ public static String referencedDataFileLocation(DeleteFile deleteFile) {
return location != null ? location.toString() : null;
}

public static boolean isFileScoped(DeleteFile deleteFile) {
return referencedDataFile(deleteFile) != null;
}

public static boolean isDV(DeleteFile deleteFile) {
return deleteFile.format() == FileFormat.PUFFIN;
}
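
For orientation: the new ContentFileUtil.isFileScoped helper reports whether a delete file references a single data file, which is what makes it safe to merge and replace. A hedged illustration of how a caller might use it (this class is illustrative, not part of the commit):

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.util.ContentFileUtil;

class DeleteScopeSplit {
  // Split previously committed deletes into file-scoped (key true) and
  // partition-scoped (key false) groups; only the file-scoped group can be merged
  // with new deletes and rewritten, which is why the update tests below skip the
  // "range" distribution mode that produces partition-scoped deletes.
  static Map<Boolean, List<DeleteFile>> splitByScope(List<DeleteFile> deletes) {
    return deletes.stream()
        .collect(Collectors.partitioningBy(ContentFileUtil::isFileScoped));
  }
}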
@@ -39,9 +39,11 @@
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.SparkSQLProperties;
import org.apache.iceberg.spark.source.SimpleRecord;
import org.apache.iceberg.spark.source.SparkTable;
import org.apache.iceberg.spark.source.TestSparkCatalog;
import org.apache.iceberg.util.SnapshotUtil;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
import org.apache.spark.sql.connector.catalog.Identifier;
import org.junit.jupiter.api.BeforeEach;
@@ -99,6 +101,87 @@ public void testDeletePartitionGranularity() throws NoSuchTableException {
checkDeleteFileGranularity(DeleteGranularity.PARTITION);
}

@TestTemplate
public void testPositionDeletesAreMaintainedDuringDelete() throws NoSuchTableException {
sql(
"CREATE TABLE %s (id int, data string) USING iceberg PARTITIONED BY (id) TBLPROPERTIES"
+ "('%s'='%s', '%s'='%s', '%s'='%s')",
tableName,
TableProperties.FORMAT_VERSION,
2,
TableProperties.DELETE_MODE,
"merge-on-read",
TableProperties.DELETE_GRANULARITY,
"file");
createBranchIfNeeded();

List<SimpleRecord> records =
Lists.newArrayList(
new SimpleRecord(1, "a"),
new SimpleRecord(1, "b"),
new SimpleRecord(1, "c"),
new SimpleRecord(2, "d"),
new SimpleRecord(2, "e"));
spark
.createDataset(records, Encoders.bean(SimpleRecord.class))
.coalesce(1)
.writeTo(commitTarget())
.append();

sql("DELETE FROM %s WHERE id = 1 and data='a'", commitTarget());
sql("DELETE FROM %s WHERE id = 2 and data='d'", commitTarget());
sql("DELETE FROM %s WHERE id = 1 and data='c'", commitTarget());

Table table = validationCatalog.loadTable(tableIdent);
Snapshot latest = SnapshotUtil.latestSnapshot(table, branch);
assertThat(latest.removedDeleteFiles(table.io())).hasSize(1);
assertEquals(
"Should have expected rows",
ImmutableList.of(row(1, "b"), row(2, "e")),
sql("SELECT * FROM %s ORDER BY id ASC", selectTarget()));
}

@TestTemplate
public void testUnpartitionedPositionDeletesAreMaintainedDuringDelete()
throws NoSuchTableException {
sql(
"CREATE TABLE %s (id int, data string) USING iceberg TBLPROPERTIES"
+ "('%s'='%s', '%s'='%s', '%s'='%s')",
tableName,
TableProperties.FORMAT_VERSION,
2,
TableProperties.DELETE_MODE,
"merge-on-read",
TableProperties.DELETE_GRANULARITY,
"file");
createBranchIfNeeded();

List<SimpleRecord> records =
Lists.newArrayList(
new SimpleRecord(1, "a"),
new SimpleRecord(1, "b"),
new SimpleRecord(1, "c"),
new SimpleRecord(2, "d"),
new SimpleRecord(2, "e"));
spark
.createDataset(records, Encoders.bean(SimpleRecord.class))
.coalesce(1)
.writeTo(commitTarget())
.append();

sql("DELETE FROM %s WHERE id = 1 and data='a'", commitTarget());
sql("DELETE FROM %s WHERE id = 2 and data='d'", commitTarget());
sql("DELETE FROM %s WHERE id = 1 and data='c'", commitTarget());

Table table = validationCatalog.loadTable(tableIdent);
Snapshot latest = SnapshotUtil.latestSnapshot(table, branch);
assertThat(latest.removedDeleteFiles(table.io())).hasSize(1);
assertEquals(
"Should have expected rows",
ImmutableList.of(row(1, "b"), row(2, "e")),
sql("SELECT * FROM %s ORDER BY id ASC", selectTarget()));
}

private void checkDeleteFileGranularity(DeleteGranularity deleteGranularity)
throws NoSuchTableException {
createAndInitPartitionedTable();
@@ -19,6 +19,7 @@
package org.apache.iceberg.spark.extensions;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assumptions.assumeThat;

import java.util.Map;
import org.apache.iceberg.ParameterizedTestExtension;
@@ -55,19 +56,82 @@ public void testUpdatePartitionGranularity() {
checkUpdateFileGranularity(DeleteGranularity.PARTITION);
}

private void checkUpdateFileGranularity(DeleteGranularity deleteGranularity) {
createAndInitTable("id INT, dep STRING", "PARTITIONED BY (dep)", null /* empty */);
@TestTemplate
public void testUpdateFileGranularityMergesDeleteFiles() {
// Range distribution will produce partition scoped deletes which will not be cleaned up
assumeThat(distributionMode).isNotEqualToIgnoringCase("range");

sql(
"ALTER TABLE %s SET TBLPROPERTIES ('%s' '%s')",
tableName, TableProperties.DELETE_GRANULARITY, deleteGranularity);
checkUpdateFileGranularity(DeleteGranularity.FILE);
sql("UPDATE %s SET id = id + 1 WHERE id = 4", commitTarget());
Table table = validationCatalog.loadTable(tableIdent);
Snapshot currentSnapshot = SnapshotUtil.latestSnapshot(table, branch);
String expectedDeleteFilesCount = "2";
validateMergeOnRead(currentSnapshot, "2", expectedDeleteFilesCount, "2");

append(tableName, "{ \"id\": 1, \"dep\": \"hr\" }\n" + "{ \"id\": 2, \"dep\": \"hr\" }");
append(tableName, "{ \"id\": 3, \"dep\": \"hr\" }\n" + "{ \"id\": 4, \"dep\": \"hr\" }");
append(tableName, "{ \"id\": 1, \"dep\": \"it\" }\n" + "{ \"id\": 2, \"dep\": \"it\" }");
append(tableName, "{ \"id\": 3, \"dep\": \"it\" }\n" + "{ \"id\": 4, \"dep\": \"it\" }");
assertThat(currentSnapshot.removedDeleteFiles(table.io())).hasSize(2);
assertEquals(
"Should have expected rows",
ImmutableList.of(
row(0, "hr"),
row(2, "hr"),
row(2, "hr"),
row(5, "hr"),
row(0, "it"),
row(2, "it"),
row(2, "it"),
row(5, "it")),
sql("SELECT * FROM %s ORDER BY dep ASC, id ASC", selectTarget()));
}

createBranchIfNeeded();
@TestTemplate
public void testUpdateUnpartitionedFileGranularityMergesDeleteFiles() {
// Range distribution will produce partition scoped deletes which will not be cleaned up
assumeThat(distributionMode).isNotEqualToIgnoringCase("range");
initTable("", DeleteGranularity.FILE);

sql("UPDATE %s SET id = id - 1 WHERE id = 1 OR id = 3", commitTarget());

Table table = validationCatalog.loadTable(tableIdent);
assertThat(table.snapshots()).hasSize(5);
Snapshot currentSnapshot = SnapshotUtil.latestSnapshot(table, branch);
String expectedDeleteFilesCount = "4";
validateMergeOnRead(currentSnapshot, "1", expectedDeleteFilesCount, "1");
assertEquals(
"Should have expected rows",
ImmutableList.of(
row(0, "hr"),
row(2, "hr"),
row(2, "hr"),
row(4, "hr"),
row(0, "it"),
row(2, "it"),
row(2, "it"),
row(4, "it")),
sql("SELECT * FROM %s ORDER BY dep ASC, id ASC", selectTarget()));

sql("UPDATE %s SET id = id + 1 WHERE id = 4", commitTarget());
table.refresh();
currentSnapshot = SnapshotUtil.latestSnapshot(table, branch);
expectedDeleteFilesCount = "2";

validateMergeOnRead(currentSnapshot, "1", expectedDeleteFilesCount, "1");
assertThat(currentSnapshot.removedDeleteFiles(table.io())).hasSize(2);
assertEquals(
"Should have expected rows",
ImmutableList.of(
row(0, "hr"),
row(2, "hr"),
row(2, "hr"),
row(5, "hr"),
row(0, "it"),
row(2, "it"),
row(2, "it"),
row(5, "it")),
sql("SELECT * FROM %s ORDER BY dep ASC, id ASC", selectTarget()));
}

private void checkUpdateFileGranularity(DeleteGranularity deleteGranularity) {
initTable("PARTITIONED BY (dep)", deleteGranularity);

sql("UPDATE %s SET id = id - 1 WHERE id = 1 OR id = 3", commitTarget());

@@ -91,4 +155,19 @@ private void checkUpdateFileGranularity(DeleteGranularity deleteGranularity) {
row(4, "it")),
sql("SELECT * FROM %s ORDER BY dep ASC, id ASC", selectTarget()));
}

private void initTable(String partitionedBy, DeleteGranularity deleteGranularity) {
createAndInitTable("id INT, dep STRING", partitionedBy, null /* empty */);

sql(
"ALTER TABLE %s SET TBLPROPERTIES ('%s' '%s')",
tableName, TableProperties.DELETE_GRANULARITY, deleteGranularity);

append(tableName, "{ \"id\": 1, \"dep\": \"hr\" }\n" + "{ \"id\": 2, \"dep\": \"hr\" }");
append(tableName, "{ \"id\": 3, \"dep\": \"hr\" }\n" + "{ \"id\": 4, \"dep\": \"hr\" }");
append(tableName, "{ \"id\": 1, \"dep\": \"it\" }\n" + "{ \"id\": 2, \"dep\": \"it\" }");
append(tableName, "{ \"id\": 3, \"dep\": \"it\" }\n" + "{ \"id\": 4, \"dep\": \"it\" }");

createBranchIfNeeded();
}
}
@@ -24,6 +24,8 @@
import java.util.Set;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.PartitionScanTask;
import org.apache.iceberg.PartitionSpec;
Expand All @@ -48,6 +50,8 @@
import org.apache.iceberg.spark.SparkReadConf;
import org.apache.iceberg.spark.SparkSchemaUtil;
import org.apache.iceberg.spark.SparkV2Filters;
import org.apache.iceberg.util.ContentFileUtil;
import org.apache.iceberg.util.DeleteFileSet;
import org.apache.iceberg.util.SnapshotUtil;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.connector.expressions.NamedReference;
@@ -158,6 +162,23 @@ public void filter(Predicate[] predicates) {
}
}

protected Map<String, DeleteFileSet> rewritableDeletes() {
Map<String, DeleteFileSet> rewritableDeletes = Maps.newHashMap();

for (ScanTask task : tasks()) {
FileScanTask fileScanTask = task.asFileScanTask();
for (DeleteFile deleteFile : fileScanTask.deletes()) {
if (ContentFileUtil.isFileScoped(deleteFile)) {
rewritableDeletes
.computeIfAbsent(fileScanTask.file().location(), ignored -> DeleteFileSet.create())
.add(deleteFile);
}
}
}

return rewritableDeletes;
}
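
A hypothetical consumer of the map built above (not part of this commit excerpt): given the data files a write will touch, it counts how many previously committed file-scoped delete files would be replaced. It assumes DeleteFileSet behaves like a java.util.Set, as the create()/add() calls above suggest.

import java.util.Map;
import org.apache.iceberg.util.DeleteFileSet;

class RewritableDeletesUsage {
  // Illustrative only: look up each rewritten data file's previously committed
  // file-scoped deletes and tally how many delete files a merge would replace.
  static long replaceableDeleteFileCount(
      Map<String, DeleteFileSet> rewritableDeletes, Iterable<String> rewrittenDataFiles) {
    long count = 0;
    for (String location : rewrittenDataFiles) {
      DeleteFileSet deletes = rewritableDeletes.get(location);
      if (deletes != null) {
        count += deletes.size(); // assumes Set semantics for DeleteFileSet
      }
    }
    return count;
  }
}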

// at this moment, Spark can only pass IN filters for a single attribute
// if there are multiple filter attributes, Spark will pass two separate IN filters
private Expression convertRuntimeFilters(Predicate[] predicates) {

