From f05a6ed7c3b6c7a7da3b02da3c9236380e15bd61 Mon Sep 17 00:00:00 2001 From: ajantha-bhat Date: Mon, 30 Jun 2025 07:10:27 +0530 Subject: [PATCH 1/6] core: Support DV for partition stats --- .../org/apache/iceberg/PartitionStats.java | 2 +- .../apache/iceberg/PartitionStatsHandler.java | 96 +++++- .../org/apache/iceberg/PartitionStatsV3.java | 86 +++++ .../PartitionStatsHandlerTestBase.java | 318 +++++++++++++----- 4 files changed, 405 insertions(+), 97 deletions(-) create mode 100644 core/src/main/java/org/apache/iceberg/PartitionStatsV3.java diff --git a/core/src/main/java/org/apache/iceberg/PartitionStats.java b/core/src/main/java/org/apache/iceberg/PartitionStats.java index d050094f06d6..a8c60b4e17f4 100644 --- a/core/src/main/java/org/apache/iceberg/PartitionStats.java +++ b/core/src/main/java/org/apache/iceberg/PartitionStats.java @@ -22,7 +22,7 @@ public class PartitionStats implements StructLike { - private static final int STATS_COUNT = 12; + protected static final int STATS_COUNT = 12; private StructLike partition; private int specId; diff --git a/core/src/main/java/org/apache/iceberg/PartitionStatsHandler.java b/core/src/main/java/org/apache/iceberg/PartitionStatsHandler.java index 71686c456792..809cab77cadb 100644 --- a/core/src/main/java/org/apache/iceberg/PartitionStatsHandler.java +++ b/core/src/main/java/org/apache/iceberg/PartitionStatsHandler.java @@ -89,11 +89,16 @@ private PartitionStatsHandler() {} NestedField.optional(11, "last_updated_at", LongType.get()); public static final NestedField LAST_UPDATED_SNAPSHOT_ID = NestedField.optional(12, "last_updated_snapshot_id", LongType.get()); + public static final NestedField DV_COUNT = + NestedField.required(13, "dv_count", IntegerType.get()); /** * Generates the partition stats file schema based on a combined partition type which considers * all specs in a table. * + *

Use this only for format version 1 and 2. For version 3 and above use {@link + * #schemaV3Plus(StructType)} + * * @param unifiedPartitionType unified partition schema type. Could be calculated by {@link * Partitioning#partitionType(Table)}. * @return a schema that corresponds to the provided unified partition type. @@ -115,6 +120,53 @@ public static Schema schema(StructType unifiedPartitionType) { LAST_UPDATED_SNAPSHOT_ID); } + /** + * Generates the partition stats file schema based on a combined partition type which considers + * all specs in a table. (For format version 3 and above) + * + * @param unifiedPartitionType unified partition schema type. Could be calculated by {@link + * Partitioning#partitionType(Table)}. + * @return a schema that corresponds to the provided unified partition type. + */ + public static Schema schemaV3Plus(StructType unifiedPartitionType) { + Preconditions.checkState(!unifiedPartitionType.fields().isEmpty(), "Table must be partitioned"); + return new Schema( + NestedField.required(PARTITION_FIELD_ID, PARTITION_FIELD_NAME, unifiedPartitionType), + SPEC_ID, + DATA_RECORD_COUNT, + DATA_FILE_COUNT, + TOTAL_DATA_FILE_SIZE_IN_BYTES, + NestedField.required( + POSITION_DELETE_RECORD_COUNT.fieldId(), + POSITION_DELETE_RECORD_COUNT.name(), + LongType.get()), + NestedField.required( + POSITION_DELETE_FILE_COUNT.fieldId(), + POSITION_DELETE_FILE_COUNT.name(), + IntegerType.get()), + NestedField.required( + EQUALITY_DELETE_RECORD_COUNT.fieldId(), + EQUALITY_DELETE_RECORD_COUNT.name(), + LongType.get()), + NestedField.required( + EQUALITY_DELETE_FILE_COUNT.fieldId(), + EQUALITY_DELETE_FILE_COUNT.name(), + IntegerType.get()), + TOTAL_RECORD_COUNT, + LAST_UPDATED_AT, + LAST_UPDATED_SNAPSHOT_ID, + DV_COUNT); + } + + static Schema schemaForVersion(Table table, StructType partitionType) { + int formatVersion = ((HasTableOperations) table).operations().current().formatVersion(); + if (formatVersion <= 2) { + return schema(partitionType); + } + + return 
schemaV3Plus(partitionType); + } + /** * Computes the stats incrementally after the snapshot that has partition stats file till the * current snapshot and writes the combined result into a {@link PartitionStatisticsFile} after @@ -184,7 +236,7 @@ public static PartitionStatisticsFile computeAndWriteStatsFile(Table table, long List sortedStats = sortStatsByPartition(stats, partitionType); return writePartitionStatsFile( - table, snapshot.snapshotId(), schema(partitionType), sortedStats); + table, snapshot.snapshotId(), schemaForVersion(table, partitionType), sortedStats); } @VisibleForTesting @@ -226,7 +278,12 @@ public static CloseableIterable readPartitionStatsFile( CloseableIterable records = InternalData.read(fileFormat, inputFile).project(schema).build(); - return CloseableIterable.transform(records, PartitionStatsHandler::recordToPartitionStats); + + if (schema.findField(DV_COUNT.name()) == null) { + return CloseableIterable.transform(records, PartitionStatsHandler::recordToPartitionStats); + } else { + return CloseableIterable.transform(records, PartitionStatsHandler::recordToPartitionStatsV3); + } } private static OutputFile newPartitionStatsFile( @@ -252,6 +309,21 @@ private static PartitionStats recordToPartitionStats(StructLike record) { new PartitionStats( record.get(pos++, StructLike.class), // partition record.get(pos++, Integer.class)); // spec id + + for (; pos < record.size(); pos++) { + stats.set(pos, record.get(pos, Object.class)); + } + + return stats; + } + + private static PartitionStats recordToPartitionStatsV3(StructLike record) { + int pos = 0; + PartitionStats stats = + new PartitionStatsV3( + record.get(pos++, StructLike.class), // partition + record.get(pos++, Integer.class)); // spec id + for (; pos < record.size(); pos++) { stats.set(pos, record.get(pos, Object.class)); } @@ -263,13 +335,13 @@ private static Collection computeAndMergeStatsIncremental( Table table, Snapshot snapshot, StructType partitionType, - PartitionStatisticsFile 
previousStatsFile) - throws IOException { + PartitionStatisticsFile previousStatsFile) { PartitionMap statsMap = PartitionMap.create(table.specs()); // read previous stats, note that partition field will be read as GenericRecord try (CloseableIterable oldStats = readPartitionStatsFile( - schema(partitionType), table.io().newInputFile(previousStatsFile.path()))) { + schemaForVersion(table, partitionType), + table.io().newInputFile(previousStatsFile.path()))) { oldStats.forEach( partitionStats -> statsMap.put(partitionStats.specId(), partitionStats.partition(), partitionStats)); @@ -347,6 +419,7 @@ private static PartitionMap computeStatsDiff( private static PartitionMap computeStats( Table table, List manifests, boolean incremental) { + int version = ((HasTableOperations) table).operations().current().formatVersion(); StructType partitionType = Partitioning.partitionType(table); Queue> statsByManifest = Queues.newConcurrentLinkedQueue(); Tasks.foreach(manifests) @@ -356,7 +429,7 @@ private static PartitionMap computeStats( .run( manifest -> statsByManifest.add( - collectStatsForManifest(table, manifest, partitionType, incremental))); + collectStatsForManifest(table, version, manifest, partitionType, incremental))); PartitionMap statsMap = PartitionMap.create(table.specs()); for (PartitionMap stats : statsByManifest) { @@ -367,7 +440,11 @@ private static PartitionMap computeStats( } private static PartitionMap collectStatsForManifest( - Table table, ManifestFile manifest, StructType partitionType, boolean incremental) { + Table table, + int version, + ManifestFile manifest, + StructType partitionType, + boolean incremental) { List projection = BaseScan.scanColumns(manifest.content()); try (ManifestReader reader = ManifestFiles.open(manifest, table.io()).select(projection)) { PartitionMap statsMap = PartitionMap.create(table.specs()); @@ -385,7 +462,10 @@ private static PartitionMap collectStatsForManifest( statsMap.computeIfAbsent( specId, ((PartitionData) 
file.partition()).copy(), - () -> new PartitionStats(key, specId)); + () -> + version > 2 + ? new PartitionStatsV3(key, specId) + : new PartitionStats(key, specId)); if (entry.isLive()) { // Live can have both added and existing entries. Consider only added entries for // incremental compute as existing entries was already included in previous compute. diff --git a/core/src/main/java/org/apache/iceberg/PartitionStatsV3.java b/core/src/main/java/org/apache/iceberg/PartitionStatsV3.java new file mode 100644 index 000000000000..ccebc3c60730 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/PartitionStatsV3.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg; + +public class PartitionStatsV3 extends PartitionStats { + + private int dvCount; + + public PartitionStatsV3(StructLike partition, int specId) { + super(partition, specId); + } + + public int dvCount() { + return dvCount; + } + + @Override + public void liveEntry(ContentFile file, Snapshot snapshot) { + super.liveEntry(file, snapshot); + if (file.content() == FileContent.POSITION_DELETES && file.format() == FileFormat.PUFFIN) { + this.dvCount += 1; + // revert the changes for position delete file count increment from the parent method + this.set(6, positionDeleteFileCount() - 1); + } + } + + @Override + void deletedEntryForIncrementalCompute(ContentFile file, Snapshot snapshot) { + super.deletedEntryForIncrementalCompute(file, snapshot); + if (file.content() == FileContent.POSITION_DELETES && file.format() == FileFormat.PUFFIN) { + this.dvCount -= 1; + // revert the changes for position delete file count decrement from the parent method + this.set(6, positionDeleteFileCount() + 1); + } + } + + @Override + @Deprecated // will become package-private + public void appendStats(PartitionStats entry) { + super.appendStats(entry); + + if (entry instanceof PartitionStatsV3) { + this.dvCount += ((PartitionStatsV3) entry).dvCount; + } + } + + @Override + public int size() { + // includes dv counter + return STATS_COUNT + 1; + } + + @Override + public T get(int pos, Class javaClass) { + if (pos == STATS_COUNT) { + return javaClass.cast(dvCount); + } else { + return super.get(pos, javaClass); + } + } + + @Override + public void set(int pos, T value) { + if (pos == STATS_COUNT) { + this.dvCount = value == null ? 
0 : (int) value; + } else { + super.set(pos, value); + } + } +} diff --git a/core/src/test/java/org/apache/iceberg/PartitionStatsHandlerTestBase.java b/core/src/test/java/org/apache/iceberg/PartitionStatsHandlerTestBase.java index ce20a7515d37..dc916e9c7136 100644 --- a/core/src/test/java/org/apache/iceberg/PartitionStatsHandlerTestBase.java +++ b/core/src/test/java/org/apache/iceberg/PartitionStatsHandlerTestBase.java @@ -57,14 +57,25 @@ import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.types.Comparators; import org.apache.iceberg.types.Types; +import org.assertj.core.api.InstanceOfAssertFactories; import org.assertj.core.groups.Tuple; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestTemplate; +import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.api.io.TempDir; +@ExtendWith(ParameterizedTestExtension.class) public abstract class PartitionStatsHandlerTestBase { public abstract FileFormat format(); + @Parameters(name = "formatVersion = {0}") + protected static List formatVersions() { + return Arrays.asList(2, 3); + } + + @Parameter protected int formatVersion; + private static final Schema SCHEMA = new Schema( optional(1, "c1", Types.IntegerType.get()), @@ -92,6 +103,7 @@ public abstract class PartitionStatsHandlerTestBase { private static final int TOTAL_RECORD_COUNT_POSITION = 9; private static final int LAST_UPDATED_AT_POSITION = 10; private static final int LAST_UPDATED_SNAPSHOT_ID_POSITION = 11; + private static final int DV_COUNT_POSITION = 12; @Test public void testPartitionStatsOnEmptyTable() throws Exception { @@ -140,7 +152,7 @@ public void testPartitionStatsOnUnPartitionedTable() throws Exception { .hasMessage("Table must be partitioned"); } - @Test + @TestTemplate public void testAllDatatypePartitionWriting() throws Exception { Schema schema = new Schema( @@ -183,10 +195,15 @@ public void testAllDatatypePartitionWriting() throws Exception { Table testTable = 
TestTables.create( - tempDir("test_all_type"), "test_all_type", schema, spec, 2, fileFormatProperty); + tempDir("test_all_type_" + formatVersion), + "test_all_type_" + formatVersion, + schema, + spec, + formatVersion, + fileFormatProperty); Types.StructType partitionSchema = Partitioning.partitionType(testTable); - Schema dataSchema = PartitionStatsHandler.schema(partitionSchema); + Schema dataSchema = PartitionStatsHandler.schemaForVersion(testTable, partitionSchema); PartitionData partitionData = new PartitionData(dataSchema.findField(PARTITION_FIELD_ID).type().asStructType()); @@ -207,7 +224,13 @@ public void testAllDatatypePartitionWriting() throws Exception { partitionData.set(13, new BigDecimal("12345678901234567890.1234567890")); partitionData.set(14, Literal.of("10:10:10").to(Types.TimeType.get()).value()); - PartitionStats partitionStats = new PartitionStats(partitionData, RANDOM.nextInt(10)); + PartitionStats partitionStats; + if (formatVersion == 3) { + partitionStats = new PartitionStatsV3(partitionData, RANDOM.nextInt(10)); + } else { + partitionStats = new PartitionStats(partitionData, RANDOM.nextInt(10)); + } + partitionStats.set(DATA_RECORD_COUNT_POSITION, RANDOM.nextLong()); partitionStats.set(DATA_FILE_COUNT_POSITION, RANDOM.nextInt()); partitionStats.set(TOTAL_DATA_FILE_SIZE_IN_BYTES_POSITION, 1024L * RANDOM.nextInt(20)); @@ -229,20 +252,20 @@ public void testAllDatatypePartitionWriting() throws Exception { } } - @Test + @TestTemplate public void testOptionalFieldsWriting() throws Exception { PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("c1").build(); Table testTable = TestTables.create( - tempDir("test_partition_stats_optional"), - "test_partition_stats_optional", + tempDir("test_partition_stats_optional_" + formatVersion), + "test_partition_stats_optional_" + formatVersion, SCHEMA, spec, - 2, + formatVersion, fileFormatProperty); Types.StructType partitionSchema = Partitioning.partitionType(testTable); - Schema dataSchema = 
PartitionStatsHandler.schema(partitionSchema); + Schema dataSchema = PartitionStatsHandler.schemaForVersion(testTable, partitionSchema); ImmutableList.Builder partitionListBuilder = ImmutableList.builder(); for (int i = 0; i < 5; i++) { @@ -250,7 +273,14 @@ public void testOptionalFieldsWriting() throws Exception { new PartitionData(dataSchema.findField(PARTITION_FIELD_ID).type().asStructType()); partitionData.set(0, RANDOM.nextInt()); - PartitionStats stats = new PartitionStats(partitionData, RANDOM.nextInt(10)); + PartitionStats stats; + if (formatVersion == 3) { + stats = new PartitionStatsV3(partitionData, RANDOM.nextInt(10)); + stats.set(DV_COUNT_POSITION, null); + } else { + stats = new PartitionStats(partitionData, RANDOM.nextInt(10)); + } + stats.set(DATA_RECORD_COUNT_POSITION, RANDOM.nextLong()); stats.set(DATA_FILE_COUNT_POSITION, RANDOM.nextInt()); stats.set(TOTAL_DATA_FILE_SIZE_IN_BYTES_POSITION, 1024L * RANDOM.nextInt(20)); @@ -280,6 +310,14 @@ public void testOptionalFieldsWriting() throws Exception { Arrays.asList( 0L, 0, 0L, 0, null, null, null)); // null counters must be initialized to zero. + if (formatVersion == 3) { + assertThat(expected.get(0)) + .isInstanceOf(PartitionStatsV3.class) + .asInstanceOf(InstanceOfAssertFactories.type(PartitionStatsV3.class)) + .extracting(PartitionStatsV3::dvCount) + .isEqualTo(0); // null counters must be initialized to zero. 
+ } + PartitionStatisticsFile statisticsFile = PartitionStatsHandler.writePartitionStatsFile(testTable, 42L, dataSchema, expected); @@ -298,15 +336,15 @@ public void testOptionalFieldsWriting() throws Exception { } @SuppressWarnings("checkstyle:MethodLength") - @Test + @TestTemplate public void testPartitionStats() throws Exception { Table testTable = TestTables.create( - tempDir("partition_stats_compute"), - "partition_stats_compute", + tempDir("partition_stats_compute_" + formatVersion), + "partition_stats_compute_" + formatVersion, SCHEMA, SPEC, - 2, + formatVersion, fileFormatProperty); DataFile dataFile1 = @@ -330,7 +368,8 @@ public void testPartitionStats() throws Exception { } Snapshot snapshot1 = testTable.currentSnapshot(); - Schema recordSchema = PartitionStatsHandler.schema(Partitioning.partitionType(testTable)); + Schema recordSchema = + PartitionStatsHandler.schemaForVersion(testTable, Partitioning.partitionType(testTable)); Types.StructType partitionType = recordSchema.findField(PARTITION_FIELD_ID).type().asStructType(); computeAndValidatePartitionStats( @@ -389,69 +428,136 @@ public void testPartitionStats() throws Exception { snapshot1.timestampMillis(), snapshot1.snapshotId())); - DeleteFile posDeletes = commitPositionDeletes(testTable); + DeleteFile posDeletes = null; + DeleteFile deleteVectors = null; + if (formatVersion == 3) { + deleteVectors = commitDVs(testTable, dataFile3); + } else if (formatVersion == 2) { + posDeletes = commitPositionDeletes(testTable); + } + Snapshot snapshot2 = testTable.currentSnapshot(); DeleteFile eqDeletes = commitEqualityDeletes(testTable); Snapshot snapshot3 = testTable.currentSnapshot(); - recordSchema = PartitionStatsHandler.schema(Partitioning.partitionType(testTable)); - partitionType = recordSchema.findField(PARTITION_FIELD_ID).type().asStructType(); - computeAndValidatePartitionStats( - testTable, - recordSchema, - Tuple.tuple( - partitionRecord(partitionType, "foo", "A"), - 0, - 3 * dataFile1.recordCount(), 
- 3, - 3 * dataFile1.fileSizeInBytes(), - 0L, - 0, - eqDeletes.recordCount(), - 1, - null, - snapshot3.timestampMillis(), - snapshot3.snapshotId()), - Tuple.tuple( - partitionRecord(partitionType, "foo", "B"), - 0, - 3 * dataFile2.recordCount(), - 3, - 3 * dataFile2.fileSizeInBytes(), - 0L, - 0, - 0L, - 0, - null, - snapshot1.timestampMillis(), - snapshot1.snapshotId()), - Tuple.tuple( - partitionRecord(partitionType, "bar", "A"), - 0, - 3 * dataFile3.recordCount(), - 3, - 3 * dataFile3.fileSizeInBytes(), - posDeletes.recordCount(), - 1, - 0L, - 0, - null, - snapshot2.timestampMillis(), - snapshot2.snapshotId()), - Tuple.tuple( - partitionRecord(partitionType, "bar", "B"), - 0, - 3 * dataFile4.recordCount(), - 3, - 3 * dataFile4.fileSizeInBytes(), - 0L, - 0, - 0L, - 0, - null, - snapshot1.timestampMillis(), - snapshot1.snapshotId())); + if (formatVersion == 3) { + computeAndValidatePartitionStatsV3( + testTable, + recordSchema, + Tuple.tuple( + partitionRecord(partitionType, "foo", "A"), + 0, + 3 * dataFile1.recordCount(), + 3, + 3 * dataFile1.fileSizeInBytes(), + 0L, + 0, + eqDeletes.recordCount(), + 1, + null, + snapshot3.timestampMillis(), + snapshot3.snapshotId(), + 0), + Tuple.tuple( + partitionRecord(partitionType, "foo", "B"), + 0, + 3 * dataFile2.recordCount(), + 3, + 3 * dataFile2.fileSizeInBytes(), + 0L, + 0, + 0L, + 0, + null, + snapshot1.timestampMillis(), + snapshot1.snapshotId(), + 0), + Tuple.tuple( + partitionRecord(partitionType, "bar", "A"), + 0, + 3 * dataFile3.recordCount(), + 3, + 3 * dataFile3.fileSizeInBytes(), + deleteVectors.recordCount(), // dv record count as position delete record count + 0, // no position delete files + 0L, + 0, + null, + snapshot2.timestampMillis(), + snapshot2.snapshotId(), + 1), // dv file count + Tuple.tuple( + partitionRecord(partitionType, "bar", "B"), + 0, + 3 * dataFile4.recordCount(), + 3, + 3 * dataFile4.fileSizeInBytes(), + 0L, + 0, + 0L, + 0, + null, + snapshot1.timestampMillis(), + snapshot1.snapshotId(), + 
0)); + } else { + computeAndValidatePartitionStats( + testTable, + recordSchema, + Tuple.tuple( + partitionRecord(partitionType, "foo", "A"), + 0, + 3 * dataFile1.recordCount(), + 3, + 3 * dataFile1.fileSizeInBytes(), + 0L, + 0, + eqDeletes.recordCount(), + 1, + null, + snapshot3.timestampMillis(), + snapshot3.snapshotId()), + Tuple.tuple( + partitionRecord(partitionType, "foo", "B"), + 0, + 3 * dataFile2.recordCount(), + 3, + 3 * dataFile2.fileSizeInBytes(), + 0L, + 0, + 0L, + 0, + null, + snapshot1.timestampMillis(), + snapshot1.snapshotId()), + Tuple.tuple( + partitionRecord(partitionType, "bar", "A"), + 0, + 3 * dataFile3.recordCount(), + 3, + 3 * dataFile3.fileSizeInBytes(), + posDeletes.recordCount(), + 1, + 0L, + 0, + null, + snapshot2.timestampMillis(), + snapshot2.snapshotId()), + Tuple.tuple( + partitionRecord(partitionType, "bar", "B"), + 0, + 3 * dataFile4.recordCount(), + 3, + 3 * dataFile4.fileSizeInBytes(), + 0L, + 0, + 0L, + 0, + null, + snapshot1.timestampMillis(), + snapshot1.snapshotId())); + } } @Test @@ -482,7 +588,8 @@ public void testCopyOnWriteDelete() throws Exception { assertThat( PartitionStatsHandler.readPartitionStatsFile( - PartitionStatsHandler.schema(Partitioning.partitionType(testTable)), + PartitionStatsHandler.schemaForVersion( + testTable, Partitioning.partitionType(testTable)), testTable.io().newInputFile(statisticsFile.path()))) .allMatch(s -> (s.dataRecordCount() != 0 && s.dataFileCount() != 0)); @@ -495,7 +602,8 @@ public void testCopyOnWriteDelete() throws Exception { // stats must be decremented to zero as all the files removed from table. 
assertThat( PartitionStatsHandler.readPartitionStatsFile( - PartitionStatsHandler.schema(Partitioning.partitionType(testTable)), + PartitionStatsHandler.schemaForVersion( + testTable, Partitioning.partitionType(testTable)), testTable.io().newInputFile(statisticsFileNew.path()))) .allMatch(s -> (s.dataRecordCount() == 0 && s.dataFileCount() == 0)); } @@ -652,19 +760,7 @@ private static StructLike partitionRecord( private static void computeAndValidatePartitionStats( Table testTable, Schema recordSchema, Tuple... expectedValues) throws IOException { // compute and commit partition stats file - Snapshot currentSnapshot = testTable.currentSnapshot(); - PartitionStatisticsFile result = PartitionStatsHandler.computeAndWriteStatsFile(testTable); - testTable.updatePartitionStatistics().setPartitionStatistics(result).commit(); - assertThat(result.snapshotId()).isEqualTo(currentSnapshot.snapshotId()); - - // read the partition entries from the stats file - List partitionStats; - try (CloseableIterable recordIterator = - PartitionStatsHandler.readPartitionStatsFile( - recordSchema, testTable.io().newInputFile(result.path()))) { - partitionStats = Lists.newArrayList(recordIterator); - } - + List partitionStats = computeAndReadStats(testTable, recordSchema); assertThat(partitionStats) .extracting( PartitionStats::partition, @@ -682,6 +778,46 @@ private static void computeAndValidatePartitionStats( .containsExactlyInAnyOrder(expectedValues); } + private static void computeAndValidatePartitionStatsV3( + Table testTable, Schema recordSchema, Tuple... 
expectedValues) throws IOException { + List partitionStats = computeAndReadStats(testTable, recordSchema); + assertThat(partitionStats) + .extracting( + PartitionStats::partition, + PartitionStats::specId, + PartitionStats::dataRecordCount, + PartitionStats::dataFileCount, + PartitionStats::totalDataFileSizeInBytes, + PartitionStats::positionDeleteRecordCount, + PartitionStats::positionDeleteFileCount, + PartitionStats::equalityDeleteRecordCount, + PartitionStats::equalityDeleteFileCount, + PartitionStats::totalRecords, + PartitionStats::lastUpdatedAt, + PartitionStats::lastUpdatedSnapshotId, + stat -> ((PartitionStatsV3) stat).dvCount()) + .containsExactlyInAnyOrder(expectedValues); + } + + private static List computeAndReadStats(Table testTable, Schema recordSchema) + throws IOException { + // compute and commit partition stats file + Snapshot currentSnapshot = testTable.currentSnapshot(); + PartitionStatisticsFile result = PartitionStatsHandler.computeAndWriteStatsFile(testTable); + testTable.updatePartitionStatistics().setPartitionStatistics(result).commit(); + assertThat(result.snapshotId()).isEqualTo(currentSnapshot.snapshotId()); + + // read the partition entries from the stats file + List partitionStats; + try (CloseableIterable recordIterator = + PartitionStatsHandler.readPartitionStatsFile( + recordSchema, testTable.io().newInputFile(result.path()))) { + partitionStats = Lists.newArrayList(recordIterator); + } + + return partitionStats; + } + private DeleteFile commitEqualityDeletes(Table testTable) { DeleteFile eqDelete = FileGenerationUtil.generateEqualityDeleteFile(testTable, TestHelpers.Row.of("foo", "A")); @@ -696,6 +832,12 @@ private DeleteFile commitPositionDeletes(Table testTable) { return posDelete; } + private DeleteFile commitDVs(Table testTable, DataFile dataFile) { + DeleteFile posDeleteVector = FileGenerationUtil.generateDV(testTable, dataFile); + testTable.newRowDelta().addDeletes(posDeleteVector).commit(); + return posDeleteVector; + } + 
private File tempDir(String folderName) throws IOException { return java.nio.file.Files.createTempDirectory(temp.toPath(), folderName).toFile(); } From 465cb22530e2aa5f688878f3131362cd2557448e Mon Sep 17 00:00:00 2001 From: ajantha-bhat Date: Mon, 30 Jun 2025 18:28:44 +0530 Subject: [PATCH 2/6] Address comments --- .../org/apache/iceberg/PartitionStats.java | 2 +- .../apache/iceberg/PartitionStatsHandler.java | 60 ++++++++----------- .../PartitionStatsHandlerTestBase.java | 13 ++-- 3 files changed, 33 insertions(+), 42 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/PartitionStats.java b/core/src/main/java/org/apache/iceberg/PartitionStats.java index a8c60b4e17f4..dd795e33ea9b 100644 --- a/core/src/main/java/org/apache/iceberg/PartitionStats.java +++ b/core/src/main/java/org/apache/iceberg/PartitionStats.java @@ -22,7 +22,7 @@ public class PartitionStats implements StructLike { - protected static final int STATS_COUNT = 12; + static final int STATS_COUNT = 12; private StructLike partition; private int specId; diff --git a/core/src/main/java/org/apache/iceberg/PartitionStatsHandler.java b/core/src/main/java/org/apache/iceberg/PartitionStatsHandler.java index 809cab77cadb..aa1507565abf 100644 --- a/core/src/main/java/org/apache/iceberg/PartitionStatsHandler.java +++ b/core/src/main/java/org/apache/iceberg/PartitionStatsHandler.java @@ -30,6 +30,7 @@ import java.util.Map; import java.util.Queue; import java.util.UUID; +import java.util.function.BiFunction; import java.util.stream.Collectors; import java.util.stream.StreamSupport; import org.apache.iceberg.data.GenericRecord; @@ -97,12 +98,15 @@ private PartitionStatsHandler() {} * all specs in a table. * *

Use this only for format version 1 and 2. For version 3 and above use {@link - * #schemaV3Plus(StructType)} + * #schema(StructType, int)} * * @param unifiedPartitionType unified partition schema type. Could be calculated by {@link * Partitioning#partitionType(Table)}. * @return a schema that corresponds to the provided unified partition type. + * @deprecated since 1.10.0, will be removed in 1.11.0. Use {@link #schema(StructType, int)} + * instead. */ + @Deprecated public static Schema schema(StructType unifiedPartitionType) { Preconditions.checkState(!unifiedPartitionType.fields().isEmpty(), "Table must be partitioned"); return new Schema( @@ -121,15 +125,20 @@ public static Schema schema(StructType unifiedPartitionType) { } /** - * Generates the partition stats file schema based on a combined partition type which considers - * all specs in a table. (For format version 3 and above) + * Generates the partition stats file schema for a given format version based on a combined + * partition type which considers all specs in a table. * * @param unifiedPartitionType unified partition schema type. Could be calculated by {@link * Partitioning#partitionType(Table)}. * @return a schema that corresponds to the provided unified partition type. 
*/ - public static Schema schemaV3Plus(StructType unifiedPartitionType) { + public static Schema schema(StructType unifiedPartitionType, int formatVersion) { Preconditions.checkState(!unifiedPartitionType.fields().isEmpty(), "Table must be partitioned"); + + if (formatVersion <= 2) { + return schema(unifiedPartitionType); + } + return new Schema( NestedField.required(PARTITION_FIELD_ID, PARTITION_FIELD_NAME, unifiedPartitionType), SPEC_ID, @@ -158,15 +167,6 @@ public static Schema schemaV3Plus(StructType unifiedPartitionType) { DV_COUNT); } - static Schema schemaForVersion(Table table, StructType partitionType) { - int formatVersion = ((HasTableOperations) table).operations().current().formatVersion(); - if (formatVersion <= 2) { - return schema(partitionType); - } - - return schemaV3Plus(partitionType); - } - /** * Computes the stats incrementally after the snapshot that has partition stats file till the * current snapshot and writes the combined result into a {@link PartitionStatisticsFile} after @@ -236,7 +236,10 @@ public static PartitionStatisticsFile computeAndWriteStatsFile(Table table, long List sortedStats = sortStatsByPartition(stats, partitionType); return writePartitionStatsFile( - table, snapshot.snapshotId(), schemaForVersion(table, partitionType), sortedStats); + table, + snapshot.snapshotId(), + schema(partitionType, TableUtil.formatVersion(table)), + sortedStats); } @VisibleForTesting @@ -280,9 +283,11 @@ public static CloseableIterable readPartitionStatsFile( InternalData.read(fileFormat, inputFile).project(schema).build(); if (schema.findField(DV_COUNT.name()) == null) { - return CloseableIterable.transform(records, PartitionStatsHandler::recordToPartitionStats); + return CloseableIterable.transform( + records, record -> recordToPartitionStats(record, PartitionStats::new)); } else { - return CloseableIterable.transform(records, PartitionStatsHandler::recordToPartitionStatsV3); + return CloseableIterable.transform( + records, record -> 
recordToPartitionStats(record, PartitionStatsV3::new)); } } @@ -303,24 +308,11 @@ private static OutputFile newPartitionStatsFile( Locale.ROOT, "partition-stats-%d-%s", snapshotId, UUID.randomUUID())))); } - private static PartitionStats recordToPartitionStats(StructLike record) { - int pos = 0; - PartitionStats stats = - new PartitionStats( - record.get(pos++, StructLike.class), // partition - record.get(pos++, Integer.class)); // spec id - - for (; pos < record.size(); pos++) { - stats.set(pos, record.get(pos, Object.class)); - } - - return stats; - } - - private static PartitionStats recordToPartitionStatsV3(StructLike record) { + private static PartitionStats recordToPartitionStats( + StructLike record, BiFunction statBuilder) { int pos = 0; PartitionStats stats = - new PartitionStatsV3( + statBuilder.apply( record.get(pos++, StructLike.class), // partition record.get(pos++, Integer.class)); // spec id @@ -340,7 +332,7 @@ private static Collection computeAndMergeStatsIncremental( // read previous stats, note that partition field will be read as GenericRecord try (CloseableIterable oldStats = readPartitionStatsFile( - schemaForVersion(table, partitionType), + schema(partitionType, TableUtil.formatVersion(table)), table.io().newInputFile(previousStatsFile.path()))) { oldStats.forEach( partitionStats -> @@ -419,7 +411,7 @@ private static PartitionMap computeStatsDiff( private static PartitionMap computeStats( Table table, List manifests, boolean incremental) { - int version = ((HasTableOperations) table).operations().current().formatVersion(); + int version = TableUtil.formatVersion(table); StructType partitionType = Partitioning.partitionType(table); Queue> statsByManifest = Queues.newConcurrentLinkedQueue(); Tasks.foreach(manifests) diff --git a/core/src/test/java/org/apache/iceberg/PartitionStatsHandlerTestBase.java b/core/src/test/java/org/apache/iceberg/PartitionStatsHandlerTestBase.java index dc916e9c7136..1e0ae7e66416 100644 --- 
a/core/src/test/java/org/apache/iceberg/PartitionStatsHandlerTestBase.java +++ b/core/src/test/java/org/apache/iceberg/PartitionStatsHandlerTestBase.java @@ -203,7 +203,7 @@ public void testAllDatatypePartitionWriting() throws Exception { fileFormatProperty); Types.StructType partitionSchema = Partitioning.partitionType(testTable); - Schema dataSchema = PartitionStatsHandler.schemaForVersion(testTable, partitionSchema); + Schema dataSchema = PartitionStatsHandler.schema(partitionSchema, formatVersion); PartitionData partitionData = new PartitionData(dataSchema.findField(PARTITION_FIELD_ID).type().asStructType()); @@ -265,7 +265,7 @@ public void testOptionalFieldsWriting() throws Exception { fileFormatProperty); Types.StructType partitionSchema = Partitioning.partitionType(testTable); - Schema dataSchema = PartitionStatsHandler.schemaForVersion(testTable, partitionSchema); + Schema dataSchema = PartitionStatsHandler.schema(partitionSchema, formatVersion); ImmutableList.Builder partitionListBuilder = ImmutableList.builder(); for (int i = 0; i < 5; i++) { @@ -369,7 +369,8 @@ public void testPartitionStats() throws Exception { Snapshot snapshot1 = testTable.currentSnapshot(); Schema recordSchema = - PartitionStatsHandler.schemaForVersion(testTable, Partitioning.partitionType(testTable)); + PartitionStatsHandler.schema(Partitioning.partitionType(testTable), formatVersion); + Types.StructType partitionType = recordSchema.findField(PARTITION_FIELD_ID).type().asStructType(); computeAndValidatePartitionStats( @@ -588,8 +589,7 @@ public void testCopyOnWriteDelete() throws Exception { assertThat( PartitionStatsHandler.readPartitionStatsFile( - PartitionStatsHandler.schemaForVersion( - testTable, Partitioning.partitionType(testTable)), + PartitionStatsHandler.schema(Partitioning.partitionType(testTable), formatVersion), testTable.io().newInputFile(statisticsFile.path()))) .allMatch(s -> (s.dataRecordCount() != 0 && s.dataFileCount() != 0)); @@ -602,8 +602,7 @@ public void 
testCopyOnWriteDelete() throws Exception { // stats must be decremented to zero as all the files removed from table. assertThat( PartitionStatsHandler.readPartitionStatsFile( - PartitionStatsHandler.schemaForVersion( - testTable, Partitioning.partitionType(testTable)), + PartitionStatsHandler.schema(Partitioning.partitionType(testTable), formatVersion), testTable.io().newInputFile(statisticsFileNew.path()))) .allMatch(s -> (s.dataRecordCount() == 0 && s.dataFileCount() == 0)); } From 271a970aa0db98c7b93b07b6c699c2763ce27ffa Mon Sep 17 00:00:00 2001 From: ajantha-bhat Date: Wed, 2 Jul 2025 18:05:20 +0530 Subject: [PATCH 3/6] Remove PartitionStatsV3 --- .../org/apache/iceberg/PartitionStats.java | 30 +- .../apache/iceberg/PartitionStatsHandler.java | 30 +- .../org/apache/iceberg/PartitionStatsV3.java | 86 ------ .../PartitionStatsHandlerTestBase.java | 273 ++++++------------ 4 files changed, 118 insertions(+), 301 deletions(-) delete mode 100644 core/src/main/java/org/apache/iceberg/PartitionStatsV3.java diff --git a/core/src/main/java/org/apache/iceberg/PartitionStats.java b/core/src/main/java/org/apache/iceberg/PartitionStats.java index dd795e33ea9b..4423b4f6dc24 100644 --- a/core/src/main/java/org/apache/iceberg/PartitionStats.java +++ b/core/src/main/java/org/apache/iceberg/PartitionStats.java @@ -22,20 +22,21 @@ public class PartitionStats implements StructLike { - static final int STATS_COUNT = 12; + static final int STATS_COUNT = 13; private StructLike partition; private int specId; private long dataRecordCount; private int dataFileCount; private long totalDataFileSizeInBytes; - private long positionDeleteRecordCount; + private long positionDeleteRecordCount; // also includes dv record count as per spec private int positionDeleteFileCount; private long equalityDeleteRecordCount; private int equalityDeleteFileCount; private Long totalRecordCount; // null by default private Long lastUpdatedAt; // null by default private Long lastUpdatedSnapshotId; // null by 
default + private int dvCount; public PartitionStats(StructLike partition, int specId) { this.partition = partition; @@ -90,6 +91,10 @@ public Long lastUpdatedSnapshotId() { return lastUpdatedSnapshotId; } + public int dvCount() { + return dvCount; + } + /** * Updates the partition stats from the data/delete file. * @@ -109,7 +114,12 @@ public void liveEntry(ContentFile file, Snapshot snapshot) { break; case POSITION_DELETES: this.positionDeleteRecordCount += file.recordCount(); - this.positionDeleteFileCount += 1; + if (file.format() == FileFormat.PUFFIN) { + this.dvCount += 1; + } else { + this.positionDeleteFileCount += 1; + } + break; case EQUALITY_DELETES: this.equalityDeleteRecordCount += file.recordCount(); @@ -156,7 +166,12 @@ void deletedEntryForIncrementalCompute(ContentFile file, Snapshot snapshot) { break; case POSITION_DELETES: this.positionDeleteRecordCount -= file.recordCount(); - this.positionDeleteFileCount -= 1; + if (file.format() == FileFormat.PUFFIN) { + this.dvCount -= 1; + } else { + this.positionDeleteFileCount -= 1; + } + break; case EQUALITY_DELETES: this.equalityDeleteRecordCount -= file.recordCount(); @@ -200,6 +215,8 @@ public void appendStats(PartitionStats entry) { if (entry.lastUpdatedAt != null) { updateSnapshotInfo(entry.lastUpdatedSnapshotId, entry.lastUpdatedAt); } + + this.dvCount += entry.dvCount; } private void updateSnapshotInfo(long snapshotId, long updatedAt) { @@ -241,6 +258,8 @@ public T get(int pos, Class javaClass) { return javaClass.cast(lastUpdatedAt); case 11: return javaClass.cast(lastUpdatedSnapshotId); + case 12: + return javaClass.cast(dvCount); default: throw new UnsupportedOperationException("Unknown position: " + pos); } @@ -289,6 +308,9 @@ public void set(int pos, T value) { case 11: this.lastUpdatedSnapshotId = (Long) value; break; + case 12: + this.dvCount = value == null ? 
0 : (int) value; + break; default: throw new UnsupportedOperationException("Unknown position: " + pos); } diff --git a/core/src/main/java/org/apache/iceberg/PartitionStatsHandler.java b/core/src/main/java/org/apache/iceberg/PartitionStatsHandler.java index aa1507565abf..eb9039f0469c 100644 --- a/core/src/main/java/org/apache/iceberg/PartitionStatsHandler.java +++ b/core/src/main/java/org/apache/iceberg/PartitionStatsHandler.java @@ -30,7 +30,6 @@ import java.util.Map; import java.util.Queue; import java.util.UUID; -import java.util.function.BiFunction; import java.util.stream.Collectors; import java.util.stream.StreamSupport; import org.apache.iceberg.data.GenericRecord; @@ -281,14 +280,7 @@ public static CloseableIterable readPartitionStatsFile( CloseableIterable records = InternalData.read(fileFormat, inputFile).project(schema).build(); - - if (schema.findField(DV_COUNT.name()) == null) { - return CloseableIterable.transform( - records, record -> recordToPartitionStats(record, PartitionStats::new)); - } else { - return CloseableIterable.transform( - records, record -> recordToPartitionStats(record, PartitionStatsV3::new)); - } + return CloseableIterable.transform(records, PartitionStatsHandler::recordToPartitionStats); } private static OutputFile newPartitionStatsFile( @@ -308,14 +300,12 @@ private static OutputFile newPartitionStatsFile( Locale.ROOT, "partition-stats-%d-%s", snapshotId, UUID.randomUUID())))); } - private static PartitionStats recordToPartitionStats( - StructLike record, BiFunction statBuilder) { + private static PartitionStats recordToPartitionStats(StructLike record) { int pos = 0; PartitionStats stats = - statBuilder.apply( + new PartitionStats( record.get(pos++, StructLike.class), // partition record.get(pos++, Integer.class)); // spec id - for (; pos < record.size(); pos++) { stats.set(pos, record.get(pos, Object.class)); } @@ -411,7 +401,6 @@ private static PartitionMap computeStatsDiff( private static PartitionMap computeStats( Table 
table, List manifests, boolean incremental) { - int version = TableUtil.formatVersion(table); StructType partitionType = Partitioning.partitionType(table); Queue> statsByManifest = Queues.newConcurrentLinkedQueue(); Tasks.foreach(manifests) @@ -421,7 +410,7 @@ private static PartitionMap computeStats( .run( manifest -> statsByManifest.add( - collectStatsForManifest(table, version, manifest, partitionType, incremental))); + collectStatsForManifest(table, manifest, partitionType, incremental))); PartitionMap statsMap = PartitionMap.create(table.specs()); for (PartitionMap stats : statsByManifest) { @@ -432,11 +421,7 @@ private static PartitionMap computeStats( } private static PartitionMap collectStatsForManifest( - Table table, - int version, - ManifestFile manifest, - StructType partitionType, - boolean incremental) { + Table table, ManifestFile manifest, StructType partitionType, boolean incremental) { List projection = BaseScan.scanColumns(manifest.content()); try (ManifestReader reader = ManifestFiles.open(manifest, table.io()).select(projection)) { PartitionMap statsMap = PartitionMap.create(table.specs()); @@ -454,10 +439,7 @@ private static PartitionMap collectStatsForManifest( statsMap.computeIfAbsent( specId, ((PartitionData) file.partition()).copy(), - () -> - version > 2 - ? new PartitionStatsV3(key, specId) - : new PartitionStats(key, specId)); + () -> new PartitionStats(key, specId)); if (entry.isLive()) { // Live can have both added and existing entries. Consider only added entries for // incremental compute as existing entries was already included in previous compute. 
diff --git a/core/src/main/java/org/apache/iceberg/PartitionStatsV3.java b/core/src/main/java/org/apache/iceberg/PartitionStatsV3.java deleted file mode 100644 index ccebc3c60730..000000000000 --- a/core/src/main/java/org/apache/iceberg/PartitionStatsV3.java +++ /dev/null @@ -1,86 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.iceberg; - -public class PartitionStatsV3 extends PartitionStats { - - private int dvCount; - - public PartitionStatsV3(StructLike partition, int specId) { - super(partition, specId); - } - - public int dvCount() { - return dvCount; - } - - @Override - public void liveEntry(ContentFile file, Snapshot snapshot) { - super.liveEntry(file, snapshot); - if (file.content() == FileContent.POSITION_DELETES && file.format() == FileFormat.PUFFIN) { - this.dvCount += 1; - // revert the changes for position delete file count increment from the parent method - this.set(6, positionDeleteFileCount() - 1); - } - } - - @Override - void deletedEntryForIncrementalCompute(ContentFile file, Snapshot snapshot) { - super.deletedEntryForIncrementalCompute(file, snapshot); - if (file.content() == FileContent.POSITION_DELETES && file.format() == FileFormat.PUFFIN) { - this.dvCount -= 1; - // revert the changes for position delete file count decrement from the parent method - this.set(6, positionDeleteFileCount() + 1); - } - } - - @Override - @Deprecated // will become package-private - public void appendStats(PartitionStats entry) { - super.appendStats(entry); - - if (entry instanceof PartitionStatsV3) { - this.dvCount += ((PartitionStatsV3) entry).dvCount; - } - } - - @Override - public int size() { - // includes dv counter - return STATS_COUNT + 1; - } - - @Override - public T get(int pos, Class javaClass) { - if (pos == STATS_COUNT) { - return javaClass.cast(dvCount); - } else { - return super.get(pos, javaClass); - } - } - - @Override - public void set(int pos, T value) { - if (pos == STATS_COUNT) { - this.dvCount = value == null ? 
0 : (int) value; - } else { - super.set(pos, value); - } - } -} diff --git a/core/src/test/java/org/apache/iceberg/PartitionStatsHandlerTestBase.java b/core/src/test/java/org/apache/iceberg/PartitionStatsHandlerTestBase.java index 1e0ae7e66416..08dd3916e7c8 100644 --- a/core/src/test/java/org/apache/iceberg/PartitionStatsHandlerTestBase.java +++ b/core/src/test/java/org/apache/iceberg/PartitionStatsHandlerTestBase.java @@ -57,7 +57,6 @@ import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.types.Comparators; import org.apache.iceberg.types.Types; -import org.assertj.core.api.InstanceOfAssertFactories; import org.assertj.core.groups.Tuple; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestTemplate; @@ -224,13 +223,7 @@ public void testAllDatatypePartitionWriting() throws Exception { partitionData.set(13, new BigDecimal("12345678901234567890.1234567890")); partitionData.set(14, Literal.of("10:10:10").to(Types.TimeType.get()).value()); - PartitionStats partitionStats; - if (formatVersion == 3) { - partitionStats = new PartitionStatsV3(partitionData, RANDOM.nextInt(10)); - } else { - partitionStats = new PartitionStats(partitionData, RANDOM.nextInt(10)); - } - + PartitionStats partitionStats = new PartitionStats(partitionData, RANDOM.nextInt(10)); partitionStats.set(DATA_RECORD_COUNT_POSITION, RANDOM.nextLong()); partitionStats.set(DATA_FILE_COUNT_POSITION, RANDOM.nextInt()); partitionStats.set(TOTAL_DATA_FILE_SIZE_IN_BYTES_POSITION, 1024L * RANDOM.nextInt(20)); @@ -273,14 +266,7 @@ public void testOptionalFieldsWriting() throws Exception { new PartitionData(dataSchema.findField(PARTITION_FIELD_ID).type().asStructType()); partitionData.set(0, RANDOM.nextInt()); - PartitionStats stats; - if (formatVersion == 3) { - stats = new PartitionStatsV3(partitionData, RANDOM.nextInt(10)); - stats.set(DV_COUNT_POSITION, null); - } else { - stats = new PartitionStats(partitionData, RANDOM.nextInt(10)); - } - + 
PartitionStats stats = new PartitionStats(partitionData, RANDOM.nextInt(10)); stats.set(DATA_RECORD_COUNT_POSITION, RANDOM.nextLong()); stats.set(DATA_FILE_COUNT_POSITION, RANDOM.nextInt()); stats.set(TOTAL_DATA_FILE_SIZE_IN_BYTES_POSITION, 1024L * RANDOM.nextInt(20)); @@ -291,6 +277,7 @@ public void testOptionalFieldsWriting() throws Exception { stats.set(TOTAL_RECORD_COUNT_POSITION, null); stats.set(LAST_UPDATED_AT_POSITION, null); stats.set(LAST_UPDATED_SNAPSHOT_ID_POSITION, null); + stats.set(DV_COUNT_POSITION, null); partitionListBuilder.add(stats); } @@ -305,18 +292,11 @@ public void testOptionalFieldsWriting() throws Exception { PartitionStats::equalityDeleteFileCount, PartitionStats::totalRecords, PartitionStats::lastUpdatedAt, - PartitionStats::lastUpdatedSnapshotId) + PartitionStats::lastUpdatedSnapshotId, + PartitionStats::dvCount) .isEqualTo( Arrays.asList( - 0L, 0, 0L, 0, null, null, null)); // null counters must be initialized to zero. - - if (formatVersion == 3) { - assertThat(expected.get(0)) - .isInstanceOf(PartitionStatsV3.class) - .asInstanceOf(InstanceOfAssertFactories.type(PartitionStatsV3.class)) - .extracting(PartitionStatsV3::dvCount) - .isEqualTo(0); // null counters must be initialized to zero. - } + 0L, 0, 0L, 0, null, null, null, 0)); // null counters must be initialized to zero. 
PartitionStatisticsFile statisticsFile = PartitionStatsHandler.writePartitionStatsFile(testTable, 42L, dataSchema, expected); @@ -388,7 +368,8 @@ public void testPartitionStats() throws Exception { 0, null, snapshot1.timestampMillis(), - snapshot1.snapshotId()), + snapshot1.snapshotId(), + 0), Tuple.tuple( partitionRecord(partitionType, "foo", "B"), 0, @@ -401,7 +382,8 @@ public void testPartitionStats() throws Exception { 0, null, snapshot1.timestampMillis(), - snapshot1.snapshotId()), + snapshot1.snapshotId(), + 0), Tuple.tuple( partitionRecord(partitionType, "bar", "A"), 0, @@ -414,7 +396,8 @@ public void testPartitionStats() throws Exception { 0, null, snapshot1.timestampMillis(), - snapshot1.snapshotId()), + snapshot1.snapshotId(), + 0), Tuple.tuple( partitionRecord(partitionType, "bar", "B"), 0, @@ -427,7 +410,8 @@ public void testPartitionStats() throws Exception { 0, null, snapshot1.timestampMillis(), - snapshot1.snapshotId())); + snapshot1.snapshotId(), + 0)); DeleteFile posDeletes = null; DeleteFile deleteVectors = null; @@ -442,123 +426,65 @@ public void testPartitionStats() throws Exception { DeleteFile eqDeletes = commitEqualityDeletes(testTable); Snapshot snapshot3 = testTable.currentSnapshot(); - if (formatVersion == 3) { - computeAndValidatePartitionStatsV3( - testTable, - recordSchema, - Tuple.tuple( - partitionRecord(partitionType, "foo", "A"), - 0, - 3 * dataFile1.recordCount(), - 3, - 3 * dataFile1.fileSizeInBytes(), - 0L, - 0, - eqDeletes.recordCount(), - 1, - null, - snapshot3.timestampMillis(), - snapshot3.snapshotId(), - 0), - Tuple.tuple( - partitionRecord(partitionType, "foo", "B"), - 0, - 3 * dataFile2.recordCount(), - 3, - 3 * dataFile2.fileSizeInBytes(), - 0L, - 0, - 0L, - 0, - null, - snapshot1.timestampMillis(), - snapshot1.snapshotId(), - 0), - Tuple.tuple( - partitionRecord(partitionType, "bar", "A"), - 0, - 3 * dataFile3.recordCount(), - 3, - 3 * dataFile3.fileSizeInBytes(), - deleteVectors.recordCount(), // dv record count as 
position delete record count - 0, // no position delete files - 0L, - 0, - null, - snapshot2.timestampMillis(), - snapshot2.snapshotId(), - 1), // dv file count - Tuple.tuple( - partitionRecord(partitionType, "bar", "B"), - 0, - 3 * dataFile4.recordCount(), - 3, - 3 * dataFile4.fileSizeInBytes(), - 0L, - 0, - 0L, - 0, - null, - snapshot1.timestampMillis(), - snapshot1.snapshotId(), - 0)); - } else { - computeAndValidatePartitionStats( - testTable, - recordSchema, - Tuple.tuple( - partitionRecord(partitionType, "foo", "A"), - 0, - 3 * dataFile1.recordCount(), - 3, - 3 * dataFile1.fileSizeInBytes(), - 0L, - 0, - eqDeletes.recordCount(), - 1, - null, - snapshot3.timestampMillis(), - snapshot3.snapshotId()), - Tuple.tuple( - partitionRecord(partitionType, "foo", "B"), - 0, - 3 * dataFile2.recordCount(), - 3, - 3 * dataFile2.fileSizeInBytes(), - 0L, - 0, - 0L, - 0, - null, - snapshot1.timestampMillis(), - snapshot1.snapshotId()), - Tuple.tuple( - partitionRecord(partitionType, "bar", "A"), - 0, - 3 * dataFile3.recordCount(), - 3, - 3 * dataFile3.fileSizeInBytes(), - posDeletes.recordCount(), - 1, - 0L, - 0, - null, - snapshot2.timestampMillis(), - snapshot2.snapshotId()), - Tuple.tuple( - partitionRecord(partitionType, "bar", "B"), - 0, - 3 * dataFile4.recordCount(), - 3, - 3 * dataFile4.fileSizeInBytes(), - 0L, - 0, - 0L, - 0, - null, - snapshot1.timestampMillis(), - snapshot1.snapshotId())); - } + computeAndValidatePartitionStats( + testTable, + recordSchema, + Tuple.tuple( + partitionRecord(partitionType, "foo", "A"), + 0, + 3 * dataFile1.recordCount(), + 3, + 3 * dataFile1.fileSizeInBytes(), + 0L, + 0, + eqDeletes.recordCount(), + 1, + null, + snapshot3.timestampMillis(), + snapshot3.snapshotId(), + 0), + Tuple.tuple( + partitionRecord(partitionType, "foo", "B"), + 0, + 3 * dataFile2.recordCount(), + 3, + 3 * dataFile2.fileSizeInBytes(), + 0L, + 0, + 0L, + 0, + null, + snapshot1.timestampMillis(), + snapshot1.snapshotId(), + 0), + Tuple.tuple( + 
partitionRecord(partitionType, "bar", "A"), + 0, + 3 * dataFile3.recordCount(), + 3, + 3 * dataFile3.fileSizeInBytes(), + formatVersion == 3 ? deleteVectors.recordCount() : posDeletes.recordCount(), + formatVersion == 3 ? 0 : 1, // pos delete file count + 0L, + 0, + null, + snapshot2.timestampMillis(), + snapshot2.snapshotId(), + formatVersion == 3 ? 1 : 0), // dv count + Tuple.tuple( + partitionRecord(partitionType, "bar", "B"), + 0, + 3 * dataFile4.recordCount(), + 3, + 3 * dataFile4.fileSizeInBytes(), + 0L, + 0, + 0L, + 0, + null, + snapshot1.timestampMillis(), + snapshot1.snapshotId(), + 0)); } @Test @@ -759,27 +685,19 @@ private static StructLike partitionRecord( private static void computeAndValidatePartitionStats( Table testTable, Schema recordSchema, Tuple... expectedValues) throws IOException { // compute and commit partition stats file - List partitionStats = computeAndReadStats(testTable, recordSchema); - assertThat(partitionStats) - .extracting( - PartitionStats::partition, - PartitionStats::specId, - PartitionStats::dataRecordCount, - PartitionStats::dataFileCount, - PartitionStats::totalDataFileSizeInBytes, - PartitionStats::positionDeleteRecordCount, - PartitionStats::positionDeleteFileCount, - PartitionStats::equalityDeleteRecordCount, - PartitionStats::equalityDeleteFileCount, - PartitionStats::totalRecords, - PartitionStats::lastUpdatedAt, - PartitionStats::lastUpdatedSnapshotId) - .containsExactlyInAnyOrder(expectedValues); - } + Snapshot currentSnapshot = testTable.currentSnapshot(); + PartitionStatisticsFile result = PartitionStatsHandler.computeAndWriteStatsFile(testTable); + testTable.updatePartitionStatistics().setPartitionStatistics(result).commit(); + assertThat(result.snapshotId()).isEqualTo(currentSnapshot.snapshotId()); + + // read the partition entries from the stats file + List partitionStats; + try (CloseableIterable recordIterator = + PartitionStatsHandler.readPartitionStatsFile( + recordSchema, 
testTable.io().newInputFile(result.path()))) { + partitionStats = Lists.newArrayList(recordIterator); + } - private static void computeAndValidatePartitionStatsV3( - Table testTable, Schema recordSchema, Tuple... expectedValues) throws IOException { - List partitionStats = computeAndReadStats(testTable, recordSchema); assertThat(partitionStats) .extracting( PartitionStats::partition, @@ -794,29 +712,10 @@ private static void computeAndValidatePartitionStatsV3( PartitionStats::totalRecords, PartitionStats::lastUpdatedAt, PartitionStats::lastUpdatedSnapshotId, - stat -> ((PartitionStatsV3) stat).dvCount()) + PartitionStats::dvCount) .containsExactlyInAnyOrder(expectedValues); } - private static List computeAndReadStats(Table testTable, Schema recordSchema) - throws IOException { - // compute and commit partition stats file - Snapshot currentSnapshot = testTable.currentSnapshot(); - PartitionStatisticsFile result = PartitionStatsHandler.computeAndWriteStatsFile(testTable); - testTable.updatePartitionStatistics().setPartitionStatistics(result).commit(); - assertThat(result.snapshotId()).isEqualTo(currentSnapshot.snapshotId()); - - // read the partition entries from the stats file - List partitionStats; - try (CloseableIterable recordIterator = - PartitionStatsHandler.readPartitionStatsFile( - recordSchema, testTable.io().newInputFile(result.path()))) { - partitionStats = Lists.newArrayList(recordIterator); - } - - return partitionStats; - } - private DeleteFile commitEqualityDeletes(Table testTable) { DeleteFile eqDelete = FileGenerationUtil.generateEqualityDeleteFile(testTable, TestHelpers.Row.of("foo", "A")); From 9978fd5ab3a7f7cf7ca0a88feda5c9eded3af280 Mon Sep 17 00:00:00 2001 From: ajantha-bhat Date: Thu, 3 Jul 2025 12:57:34 +0530 Subject: [PATCH 4/6] Address new comments --- .../apache/iceberg/PartitionStatsHandler.java | 40 ++++++++++++------- .../PartitionStatsHandlerTestBase.java | 16 ++++---- 2 files changed, 34 insertions(+), 22 deletions(-) diff --git 
a/core/src/main/java/org/apache/iceberg/PartitionStatsHandler.java b/core/src/main/java/org/apache/iceberg/PartitionStatsHandler.java index eb9039f0469c..1a4c568742ce 100644 --- a/core/src/main/java/org/apache/iceberg/PartitionStatsHandler.java +++ b/core/src/main/java/org/apache/iceberg/PartitionStatsHandler.java @@ -108,19 +108,7 @@ private PartitionStatsHandler() {} @Deprecated public static Schema schema(StructType unifiedPartitionType) { Preconditions.checkState(!unifiedPartitionType.fields().isEmpty(), "Table must be partitioned"); - return new Schema( - NestedField.required(PARTITION_FIELD_ID, PARTITION_FIELD_NAME, unifiedPartitionType), - SPEC_ID, - DATA_RECORD_COUNT, - DATA_FILE_COUNT, - TOTAL_DATA_FILE_SIZE_IN_BYTES, - POSITION_DELETE_RECORD_COUNT, - POSITION_DELETE_FILE_COUNT, - EQUALITY_DELETE_RECORD_COUNT, - EQUALITY_DELETE_FILE_COUNT, - TOTAL_RECORD_COUNT, - LAST_UPDATED_AT, - LAST_UPDATED_SNAPSHOT_ID); + return v2Schema(unifiedPartitionType); } /** @@ -133,11 +121,35 @@ public static Schema schema(StructType unifiedPartitionType) { */ public static Schema schema(StructType unifiedPartitionType, int formatVersion) { Preconditions.checkState(!unifiedPartitionType.fields().isEmpty(), "Table must be partitioned"); + Preconditions.checkState( + formatVersion > 0 && formatVersion <= TableMetadata.SUPPORTED_TABLE_FORMAT_VERSION, + "Invalid format version: %s", + formatVersion); if (formatVersion <= 2) { - return schema(unifiedPartitionType); + return v2Schema(unifiedPartitionType); } + return v3Schema(unifiedPartitionType); + } + + private static Schema v2Schema(StructType unifiedPartitionType) { + return new Schema( + NestedField.required(PARTITION_FIELD_ID, PARTITION_FIELD_NAME, unifiedPartitionType), + SPEC_ID, + DATA_RECORD_COUNT, + DATA_FILE_COUNT, + TOTAL_DATA_FILE_SIZE_IN_BYTES, + POSITION_DELETE_RECORD_COUNT, + POSITION_DELETE_FILE_COUNT, + EQUALITY_DELETE_RECORD_COUNT, + EQUALITY_DELETE_FILE_COUNT, + TOTAL_RECORD_COUNT, + LAST_UPDATED_AT, + 
LAST_UPDATED_SNAPSHOT_ID); + } + + private static Schema v3Schema(StructType unifiedPartitionType) { return new Schema( NestedField.required(PARTITION_FIELD_ID, PARTITION_FIELD_NAME, unifiedPartitionType), SPEC_ID, diff --git a/core/src/test/java/org/apache/iceberg/PartitionStatsHandlerTestBase.java b/core/src/test/java/org/apache/iceberg/PartitionStatsHandlerTestBase.java index 08dd3916e7c8..e13f28b58a75 100644 --- a/core/src/test/java/org/apache/iceberg/PartitionStatsHandlerTestBase.java +++ b/core/src/test/java/org/apache/iceberg/PartitionStatsHandlerTestBase.java @@ -57,6 +57,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.types.Comparators; import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.ContentFileUtil; import org.assertj.core.groups.Tuple; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestTemplate; @@ -414,9 +415,8 @@ public void testPartitionStats() throws Exception { 0)); DeleteFile posDeletes = null; - DeleteFile deleteVectors = null; - if (formatVersion == 3) { - deleteVectors = commitDVs(testTable, dataFile3); + if (formatVersion >= 3) { + posDeletes = commitDVs(testTable, dataFile3); } else if (formatVersion == 2) { posDeletes = commitPositionDeletes(testTable); } @@ -463,14 +463,14 @@ public void testPartitionStats() throws Exception { 3 * dataFile3.recordCount(), 3, 3 * dataFile3.fileSizeInBytes(), - formatVersion == 3 ? deleteVectors.recordCount() : posDeletes.recordCount(), - formatVersion == 3 ? 0 : 1, // pos delete file count + posDeletes.recordCount(), + ContentFileUtil.isDV(posDeletes) ? 0 : 1, // pos delete file count 0L, 0, null, snapshot2.timestampMillis(), snapshot2.snapshotId(), - formatVersion == 3 ? 1 : 0), // dv count + ContentFileUtil.isDV(posDeletes) ? 
1 : 0), // dv count Tuple.tuple( partitionRecord(partitionType, "bar", "B"), 0, @@ -515,7 +515,7 @@ public void testCopyOnWriteDelete() throws Exception { assertThat( PartitionStatsHandler.readPartitionStatsFile( - PartitionStatsHandler.schema(Partitioning.partitionType(testTable), formatVersion), + PartitionStatsHandler.schema(Partitioning.partitionType(testTable), 2), testTable.io().newInputFile(statisticsFile.path()))) .allMatch(s -> (s.dataRecordCount() != 0 && s.dataFileCount() != 0)); @@ -528,7 +528,7 @@ public void testCopyOnWriteDelete() throws Exception { // stats must be decremented to zero as all the files removed from table. assertThat( PartitionStatsHandler.readPartitionStatsFile( - PartitionStatsHandler.schema(Partitioning.partitionType(testTable), formatVersion), + PartitionStatsHandler.schema(Partitioning.partitionType(testTable), 2), testTable.io().newInputFile(statisticsFileNew.path()))) .allMatch(s -> (s.dataRecordCount() == 0 && s.dataFileCount() == 0)); } From f9ca9a66c2039ba014431bf4907c4a79754fee56 Mon Sep 17 00:00:00 2001 From: ajantha-bhat Date: Mon, 7 Jul 2025 09:19:29 +0530 Subject: [PATCH 5/6] Handle V3 reading V2 stats --- .../org/apache/iceberg/PartitionStats.java | 5 +- .../apache/iceberg/PartitionStatsHandler.java | 10 ++- .../PartitionStatsHandlerTestBase.java | 87 ++++++++++++++----- .../iceberg/TestOrcPartitionStatsHandler.java | 7 ++ 4 files changed, 81 insertions(+), 28 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/PartitionStats.java b/core/src/main/java/org/apache/iceberg/PartitionStats.java index 4423b4f6dc24..e35551649555 100644 --- a/core/src/main/java/org/apache/iceberg/PartitionStats.java +++ b/core/src/main/java/org/apache/iceberg/PartitionStats.java @@ -22,7 +22,7 @@ public class PartitionStats implements StructLike { - static final int STATS_COUNT = 13; + private static final int STATS_COUNT = 13; private StructLike partition; private int specId; @@ -203,6 +203,7 @@ public void 
appendStats(PartitionStats entry) { this.positionDeleteFileCount += entry.positionDeleteFileCount; this.equalityDeleteRecordCount += entry.equalityDeleteRecordCount; this.equalityDeleteFileCount += entry.equalityDeleteFileCount; + this.dvCount += entry.dvCount; if (entry.totalRecordCount != null) { if (totalRecordCount == null) { @@ -215,8 +216,6 @@ public void appendStats(PartitionStats entry) { if (entry.lastUpdatedAt != null) { updateSnapshotInfo(entry.lastUpdatedSnapshotId, entry.lastUpdatedAt); } - - this.dvCount += entry.dvCount; } private void updateSnapshotInfo(long snapshotId, long updatedAt) { diff --git a/core/src/main/java/org/apache/iceberg/PartitionStatsHandler.java b/core/src/main/java/org/apache/iceberg/PartitionStatsHandler.java index 1a4c568742ce..343d5636be28 100644 --- a/core/src/main/java/org/apache/iceberg/PartitionStatsHandler.java +++ b/core/src/main/java/org/apache/iceberg/PartitionStatsHandler.java @@ -33,6 +33,7 @@ import java.util.stream.Collectors; import java.util.stream.StreamSupport; import org.apache.iceberg.data.GenericRecord; +import org.apache.iceberg.expressions.Literal; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.io.FileAppender; import org.apache.iceberg.io.InputFile; @@ -42,6 +43,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Queues; import org.apache.iceberg.types.Comparators; +import org.apache.iceberg.types.Types; import org.apache.iceberg.types.Types.IntegerType; import org.apache.iceberg.types.Types.LongType; import org.apache.iceberg.types.Types.NestedField; @@ -89,8 +91,14 @@ private PartitionStatsHandler() {} NestedField.optional(11, "last_updated_at", LongType.get()); public static final NestedField LAST_UPDATED_SNAPSHOT_ID = NestedField.optional(12, "last_updated_snapshot_id", LongType.get()); + // Using default value for v3 field to support v3 reader reading file written by v2 public static final 
NestedField DV_COUNT = - NestedField.required(13, "dv_count", IntegerType.get()); + NestedField.required("dv_count") + .withId(13) + .ofType(Types.IntegerType.get()) + .withInitialDefault(Literal.of(0)) + .withWriteDefault(Literal.of(0)) + .build(); /** * Generates the partition stats file schema based on a combined partition type which considers diff --git a/core/src/test/java/org/apache/iceberg/PartitionStatsHandlerTestBase.java b/core/src/test/java/org/apache/iceberg/PartitionStatsHandlerTestBase.java index e13f28b58a75..db1ac087e52e 100644 --- a/core/src/test/java/org/apache/iceberg/PartitionStatsHandlerTestBase.java +++ b/core/src/test/java/org/apache/iceberg/PartitionStatsHandlerTestBase.java @@ -57,7 +57,6 @@ import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.types.Comparators; import org.apache.iceberg.types.Types; -import org.apache.iceberg.util.ContentFileUtil; import org.assertj.core.groups.Tuple; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestTemplate; @@ -71,7 +70,7 @@ public abstract class PartitionStatsHandlerTestBase { @Parameters(name = "formatVersion = {0}") protected static List formatVersions() { - return Arrays.asList(2, 3); + return TestHelpers.V2_AND_ABOVE; } @Parameter protected int formatVersion; @@ -317,15 +316,15 @@ public void testOptionalFieldsWriting() throws Exception { } @SuppressWarnings("checkstyle:MethodLength") - @TestTemplate + @Test public void testPartitionStats() throws Exception { Table testTable = TestTables.create( - tempDir("partition_stats_compute_" + formatVersion), - "partition_stats_compute_" + formatVersion, + tempDir("partition_stats_compute"), + "partition_stats_compute", SCHEMA, SPEC, - formatVersion, + 2, fileFormatProperty); DataFile dataFile1 = @@ -349,8 +348,7 @@ public void testPartitionStats() throws Exception { } Snapshot snapshot1 = testTable.currentSnapshot(); - Schema recordSchema = - 
PartitionStatsHandler.schema(Partitioning.partitionType(testTable), formatVersion); + Schema recordSchema = PartitionStatsHandler.schema(Partitioning.partitionType(testTable), 2); Types.StructType partitionType = recordSchema.findField(PARTITION_FIELD_ID).type().asStructType(); @@ -414,18 +412,18 @@ public void testPartitionStats() throws Exception { snapshot1.snapshotId(), 0)); - DeleteFile posDeletes = null; - if (formatVersion >= 3) { - posDeletes = commitDVs(testTable, dataFile3); - } else if (formatVersion == 2) { - posDeletes = commitPositionDeletes(testTable); - } - - Snapshot snapshot2 = testTable.currentSnapshot(); + DeleteFile posDeletes = commitPositionDeletes(testTable); + // snapshot2 is unused in the result as same partition was updated by snapshot4 DeleteFile eqDeletes = commitEqualityDeletes(testTable); Snapshot snapshot3 = testTable.currentSnapshot(); + testTable.updateProperties().set(TableProperties.FORMAT_VERSION, "3").commit(); + DeleteFile dvs = commitDVs(testTable, dataFile3); + Snapshot snapshot4 = testTable.currentSnapshot(); + + recordSchema = PartitionStatsHandler.schema(Partitioning.partitionType(testTable), 3); + computeAndValidatePartitionStats( testTable, recordSchema, @@ -463,14 +461,14 @@ public void testPartitionStats() throws Exception { 3 * dataFile3.recordCount(), 3, 3 * dataFile3.fileSizeInBytes(), - posDeletes.recordCount(), - ContentFileUtil.isDV(posDeletes) ? 0 : 1, // pos delete file count + posDeletes.recordCount() + dvs.recordCount(), + 1, 0L, 0, null, - snapshot2.timestampMillis(), - snapshot2.snapshotId(), - ContentFileUtil.isDV(posDeletes) ? 
1 : 0), // dv count + snapshot4.timestampMillis(), + snapshot4.snapshotId(), + 1), // dv count Tuple.tuple( partitionRecord(partitionType, "bar", "B"), 0, @@ -674,6 +672,47 @@ public void testFullComputeFallbackWithInvalidStats() throws Exception { assertThat(partitionStats.get(0).dataFileCount()).isEqualTo(2); } + @Test + public void testV2toV3SchemaEvolution() throws Exception { + Table testTable = + TestTables.create( + tempDir("schema_evolution"), "schema_evolution", SCHEMA, SPEC, 2, fileFormatProperty); + + // write stats file using v2 schema + DataFile dataFile = + FileGenerationUtil.generateDataFile(testTable, TestHelpers.Row.of("foo", "A")); + testTable.newAppend().appendFile(dataFile).commit(); + PartitionStatisticsFile statisticsFile = + PartitionStatsHandler.computeAndWriteStatsFile( + testTable, testTable.currentSnapshot().snapshotId()); + + Types.StructType partitionSchema = Partitioning.partitionType(testTable); + + // read with v2 schema + Schema v2Schema = PartitionStatsHandler.schema(partitionSchema, 2); + List partitionStatsV2; + try (CloseableIterable recordIterator = + PartitionStatsHandler.readPartitionStatsFile( + v2Schema, testTable.io().newInputFile(statisticsFile.path()))) { + partitionStatsV2 = Lists.newArrayList(recordIterator); + } + + // read with v3 schema + Schema v3Schema = PartitionStatsHandler.schema(partitionSchema, 3); + List partitionStatsV3; + try (CloseableIterable recordIterator = + PartitionStatsHandler.readPartitionStatsFile( + v3Schema, testTable.io().newInputFile(statisticsFile.path()))) { + partitionStatsV3 = Lists.newArrayList(recordIterator); + } + + assertThat(partitionStatsV2).hasSize(partitionStatsV3.size()); + Comparator comparator = Comparators.forType(partitionSchema); + for (int i = 0; i < partitionStatsV2.size(); i++) { + assertThat(isEqual(comparator, partitionStatsV2.get(i), partitionStatsV3.get(i))).isTrue(); + } + } + private static StructLike partitionRecord( Types.StructType partitionType, String val1, 
String val2) { GenericRecord record = GenericRecord.create(partitionType); @@ -731,9 +770,9 @@ private DeleteFile commitPositionDeletes(Table testTable) { } private DeleteFile commitDVs(Table testTable, DataFile dataFile) { - DeleteFile posDeleteVector = FileGenerationUtil.generateDV(testTable, dataFile); - testTable.newRowDelta().addDeletes(posDeleteVector).commit(); - return posDeleteVector; + DeleteFile dv = FileGenerationUtil.generateDV(testTable, dataFile); + testTable.newRowDelta().addDeletes(dv).commit(); + return dv; } private File tempDir(String folderName) throws IOException { diff --git a/orc/src/test/java/org/apache/iceberg/TestOrcPartitionStatsHandler.java b/orc/src/test/java/org/apache/iceberg/TestOrcPartitionStatsHandler.java index c94dd61bba2d..d4859a43a7fe 100644 --- a/orc/src/test/java/org/apache/iceberg/TestOrcPartitionStatsHandler.java +++ b/orc/src/test/java/org/apache/iceberg/TestOrcPartitionStatsHandler.java @@ -81,4 +81,11 @@ public void testFullComputeFallbackWithInvalidStats() { .isInstanceOf(UnsupportedOperationException.class) .hasMessage("Cannot write using unregistered internal data format: ORC"); } + + @Override + public void testV2toV3SchemaEvolution() { + assertThatThrownBy(super::testV2toV3SchemaEvolution) + .isInstanceOf(UnsupportedOperationException.class) + .hasMessage("Cannot write using unregistered internal data format: ORC"); + } } From 36b85a50aa1a8f67ef4b6835bc66659202590bf5 Mon Sep 17 00:00:00 2001 From: ajantha-bhat Date: Mon, 7 Jul 2025 15:03:34 +0530 Subject: [PATCH 6/6] Address nits --- .../PartitionStatsHandlerTestBase.java | 37 ++++++------------- 1 file changed, 11 insertions(+), 26 deletions(-) diff --git a/core/src/test/java/org/apache/iceberg/PartitionStatsHandlerTestBase.java b/core/src/test/java/org/apache/iceberg/PartitionStatsHandlerTestBase.java index db1ac087e52e..cf39e4611bf4 100644 --- a/core/src/test/java/org/apache/iceberg/PartitionStatsHandlerTestBase.java +++ 
b/core/src/test/java/org/apache/iceberg/PartitionStatsHandlerTestBase.java @@ -412,14 +412,19 @@ public void testPartitionStats() throws Exception { snapshot1.snapshotId(), 0)); - DeleteFile posDeletes = commitPositionDeletes(testTable); + DeleteFile posDelete = + FileGenerationUtil.generatePositionDeleteFile(testTable, TestHelpers.Row.of("bar", "A")); + testTable.newRowDelta().addDeletes(posDelete).commit(); // snapshot2 is unused in the result as same partition was updated by snapshot4 - DeleteFile eqDeletes = commitEqualityDeletes(testTable); + DeleteFile eqDelete = + FileGenerationUtil.generateEqualityDeleteFile(testTable, TestHelpers.Row.of("foo", "A")); + testTable.newRowDelta().addDeletes(eqDelete).commit(); Snapshot snapshot3 = testTable.currentSnapshot(); testTable.updateProperties().set(TableProperties.FORMAT_VERSION, "3").commit(); - DeleteFile dvs = commitDVs(testTable, dataFile3); + DeleteFile dv = FileGenerationUtil.generateDV(testTable, dataFile3); + testTable.newRowDelta().addDeletes(dv).commit(); Snapshot snapshot4 = testTable.currentSnapshot(); recordSchema = PartitionStatsHandler.schema(Partitioning.partitionType(testTable), 3); @@ -435,7 +440,7 @@ public void testPartitionStats() throws Exception { 3 * dataFile1.fileSizeInBytes(), 0L, 0, - eqDeletes.recordCount(), + eqDelete.recordCount(), 1, null, snapshot3.timestampMillis(), @@ -461,7 +466,7 @@ public void testPartitionStats() throws Exception { 3 * dataFile3.recordCount(), 3, 3 * dataFile3.fileSizeInBytes(), - posDeletes.recordCount() + dvs.recordCount(), + posDelete.recordCount() + dv.recordCount(), 1, 0L, 0, @@ -706,7 +711,7 @@ public void testV2toV3SchemaEvolution() throws Exception { partitionStatsV3 = Lists.newArrayList(recordIterator); } - assertThat(partitionStatsV2).hasSize(partitionStatsV3.size()); + assertThat(partitionStatsV2).hasSameSizeAs(partitionStatsV3); Comparator comparator = Comparators.forType(partitionSchema); for (int i = 0; i < partitionStatsV2.size(); i++) { 
assertThat(isEqual(comparator, partitionStatsV2.get(i), partitionStatsV3.get(i))).isTrue(); @@ -755,26 +760,6 @@ private static void computeAndValidatePartitionStats( .containsExactlyInAnyOrder(expectedValues); } - private DeleteFile commitEqualityDeletes(Table testTable) { - DeleteFile eqDelete = - FileGenerationUtil.generateEqualityDeleteFile(testTable, TestHelpers.Row.of("foo", "A")); - testTable.newRowDelta().addDeletes(eqDelete).commit(); - return eqDelete; - } - - private DeleteFile commitPositionDeletes(Table testTable) { - DeleteFile posDelete = - FileGenerationUtil.generatePositionDeleteFile(testTable, TestHelpers.Row.of("bar", "A")); - testTable.newRowDelta().addDeletes(posDelete).commit(); - return posDelete; - } - - private DeleteFile commitDVs(Table testTable, DataFile dataFile) { - DeleteFile dv = FileGenerationUtil.generateDV(testTable, dataFile); - testTable.newRowDelta().addDeletes(dv).commit(); - return dv; - } - private File tempDir(String folderName) throws IOException { return java.nio.file.Files.createTempDirectory(temp.toPath(), folderName).toFile(); }