Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion core/src/main/java/org/apache/iceberg/PartitionStats.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@

public class PartitionStats implements StructLike {

private static final int STATS_COUNT = 12;
protected static final int STATS_COUNT = 12;

private StructLike partition;
private int specId;
Expand Down
96 changes: 88 additions & 8 deletions core/src/main/java/org/apache/iceberg/PartitionStatsHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -89,11 +89,16 @@ private PartitionStatsHandler() {}
NestedField.optional(11, "last_updated_at", LongType.get());
public static final NestedField LAST_UPDATED_SNAPSHOT_ID =
NestedField.optional(12, "last_updated_snapshot_id", LongType.get());
public static final NestedField DV_COUNT =
NestedField.required(13, "dv_count", IntegerType.get());
Copy link
Member Author

@ajantha-bhat ajantha-bhat Jun 30, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The schema ID, field name, data type, and required flag all follow the spec change in #12098.


/**
* Generates the partition stats file schema based on a combined partition type which considers
* all specs in a table.
*
* <p>Use this only for format version 1 and 2. For version 3 and above use {@link
* #schemaV3Plus(StructType)}
*
* @param unifiedPartitionType unified partition schema type. Could be calculated by {@link
* Partitioning#partitionType(Table)}.
* @return a schema that corresponds to the provided unified partition type.
Expand All @@ -115,6 +120,53 @@ public static Schema schema(StructType unifiedPartitionType) {
LAST_UPDATED_SNAPSHOT_ID);
}

/**
* Generates the partition stats file schema based on a combined partition type which considers
* all specs in a table. (For format version 3 and above)
*
* @param unifiedPartitionType unified partition schema type. Could be calculated by {@link
* Partitioning#partitionType(Table)}.
* @return a schema that corresponds to the provided unified partition type.
*/
public static Schema schemaV3Plus(StructType unifiedPartitionType) {
Preconditions.checkState(!unifiedPartitionType.fields().isEmpty(), "Table must be partitioned");
return new Schema(
NestedField.required(PARTITION_FIELD_ID, PARTITION_FIELD_NAME, unifiedPartitionType),
SPEC_ID,
DATA_RECORD_COUNT,
DATA_FILE_COUNT,
TOTAL_DATA_FILE_SIZE_IN_BYTES,
NestedField.required(
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As per the spec PR (these fields are required now)
#12098

POSITION_DELETE_RECORD_COUNT.fieldId(),
POSITION_DELETE_RECORD_COUNT.name(),
LongType.get()),
NestedField.required(
POSITION_DELETE_FILE_COUNT.fieldId(),
POSITION_DELETE_FILE_COUNT.name(),
IntegerType.get()),
NestedField.required(
EQUALITY_DELETE_RECORD_COUNT.fieldId(),
EQUALITY_DELETE_RECORD_COUNT.name(),
LongType.get()),
NestedField.required(
EQUALITY_DELETE_FILE_COUNT.fieldId(),
EQUALITY_DELETE_FILE_COUNT.name(),
IntegerType.get()),
TOTAL_RECORD_COUNT,
LAST_UPDATED_AT,
LAST_UPDATED_SNAPSHOT_ID,
DV_COUNT);
}

// Selects the partition stats schema matching the table's format version:
// v1/v2 use the legacy schema, v3+ adds the required delete fields and dv_count.
static Schema schemaForVersion(Table table, StructType partitionType) {
  int version = ((HasTableOperations) table).operations().current().formatVersion();
  return version <= 2 ? schema(partitionType) : schemaV3Plus(partitionType);
}

/**
* Computes the stats incrementally after the snapshot that has partition stats file till the
* current snapshot and writes the combined result into a {@link PartitionStatisticsFile} after
Expand Down Expand Up @@ -184,7 +236,7 @@ public static PartitionStatisticsFile computeAndWriteStatsFile(Table table, long

List<PartitionStats> sortedStats = sortStatsByPartition(stats, partitionType);
return writePartitionStatsFile(
table, snapshot.snapshotId(), schema(partitionType), sortedStats);
table, snapshot.snapshotId(), schemaForVersion(table, partitionType), sortedStats);
}

@VisibleForTesting
Expand Down Expand Up @@ -226,7 +278,12 @@ public static CloseableIterable<PartitionStats> readPartitionStatsFile(

CloseableIterable<StructLike> records =
InternalData.read(fileFormat, inputFile).project(schema).build();
return CloseableIterable.transform(records, PartitionStatsHandler::recordToPartitionStats);

if (schema.findField(DV_COUNT.name()) == null) {
return CloseableIterable.transform(records, PartitionStatsHandler::recordToPartitionStats);
} else {
return CloseableIterable.transform(records, PartitionStatsHandler::recordToPartitionStatsV3);
}
}

private static OutputFile newPartitionStatsFile(
Expand All @@ -252,6 +309,21 @@ private static PartitionStats recordToPartitionStats(StructLike record) {
new PartitionStats(
record.get(pos++, StructLike.class), // partition
record.get(pos++, Integer.class)); // spec id

for (; pos < record.size(); pos++) {
stats.set(pos, record.get(pos, Object.class));
}

return stats;
}

private static PartitionStats recordToPartitionStatsV3(StructLike record) {
int pos = 0;
PartitionStats stats =
new PartitionStatsV3(
record.get(pos++, StructLike.class), // partition
record.get(pos++, Integer.class)); // spec id

for (; pos < record.size(); pos++) {
stats.set(pos, record.get(pos, Object.class));
}
Expand All @@ -263,13 +335,13 @@ private static Collection<PartitionStats> computeAndMergeStatsIncremental(
Table table,
Snapshot snapshot,
StructType partitionType,
PartitionStatisticsFile previousStatsFile)
throws IOException {
PartitionStatisticsFile previousStatsFile) {
PartitionMap<PartitionStats> statsMap = PartitionMap.create(table.specs());
// read previous stats, note that partition field will be read as GenericRecord
try (CloseableIterable<PartitionStats> oldStats =
readPartitionStatsFile(
schema(partitionType), table.io().newInputFile(previousStatsFile.path()))) {
schemaForVersion(table, partitionType),
table.io().newInputFile(previousStatsFile.path()))) {
oldStats.forEach(
partitionStats ->
statsMap.put(partitionStats.specId(), partitionStats.partition(), partitionStats));
Expand Down Expand Up @@ -347,6 +419,7 @@ private static PartitionMap<PartitionStats> computeStatsDiff(

private static PartitionMap<PartitionStats> computeStats(
Table table, List<ManifestFile> manifests, boolean incremental) {
int version = ((HasTableOperations) table).operations().current().formatVersion();
StructType partitionType = Partitioning.partitionType(table);
Queue<PartitionMap<PartitionStats>> statsByManifest = Queues.newConcurrentLinkedQueue();
Tasks.foreach(manifests)
Expand All @@ -356,7 +429,7 @@ private static PartitionMap<PartitionStats> computeStats(
.run(
manifest ->
statsByManifest.add(
collectStatsForManifest(table, manifest, partitionType, incremental)));
collectStatsForManifest(table, version, manifest, partitionType, incremental)));

PartitionMap<PartitionStats> statsMap = PartitionMap.create(table.specs());
for (PartitionMap<PartitionStats> stats : statsByManifest) {
Expand All @@ -367,7 +440,11 @@ private static PartitionMap<PartitionStats> computeStats(
}

private static PartitionMap<PartitionStats> collectStatsForManifest(
Table table, ManifestFile manifest, StructType partitionType, boolean incremental) {
Table table,
int version,
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Although the version could be deduced inside this method, it is computed once in the caller to avoid recomputing it for every worker thread.

ManifestFile manifest,
StructType partitionType,
boolean incremental) {
List<String> projection = BaseScan.scanColumns(manifest.content());
try (ManifestReader<?> reader = ManifestFiles.open(manifest, table.io()).select(projection)) {
PartitionMap<PartitionStats> statsMap = PartitionMap.create(table.specs());
Expand All @@ -385,7 +462,10 @@ private static PartitionMap<PartitionStats> collectStatsForManifest(
statsMap.computeIfAbsent(
specId,
((PartitionData) file.partition()).copy(),
() -> new PartitionStats(key, specId));
() ->
version > 2
? new PartitionStatsV3(key, specId)
: new PartitionStats(key, specId));
if (entry.isLive()) {
// Live can have both added and existing entries. Consider only added entries for
// incremental compute as existing entries was already included in previous compute.
Expand Down
86 changes: 86 additions & 0 deletions core/src/main/java/org/apache/iceberg/PartitionStatsV3.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg;

/**
 * Partition stats for format version 3 and above, which track an additional {@code dv_count}
 * stat (number of deletion vectors, i.e. position delete files stored as Puffin DVs).
 *
 * <p>DVs are excluded from the parent's position-delete-file count and counted separately, so
 * the two overriding methods undo the parent's increment/decrement for Puffin files.
 */
public class PartitionStatsV3 extends PartitionStats {

  // Slot of position_delete_file_count in the parent's positional layout.
  // NOTE(review): mirrors the hard-coded ordering inside PartitionStats — keep in sync.
  private static final int POSITION_DELETE_FILE_COUNT_POSITION = 6;
  // dv_count is appended after the parent's stats, so it occupies the next slot.
  private static final int DV_COUNT_POSITION = STATS_COUNT;

  // number of deletion vectors (Puffin position-delete files) in this partition
  private int dvCount;

  public PartitionStatsV3(StructLike partition, int specId) {
    super(partition, specId);
  }

  /** Returns the number of deletion vectors tracked for this partition. */
  public int dvCount() {
    return dvCount;
  }

  @Override
  public void liveEntry(ContentFile<?> file, Snapshot snapshot) {
    super.liveEntry(file, snapshot);
    if (file.content() == FileContent.POSITION_DELETES && file.format() == FileFormat.PUFFIN) {
      this.dvCount += 1;
      // revert the changes for position delete file count increment from the parent method
      this.set(POSITION_DELETE_FILE_COUNT_POSITION, positionDeleteFileCount() - 1);
    }
  }

  @Override
  void deletedEntryForIncrementalCompute(ContentFile<?> file, Snapshot snapshot) {
    super.deletedEntryForIncrementalCompute(file, snapshot);
    if (file.content() == FileContent.POSITION_DELETES && file.format() == FileFormat.PUFFIN) {
      this.dvCount -= 1;
      // revert the changes for position delete file count decrement from the parent method
      this.set(POSITION_DELETE_FILE_COUNT_POSITION, positionDeleteFileCount() + 1);
    }
  }

  @Override
  @Deprecated // will become package-private
  public void appendStats(PartitionStats entry) {
    super.appendStats(entry);

    // dv_count only exists on v3+ stats; v1/v2 entries contribute nothing to it
    if (entry instanceof PartitionStatsV3) {
      this.dvCount += ((PartitionStatsV3) entry).dvCount;
    }
  }

  @Override
  public int size() {
    // includes dv counter
    return STATS_COUNT + 1;
  }

  @Override
  public <T> T get(int pos, Class<T> javaClass) {
    if (pos == DV_COUNT_POSITION) {
      return javaClass.cast(dvCount);
    } else {
      return super.get(pos, javaClass);
    }
  }

  @Override
  public <T> void set(int pos, T value) {
    if (pos == DV_COUNT_POSITION) {
      // null is treated as zero when deserializing sparse records
      this.dvCount = value == null ? 0 : (int) value;
    } else {
      super.set(pos, value);
    }
  }
}
Loading