diff --git a/core/src/main/java/org/apache/iceberg/PartitionStats.java b/core/src/main/java/org/apache/iceberg/PartitionStats.java index d050094f06d6..e35551649555 100644 --- a/core/src/main/java/org/apache/iceberg/PartitionStats.java +++ b/core/src/main/java/org/apache/iceberg/PartitionStats.java @@ -22,20 +22,21 @@ public class PartitionStats implements StructLike { - private static final int STATS_COUNT = 12; + private static final int STATS_COUNT = 13; private StructLike partition; private int specId; private long dataRecordCount; private int dataFileCount; private long totalDataFileSizeInBytes; - private long positionDeleteRecordCount; + private long positionDeleteRecordCount; // also includes dv record count as per spec private int positionDeleteFileCount; private long equalityDeleteRecordCount; private int equalityDeleteFileCount; private Long totalRecordCount; // null by default private Long lastUpdatedAt; // null by default private Long lastUpdatedSnapshotId; // null by default + private int dvCount; public PartitionStats(StructLike partition, int specId) { this.partition = partition; @@ -90,6 +91,10 @@ public Long lastUpdatedSnapshotId() { return lastUpdatedSnapshotId; } + public int dvCount() { + return dvCount; + } + /** * Updates the partition stats from the data/delete file. 
* @@ -109,7 +114,12 @@ public void liveEntry(ContentFile file, Snapshot snapshot) { break; case POSITION_DELETES: this.positionDeleteRecordCount += file.recordCount(); - this.positionDeleteFileCount += 1; + if (file.format() == FileFormat.PUFFIN) { + this.dvCount += 1; + } else { + this.positionDeleteFileCount += 1; + } + break; case EQUALITY_DELETES: this.equalityDeleteRecordCount += file.recordCount(); @@ -156,7 +166,12 @@ void deletedEntryForIncrementalCompute(ContentFile file, Snapshot snapshot) { break; case POSITION_DELETES: this.positionDeleteRecordCount -= file.recordCount(); - this.positionDeleteFileCount -= 1; + if (file.format() == FileFormat.PUFFIN) { + this.dvCount -= 1; + } else { + this.positionDeleteFileCount -= 1; + } + break; case EQUALITY_DELETES: this.equalityDeleteRecordCount -= file.recordCount(); @@ -188,6 +203,7 @@ public void appendStats(PartitionStats entry) { this.positionDeleteFileCount += entry.positionDeleteFileCount; this.equalityDeleteRecordCount += entry.equalityDeleteRecordCount; this.equalityDeleteFileCount += entry.equalityDeleteFileCount; + this.dvCount += entry.dvCount; if (entry.totalRecordCount != null) { if (totalRecordCount == null) { @@ -241,6 +257,8 @@ public T get(int pos, Class javaClass) { return javaClass.cast(lastUpdatedAt); case 11: return javaClass.cast(lastUpdatedSnapshotId); + case 12: + return javaClass.cast(dvCount); default: throw new UnsupportedOperationException("Unknown position: " + pos); } @@ -289,6 +307,9 @@ public void set(int pos, T value) { case 11: this.lastUpdatedSnapshotId = (Long) value; break; + case 12: + this.dvCount = value == null ? 
0 : (int) value; + break; default: throw new UnsupportedOperationException("Unknown position: " + pos); } diff --git a/core/src/main/java/org/apache/iceberg/PartitionStatsHandler.java b/core/src/main/java/org/apache/iceberg/PartitionStatsHandler.java index 71686c456792..343d5636be28 100644 --- a/core/src/main/java/org/apache/iceberg/PartitionStatsHandler.java +++ b/core/src/main/java/org/apache/iceberg/PartitionStatsHandler.java @@ -33,6 +33,7 @@ import java.util.stream.Collectors; import java.util.stream.StreamSupport; import org.apache.iceberg.data.GenericRecord; +import org.apache.iceberg.expressions.Literal; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.io.FileAppender; import org.apache.iceberg.io.InputFile; @@ -42,6 +43,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Queues; import org.apache.iceberg.types.Comparators; +import org.apache.iceberg.types.Types; import org.apache.iceberg.types.Types.IntegerType; import org.apache.iceberg.types.Types.LongType; import org.apache.iceberg.types.Types.NestedField; @@ -89,17 +91,57 @@ private PartitionStatsHandler() {} NestedField.optional(11, "last_updated_at", LongType.get()); public static final NestedField LAST_UPDATED_SNAPSHOT_ID = NestedField.optional(12, "last_updated_snapshot_id", LongType.get()); + // Using default value for v3 field to support v3 reader reading file written by v2 + public static final NestedField DV_COUNT = + NestedField.required("dv_count") + .withId(13) + .ofType(Types.IntegerType.get()) + .withInitialDefault(Literal.of(0)) + .withWriteDefault(Literal.of(0)) + .build(); /** * Generates the partition stats file schema based on a combined partition type which considers * all specs in a table. * + *

Use this only for format version 1 and 2. For version 3 and above use {@link + * #schema(StructType, int)} + * * @param unifiedPartitionType unified partition schema type. Could be calculated by {@link * Partitioning#partitionType(Table)}. * @return a schema that corresponds to the provided unified partition type. + * @deprecated since 1.10.0, will be removed in 1.11.0. Use {@link #schema(StructType, int)} + * instead. */ + @Deprecated public static Schema schema(StructType unifiedPartitionType) { Preconditions.checkState(!unifiedPartitionType.fields().isEmpty(), "Table must be partitioned"); + return v2Schema(unifiedPartitionType); + } + + /** + * Generates the partition stats file schema for a given format version based on a combined + * partition type which considers all specs in a table. + * + * @param unifiedPartitionType unified partition schema type. Could be calculated by {@link + * Partitioning#partitionType(Table)}. + * @return a schema that corresponds to the provided unified partition type. 
+ */ + public static Schema schema(StructType unifiedPartitionType, int formatVersion) { + Preconditions.checkState(!unifiedPartitionType.fields().isEmpty(), "Table must be partitioned"); + Preconditions.checkState( + formatVersion > 0 && formatVersion <= TableMetadata.SUPPORTED_TABLE_FORMAT_VERSION, + "Invalid format version: %s", + formatVersion); + + if (formatVersion <= 2) { + return v2Schema(unifiedPartitionType); + } + + return v3Schema(unifiedPartitionType); + } + + private static Schema v2Schema(StructType unifiedPartitionType) { return new Schema( NestedField.required(PARTITION_FIELD_ID, PARTITION_FIELD_NAME, unifiedPartitionType), SPEC_ID, @@ -115,6 +157,35 @@ public static Schema schema(StructType unifiedPartitionType) { LAST_UPDATED_SNAPSHOT_ID); } + private static Schema v3Schema(StructType unifiedPartitionType) { + return new Schema( + NestedField.required(PARTITION_FIELD_ID, PARTITION_FIELD_NAME, unifiedPartitionType), + SPEC_ID, + DATA_RECORD_COUNT, + DATA_FILE_COUNT, + TOTAL_DATA_FILE_SIZE_IN_BYTES, + NestedField.required( + POSITION_DELETE_RECORD_COUNT.fieldId(), + POSITION_DELETE_RECORD_COUNT.name(), + LongType.get()), + NestedField.required( + POSITION_DELETE_FILE_COUNT.fieldId(), + POSITION_DELETE_FILE_COUNT.name(), + IntegerType.get()), + NestedField.required( + EQUALITY_DELETE_RECORD_COUNT.fieldId(), + EQUALITY_DELETE_RECORD_COUNT.name(), + LongType.get()), + NestedField.required( + EQUALITY_DELETE_FILE_COUNT.fieldId(), + EQUALITY_DELETE_FILE_COUNT.name(), + IntegerType.get()), + TOTAL_RECORD_COUNT, + LAST_UPDATED_AT, + LAST_UPDATED_SNAPSHOT_ID, + DV_COUNT); + } + /** * Computes the stats incrementally after the snapshot that has partition stats file till the * current snapshot and writes the combined result into a {@link PartitionStatisticsFile} after @@ -184,7 +255,10 @@ public static PartitionStatisticsFile computeAndWriteStatsFile(Table table, long List sortedStats = sortStatsByPartition(stats, partitionType); return 
writePartitionStatsFile( - table, snapshot.snapshotId(), schema(partitionType), sortedStats); + table, + snapshot.snapshotId(), + schema(partitionType, TableUtil.formatVersion(table)), + sortedStats); } @VisibleForTesting @@ -263,13 +337,13 @@ private static Collection computeAndMergeStatsIncremental( Table table, Snapshot snapshot, StructType partitionType, - PartitionStatisticsFile previousStatsFile) - throws IOException { + PartitionStatisticsFile previousStatsFile) { PartitionMap statsMap = PartitionMap.create(table.specs()); // read previous stats, note that partition field will be read as GenericRecord try (CloseableIterable oldStats = readPartitionStatsFile( - schema(partitionType), table.io().newInputFile(previousStatsFile.path()))) { + schema(partitionType, TableUtil.formatVersion(table)), + table.io().newInputFile(previousStatsFile.path()))) { oldStats.forEach( partitionStats -> statsMap.put(partitionStats.specId(), partitionStats.partition(), partitionStats)); diff --git a/core/src/test/java/org/apache/iceberg/PartitionStatsHandlerTestBase.java b/core/src/test/java/org/apache/iceberg/PartitionStatsHandlerTestBase.java index ce20a7515d37..cf39e4611bf4 100644 --- a/core/src/test/java/org/apache/iceberg/PartitionStatsHandlerTestBase.java +++ b/core/src/test/java/org/apache/iceberg/PartitionStatsHandlerTestBase.java @@ -59,12 +59,22 @@ import org.apache.iceberg.types.Types; import org.assertj.core.groups.Tuple; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestTemplate; +import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.api.io.TempDir; +@ExtendWith(ParameterizedTestExtension.class) public abstract class PartitionStatsHandlerTestBase { public abstract FileFormat format(); + @Parameters(name = "formatVersion = {0}") + protected static List formatVersions() { + return TestHelpers.V2_AND_ABOVE; + } + + @Parameter protected int formatVersion; + private static final Schema SCHEMA = new Schema( optional(1, "c1", 
Types.IntegerType.get()), @@ -92,6 +102,7 @@ public abstract class PartitionStatsHandlerTestBase { private static final int TOTAL_RECORD_COUNT_POSITION = 9; private static final int LAST_UPDATED_AT_POSITION = 10; private static final int LAST_UPDATED_SNAPSHOT_ID_POSITION = 11; + private static final int DV_COUNT_POSITION = 12; @Test public void testPartitionStatsOnEmptyTable() throws Exception { @@ -140,7 +151,7 @@ public void testPartitionStatsOnUnPartitionedTable() throws Exception { .hasMessage("Table must be partitioned"); } - @Test + @TestTemplate public void testAllDatatypePartitionWriting() throws Exception { Schema schema = new Schema( @@ -183,10 +194,15 @@ public void testAllDatatypePartitionWriting() throws Exception { Table testTable = TestTables.create( - tempDir("test_all_type"), "test_all_type", schema, spec, 2, fileFormatProperty); + tempDir("test_all_type_" + formatVersion), + "test_all_type_" + formatVersion, + schema, + spec, + formatVersion, + fileFormatProperty); Types.StructType partitionSchema = Partitioning.partitionType(testTable); - Schema dataSchema = PartitionStatsHandler.schema(partitionSchema); + Schema dataSchema = PartitionStatsHandler.schema(partitionSchema, formatVersion); PartitionData partitionData = new PartitionData(dataSchema.findField(PARTITION_FIELD_ID).type().asStructType()); @@ -229,20 +245,20 @@ public void testAllDatatypePartitionWriting() throws Exception { } } - @Test + @TestTemplate public void testOptionalFieldsWriting() throws Exception { PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("c1").build(); Table testTable = TestTables.create( - tempDir("test_partition_stats_optional"), - "test_partition_stats_optional", + tempDir("test_partition_stats_optional_" + formatVersion), + "test_partition_stats_optional_" + formatVersion, SCHEMA, spec, - 2, + formatVersion, fileFormatProperty); Types.StructType partitionSchema = Partitioning.partitionType(testTable); - Schema dataSchema = 
PartitionStatsHandler.schema(partitionSchema); + Schema dataSchema = PartitionStatsHandler.schema(partitionSchema, formatVersion); ImmutableList.Builder partitionListBuilder = ImmutableList.builder(); for (int i = 0; i < 5; i++) { @@ -261,6 +277,7 @@ public void testOptionalFieldsWriting() throws Exception { stats.set(TOTAL_RECORD_COUNT_POSITION, null); stats.set(LAST_UPDATED_AT_POSITION, null); stats.set(LAST_UPDATED_SNAPSHOT_ID_POSITION, null); + stats.set(DV_COUNT_POSITION, null); partitionListBuilder.add(stats); } @@ -275,10 +292,11 @@ public void testOptionalFieldsWriting() throws Exception { PartitionStats::equalityDeleteFileCount, PartitionStats::totalRecords, PartitionStats::lastUpdatedAt, - PartitionStats::lastUpdatedSnapshotId) + PartitionStats::lastUpdatedSnapshotId, + PartitionStats::dvCount) .isEqualTo( Arrays.asList( - 0L, 0, 0L, 0, null, null, null)); // null counters must be initialized to zero. + 0L, 0, 0L, 0, null, null, null, 0)); // null counters must be initialized to zero. 
PartitionStatisticsFile statisticsFile = PartitionStatsHandler.writePartitionStatsFile(testTable, 42L, dataSchema, expected); @@ -330,7 +348,8 @@ public void testPartitionStats() throws Exception { } Snapshot snapshot1 = testTable.currentSnapshot(); - Schema recordSchema = PartitionStatsHandler.schema(Partitioning.partitionType(testTable)); + Schema recordSchema = PartitionStatsHandler.schema(Partitioning.partitionType(testTable), 2); + Types.StructType partitionType = recordSchema.findField(PARTITION_FIELD_ID).type().asStructType(); computeAndValidatePartitionStats( @@ -348,7 +367,8 @@ public void testPartitionStats() throws Exception { 0, null, snapshot1.timestampMillis(), - snapshot1.snapshotId()), + snapshot1.snapshotId(), + 0), Tuple.tuple( partitionRecord(partitionType, "foo", "B"), 0, @@ -361,7 +381,8 @@ public void testPartitionStats() throws Exception { 0, null, snapshot1.timestampMillis(), - snapshot1.snapshotId()), + snapshot1.snapshotId(), + 0), Tuple.tuple( partitionRecord(partitionType, "bar", "A"), 0, @@ -374,7 +395,8 @@ public void testPartitionStats() throws Exception { 0, null, snapshot1.timestampMillis(), - snapshot1.snapshotId()), + snapshot1.snapshotId(), + 0), Tuple.tuple( partitionRecord(partitionType, "bar", "B"), 0, @@ -387,16 +409,26 @@ public void testPartitionStats() throws Exception { 0, null, snapshot1.timestampMillis(), - snapshot1.snapshotId())); + snapshot1.snapshotId(), + 0)); - DeleteFile posDeletes = commitPositionDeletes(testTable); - Snapshot snapshot2 = testTable.currentSnapshot(); + DeleteFile posDelete = + FileGenerationUtil.generatePositionDeleteFile(testTable, TestHelpers.Row.of("bar", "A")); + testTable.newRowDelta().addDeletes(posDelete).commit(); + // snapshot2 is unused in the result as same partition was updated by snapshot4 - DeleteFile eqDeletes = commitEqualityDeletes(testTable); + DeleteFile eqDelete = + FileGenerationUtil.generateEqualityDeleteFile(testTable, TestHelpers.Row.of("foo", "A")); + 
testTable.newRowDelta().addDeletes(eqDelete).commit(); Snapshot snapshot3 = testTable.currentSnapshot(); - recordSchema = PartitionStatsHandler.schema(Partitioning.partitionType(testTable)); - partitionType = recordSchema.findField(PARTITION_FIELD_ID).type().asStructType(); + testTable.updateProperties().set(TableProperties.FORMAT_VERSION, "3").commit(); + DeleteFile dv = FileGenerationUtil.generateDV(testTable, dataFile3); + testTable.newRowDelta().addDeletes(dv).commit(); + Snapshot snapshot4 = testTable.currentSnapshot(); + + recordSchema = PartitionStatsHandler.schema(Partitioning.partitionType(testTable), 3); + computeAndValidatePartitionStats( testTable, recordSchema, @@ -408,11 +440,12 @@ public void testPartitionStats() throws Exception { 3 * dataFile1.fileSizeInBytes(), 0L, 0, - eqDeletes.recordCount(), + eqDelete.recordCount(), 1, null, snapshot3.timestampMillis(), - snapshot3.snapshotId()), + snapshot3.snapshotId(), + 0), Tuple.tuple( partitionRecord(partitionType, "foo", "B"), 0, @@ -425,20 +458,22 @@ public void testPartitionStats() throws Exception { 0, null, snapshot1.timestampMillis(), - snapshot1.snapshotId()), + snapshot1.snapshotId(), + 0), Tuple.tuple( partitionRecord(partitionType, "bar", "A"), 0, 3 * dataFile3.recordCount(), 3, 3 * dataFile3.fileSizeInBytes(), - posDeletes.recordCount(), + posDelete.recordCount() + dv.recordCount(), 1, 0L, 0, null, - snapshot2.timestampMillis(), - snapshot2.snapshotId()), + snapshot4.timestampMillis(), + snapshot4.snapshotId(), + 1), // dv count Tuple.tuple( partitionRecord(partitionType, "bar", "B"), 0, @@ -451,7 +486,8 @@ public void testPartitionStats() throws Exception { 0, null, snapshot1.timestampMillis(), - snapshot1.snapshotId())); + snapshot1.snapshotId(), + 0)); } @Test @@ -482,7 +518,7 @@ public void testCopyOnWriteDelete() throws Exception { assertThat( PartitionStatsHandler.readPartitionStatsFile( - PartitionStatsHandler.schema(Partitioning.partitionType(testTable)), + 
PartitionStatsHandler.schema(Partitioning.partitionType(testTable), 2), testTable.io().newInputFile(statisticsFile.path()))) .allMatch(s -> (s.dataRecordCount() != 0 && s.dataFileCount() != 0)); @@ -495,7 +531,7 @@ public void testCopyOnWriteDelete() throws Exception { // stats must be decremented to zero as all the files removed from table. assertThat( PartitionStatsHandler.readPartitionStatsFile( - PartitionStatsHandler.schema(Partitioning.partitionType(testTable)), + PartitionStatsHandler.schema(Partitioning.partitionType(testTable), 2), testTable.io().newInputFile(statisticsFileNew.path()))) .allMatch(s -> (s.dataRecordCount() == 0 && s.dataFileCount() == 0)); } @@ -641,6 +677,47 @@ public void testFullComputeFallbackWithInvalidStats() throws Exception { assertThat(partitionStats.get(0).dataFileCount()).isEqualTo(2); } + @Test + public void testV2toV3SchemaEvolution() throws Exception { + Table testTable = + TestTables.create( + tempDir("schema_evolution"), "schema_evolution", SCHEMA, SPEC, 2, fileFormatProperty); + + // write stats file using v2 schema + DataFile dataFile = + FileGenerationUtil.generateDataFile(testTable, TestHelpers.Row.of("foo", "A")); + testTable.newAppend().appendFile(dataFile).commit(); + PartitionStatisticsFile statisticsFile = + PartitionStatsHandler.computeAndWriteStatsFile( + testTable, testTable.currentSnapshot().snapshotId()); + + Types.StructType partitionSchema = Partitioning.partitionType(testTable); + + // read with v2 schema + Schema v2Schema = PartitionStatsHandler.schema(partitionSchema, 2); + List partitionStatsV2; + try (CloseableIterable recordIterator = + PartitionStatsHandler.readPartitionStatsFile( + v2Schema, testTable.io().newInputFile(statisticsFile.path()))) { + partitionStatsV2 = Lists.newArrayList(recordIterator); + } + + // read with v3 schema + Schema v3Schema = PartitionStatsHandler.schema(partitionSchema, 3); + List partitionStatsV3; + try (CloseableIterable recordIterator = + 
PartitionStatsHandler.readPartitionStatsFile( + v3Schema, testTable.io().newInputFile(statisticsFile.path()))) { + partitionStatsV3 = Lists.newArrayList(recordIterator); + } + + assertThat(partitionStatsV2).hasSameSizeAs(partitionStatsV3); + Comparator comparator = Comparators.forType(partitionSchema); + for (int i = 0; i < partitionStatsV2.size(); i++) { + assertThat(isEqual(comparator, partitionStatsV2.get(i), partitionStatsV3.get(i))).isTrue(); + } + } + private static StructLike partitionRecord( Types.StructType partitionType, String val1, String val2) { GenericRecord record = GenericRecord.create(partitionType); @@ -678,24 +755,11 @@ private static void computeAndValidatePartitionStats( PartitionStats::equalityDeleteFileCount, PartitionStats::totalRecords, PartitionStats::lastUpdatedAt, - PartitionStats::lastUpdatedSnapshotId) + PartitionStats::lastUpdatedSnapshotId, + PartitionStats::dvCount) .containsExactlyInAnyOrder(expectedValues); } - private DeleteFile commitEqualityDeletes(Table testTable) { - DeleteFile eqDelete = - FileGenerationUtil.generateEqualityDeleteFile(testTable, TestHelpers.Row.of("foo", "A")); - testTable.newRowDelta().addDeletes(eqDelete).commit(); - return eqDelete; - } - - private DeleteFile commitPositionDeletes(Table testTable) { - DeleteFile posDelete = - FileGenerationUtil.generatePositionDeleteFile(testTable, TestHelpers.Row.of("bar", "A")); - testTable.newRowDelta().addDeletes(posDelete).commit(); - return posDelete; - } - private File tempDir(String folderName) throws IOException { return java.nio.file.Files.createTempDirectory(temp.toPath(), folderName).toFile(); } diff --git a/orc/src/test/java/org/apache/iceberg/TestOrcPartitionStatsHandler.java b/orc/src/test/java/org/apache/iceberg/TestOrcPartitionStatsHandler.java index c94dd61bba2d..d4859a43a7fe 100644 --- a/orc/src/test/java/org/apache/iceberg/TestOrcPartitionStatsHandler.java +++ b/orc/src/test/java/org/apache/iceberg/TestOrcPartitionStatsHandler.java @@ -81,4 +81,11 @@ 
public void testFullComputeFallbackWithInvalidStats() { .isInstanceOf(UnsupportedOperationException.class) .hasMessage("Cannot write using unregistered internal data format: ORC"); } + + @Override + public void testV2toV3SchemaEvolution() { + assertThatThrownBy(super::testV2toV3SchemaEvolution) + .isInstanceOf(UnsupportedOperationException.class) + .hasMessage("Cannot write using unregistered internal data format: ORC"); + } }