Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 26 additions & 4 deletions core/src/main/java/org/apache/iceberg/PartitionStats.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,20 +22,21 @@

public class PartitionStats implements StructLike {

private static final int STATS_COUNT = 12;
static final int STATS_COUNT = 13;

private StructLike partition;
private int specId;
private long dataRecordCount;
private int dataFileCount;
private long totalDataFileSizeInBytes;
private long positionDeleteRecordCount;
private long positionDeleteRecordCount; // also includes dv record count as per spec
private int positionDeleteFileCount;
private long equalityDeleteRecordCount;
private int equalityDeleteFileCount;
private Long totalRecordCount; // null by default
private Long lastUpdatedAt; // null by default
private Long lastUpdatedSnapshotId; // null by default
private int dvCount;

public PartitionStats(StructLike partition, int specId) {
this.partition = partition;
Expand Down Expand Up @@ -90,6 +91,10 @@ public Long lastUpdatedSnapshotId() {
return lastUpdatedSnapshotId;
}

/** Returns the number of deletion vectors (DVs) counted for this partition. */
public int dvCount() {
  return this.dvCount;
}

/**
* Updates the partition stats from the data/delete file.
*
Expand All @@ -109,7 +114,12 @@ public void liveEntry(ContentFile<?> file, Snapshot snapshot) {
break;
case POSITION_DELETES:
this.positionDeleteRecordCount += file.recordCount();
this.positionDeleteFileCount += 1;
if (file.format() == FileFormat.PUFFIN) {
this.dvCount += 1;
} else {
this.positionDeleteFileCount += 1;
}

break;
case EQUALITY_DELETES:
this.equalityDeleteRecordCount += file.recordCount();
Expand Down Expand Up @@ -156,7 +166,12 @@ void deletedEntryForIncrementalCompute(ContentFile<?> file, Snapshot snapshot) {
break;
case POSITION_DELETES:
this.positionDeleteRecordCount -= file.recordCount();
this.positionDeleteFileCount -= 1;
if (file.format() == FileFormat.PUFFIN) {
this.dvCount -= 1;
} else {
this.positionDeleteFileCount -= 1;
}

break;
case EQUALITY_DELETES:
this.equalityDeleteRecordCount -= file.recordCount();
Expand Down Expand Up @@ -200,6 +215,8 @@ public void appendStats(PartitionStats entry) {
if (entry.lastUpdatedAt != null) {
updateSnapshotInfo(entry.lastUpdatedSnapshotId, entry.lastUpdatedAt);
}

this.dvCount += entry.dvCount;
}

private void updateSnapshotInfo(long snapshotId, long updatedAt) {
Expand Down Expand Up @@ -241,6 +258,8 @@ public <T> T get(int pos, Class<T> javaClass) {
return javaClass.cast(lastUpdatedAt);
case 11:
return javaClass.cast(lastUpdatedSnapshotId);
case 12:
return javaClass.cast(dvCount);
default:
throw new UnsupportedOperationException("Unknown position: " + pos);
}
Expand Down Expand Up @@ -289,6 +308,9 @@ public <T> void set(int pos, T value) {
case 11:
this.lastUpdatedSnapshotId = (Long) value;
break;
case 12:
this.dvCount = value == null ? 0 : (int) value;
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Defaulting to 0, just like the other counters (pos/eq deletes) default to 0.

break;
default:
throw new UnsupportedOperationException("Unknown position: " + pos);
}
Expand Down
74 changes: 70 additions & 4 deletions core/src/main/java/org/apache/iceberg/PartitionStatsHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -89,17 +89,51 @@ private PartitionStatsHandler() {}
NestedField.optional(11, "last_updated_at", LongType.get());
public static final NestedField LAST_UPDATED_SNAPSHOT_ID =
NestedField.optional(12, "last_updated_snapshot_id", LongType.get());
public static final NestedField DV_COUNT =
NestedField.required(13, "dv_count", IntegerType.get());
Copy link
Member Author

@ajantha-bhat ajantha-bhat Jun 30, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Field id, field name, data type, and required/optional designation follow the spec:
#12098


/**
 * Generates the partition stats file schema based on a combined partition type which considers
 * all specs in a table.
 *
 * <p>Use this only for format version 1 and 2. For version 3 and above use {@link
 * #schema(StructType, int)}
 *
 * @param unifiedPartitionType unified partition schema type. Could be calculated by {@link
 *     Partitioning#partitionType(Table)}.
 * @return a schema that corresponds to the provided unified partition type.
 * @deprecated since 1.10.0, will be removed in 1.11.0. Use {@link #schema(StructType, int)}
 *     instead.
 */
@Deprecated
public static Schema schema(StructType unifiedPartitionType) {
  // Delegate to the versioned overload (which performs the same "Table must be partitioned"
  // check first) instead of duplicating the precondition and v2Schema dispatch here.
  return schema(unifiedPartitionType, 2);
}

/**
 * Generates the partition stats file schema for a given format version based on a combined
 * partition type which considers all specs in a table.
 *
 * @param unifiedPartitionType unified partition schema type. Could be calculated by {@link
 *     Partitioning#partitionType(Table)}.
 * @param formatVersion table format version; must be between 1 and {@link
 *     TableMetadata#SUPPORTED_TABLE_FORMAT_VERSION} (inclusive).
 * @return a schema that corresponds to the provided unified partition type.
 * @throws IllegalStateException if the table is unpartitioned or the format version is invalid.
 */
public static Schema schema(StructType unifiedPartitionType, int formatVersion) {
  Preconditions.checkState(!unifiedPartitionType.fields().isEmpty(), "Table must be partitioned");
  Preconditions.checkState(
      formatVersion > 0 && formatVersion <= TableMetadata.SUPPORTED_TABLE_FORMAT_VERSION,
      // Guava-style Preconditions message templates substitute only %s; %d would be left
      // verbatim in the message with the argument appended separately.
      "Invalid format version: %s",
      formatVersion);

  // Versions 1 and 2 share the legacy schema; version 3 adds dv_count and tightens nullability.
  if (formatVersion <= 2) {
    return v2Schema(unifiedPartitionType);
  }

  return v3Schema(unifiedPartitionType);
}

private static Schema v2Schema(StructType unifiedPartitionType) {
return new Schema(
NestedField.required(PARTITION_FIELD_ID, PARTITION_FIELD_NAME, unifiedPartitionType),
SPEC_ID,
Expand All @@ -115,6 +149,35 @@ public static Schema schema(StructType unifiedPartitionType) {
LAST_UPDATED_SNAPSHOT_ID);
}

private static Schema v3Schema(StructType unifiedPartitionType) {
return new Schema(
NestedField.required(PARTITION_FIELD_ID, PARTITION_FIELD_NAME, unifiedPartitionType),
SPEC_ID,
DATA_RECORD_COUNT,
DATA_FILE_COUNT,
TOTAL_DATA_FILE_SIZE_IN_BYTES,
NestedField.required(
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As per the spec PR (these fields are required now)
#12098

POSITION_DELETE_RECORD_COUNT.fieldId(),
POSITION_DELETE_RECORD_COUNT.name(),
LongType.get()),
NestedField.required(
POSITION_DELETE_FILE_COUNT.fieldId(),
POSITION_DELETE_FILE_COUNT.name(),
IntegerType.get()),
NestedField.required(
EQUALITY_DELETE_RECORD_COUNT.fieldId(),
EQUALITY_DELETE_RECORD_COUNT.name(),
LongType.get()),
NestedField.required(
EQUALITY_DELETE_FILE_COUNT.fieldId(),
EQUALITY_DELETE_FILE_COUNT.name(),
IntegerType.get()),
TOTAL_RECORD_COUNT,
LAST_UPDATED_AT,
LAST_UPDATED_SNAPSHOT_ID,
DV_COUNT);
}

/**
* Computes the stats incrementally after the snapshot that has partition stats file till the
* current snapshot and writes the combined result into a {@link PartitionStatisticsFile} after
Expand Down Expand Up @@ -184,7 +247,10 @@ public static PartitionStatisticsFile computeAndWriteStatsFile(Table table, long

List<PartitionStats> sortedStats = sortStatsByPartition(stats, partitionType);
return writePartitionStatsFile(
table, snapshot.snapshotId(), schema(partitionType), sortedStats);
table,
snapshot.snapshotId(),
schema(partitionType, TableUtil.formatVersion(table)),
sortedStats);
}

@VisibleForTesting
Expand Down Expand Up @@ -263,13 +329,13 @@ private static Collection<PartitionStats> computeAndMergeStatsIncremental(
Table table,
Snapshot snapshot,
StructType partitionType,
PartitionStatisticsFile previousStatsFile)
throws IOException {
PartitionStatisticsFile previousStatsFile) {
PartitionMap<PartitionStats> statsMap = PartitionMap.create(table.specs());
// read previous stats, note that partition field will be read as GenericRecord
try (CloseableIterable<PartitionStats> oldStats =
readPartitionStatsFile(
schema(partitionType), table.io().newInputFile(previousStatsFile.path()))) {
schema(partitionType, TableUtil.formatVersion(table)),
table.io().newInputFile(previousStatsFile.path()))) {
oldStats.forEach(
partitionStats ->
statsMap.put(partitionStats.specId(), partitionStats.partition(), partitionStats));
Expand Down
Loading
Loading