29 changes: 25 additions & 4 deletions core/src/main/java/org/apache/iceberg/PartitionStats.java
@@ -22,20 +22,21 @@

public class PartitionStats implements StructLike {

private static final int STATS_COUNT = 12;
private static final int STATS_COUNT = 13;

private StructLike partition;
private int specId;
private long dataRecordCount;
private int dataFileCount;
private long totalDataFileSizeInBytes;
private long positionDeleteRecordCount;
private long positionDeleteRecordCount; // also includes dv record count as per spec
private int positionDeleteFileCount;
private long equalityDeleteRecordCount;
private int equalityDeleteFileCount;
private Long totalRecordCount; // null by default
private Long lastUpdatedAt; // null by default
private Long lastUpdatedSnapshotId; // null by default
private int dvCount;

public PartitionStats(StructLike partition, int specId) {
this.partition = partition;
@@ -90,6 +91,10 @@ public Long lastUpdatedSnapshotId() {
return lastUpdatedSnapshotId;
}

public int dvCount() {
return dvCount;
}

/**
* Updates the partition stats from the data/delete file.
*
@@ -109,7 +114,12 @@ public void liveEntry(ContentFile<?> file, Snapshot snapshot) {
break;
case POSITION_DELETES:
this.positionDeleteRecordCount += file.recordCount();
this.positionDeleteFileCount += 1;
if (file.format() == FileFormat.PUFFIN) {
this.dvCount += 1;
} else {
this.positionDeleteFileCount += 1;
}

break;
case EQUALITY_DELETES:
this.equalityDeleteRecordCount += file.recordCount();
@@ -156,7 +166,12 @@ void deletedEntryForIncrementalCompute(ContentFile<?> file, Snapshot snapshot) {
break;
case POSITION_DELETES:
this.positionDeleteRecordCount -= file.recordCount();
this.positionDeleteFileCount -= 1;
if (file.format() == FileFormat.PUFFIN) {
this.dvCount -= 1;
} else {
this.positionDeleteFileCount -= 1;
}

break;
case EQUALITY_DELETES:
this.equalityDeleteRecordCount -= file.recordCount();
@@ -188,6 +203,7 @@ public void appendStats(PartitionStats entry) {
this.positionDeleteFileCount += entry.positionDeleteFileCount;
this.equalityDeleteRecordCount += entry.equalityDeleteRecordCount;
this.equalityDeleteFileCount += entry.equalityDeleteFileCount;
this.dvCount += entry.dvCount;

if (entry.totalRecordCount != null) {
if (totalRecordCount == null) {
@@ -241,6 +257,8 @@ public <T> T get(int pos, Class<T> javaClass) {
return javaClass.cast(lastUpdatedAt);
case 11:
return javaClass.cast(lastUpdatedSnapshotId);
case 12:
return javaClass.cast(dvCount);
default:
throw new UnsupportedOperationException("Unknown position: " + pos);
}
@@ -289,6 +307,9 @@ public <T> void set(int pos, T value) {
case 11:
this.lastUpdatedSnapshotId = (Long) value;
break;
case 12:
this.dvCount = value == null ? 0 : (int) value;
Member Author: Defaulting to 0, just like the other counters (pos/eq delete counts) default to 0.

break;
default:
throw new UnsupportedOperationException("Unknown position: " + pos);
}
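To illustrate the new accounting, here is a rough sketch rather than code from the PR; the partition key, spec, snapshot, and delete files are placeholders. A Puffin-format position delete file, i.e. a deletion vector, is expected to bump dvCount while any other position delete format still bumps positionDeleteFileCount, and a null value at position 12 normalizes to 0 like the other counters.

```java
PartitionStats stats = new PartitionStats(partitionKey, spec.specId());

stats.liveEntry(puffinDv, snapshot);          // POSITION_DELETES, FileFormat.PUFFIN
stats.liveEntry(parquetPosDeletes, snapshot); // POSITION_DELETES, FileFormat.PARQUET

// both files add to positionDeleteRecordCount, but only the non-Puffin file
// counts as a position delete file; the DV is tracked separately
assert stats.dvCount() == 1;
assert stats.positionDeleteFileCount() == 1;

// position 12 is dv_count in the StructLike view; null is normalized to 0
stats.set(12, null);
assert stats.dvCount() == 0;
```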
82 changes: 78 additions & 4 deletions core/src/main/java/org/apache/iceberg/PartitionStatsHandler.java
@@ -33,6 +33,7 @@
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.expressions.Literal;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.io.InputFile;
@@ -42,6 +43,7 @@
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Queues;
import org.apache.iceberg.types.Comparators;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.types.Types.IntegerType;
import org.apache.iceberg.types.Types.LongType;
import org.apache.iceberg.types.Types.NestedField;
@@ -89,17 +91,57 @@ private PartitionStatsHandler() {}
NestedField.optional(11, "last_updated_at", LongType.get());
public static final NestedField LAST_UPDATED_SNAPSHOT_ID =
NestedField.optional(12, "last_updated_snapshot_id", LongType.get());
// Default value for the v3 field so that a v3 reader can read files written by v2
public static final NestedField DV_COUNT =
NestedField.required("dv_count")
.withId(13)
.ofType(Types.IntegerType.get())
.withInitialDefault(Literal.of(0))
.withWriteDefault(Literal.of(0))
.build();

Member Author: Using default 0 for v3, because when we try to read v2 stats with the v3 schema we get a "missing required field" error without the default value configuration, since dv_count is a required field in that schema.

callstack:

Missing required field: dv_count
	java.lang.IllegalArgumentException: Missing required field: dv_count
	at org.apache.iceberg.data.parquet.BaseParquetReaders$ReadBuilder.defaultReader(BaseParquetReaders.java:269)
	at org.apache.iceberg.data.parquet.BaseParquetReaders$ReadBuilder.struct(BaseParquetReaders.java:252)
	at org.apache.iceberg.data.parquet.BaseParquetReaders$ReadBuilder.message(BaseParquetReaders.java:219)
	at org.apache.iceberg.data.parquet.BaseParquetReaders$ReadBuilder.message(BaseParquetReaders.java:207)
	at org.apache.iceberg.parquet.TypeWithSchemaVisitor.visit(TypeWithSchemaVisitor.java:48)
	at org.apache.iceberg.data.parquet.BaseParquetReaders.createReader(BaseParquetReaders.java:67)
	at org.apache.iceberg.data.parquet.BaseParquetReaders.createReader(BaseParquetReaders.java:59)
	at org.apache.iceberg.data.parquet.InternalReader.create(InternalReader.java:40)
	at org.apache.iceberg.parquet.Parquet$ReadBuilder.lambda$build$0(Parquet.java:1368)
	at org.apache.iceberg.parquet.ReadConf.<init>(ReadConf.java:121)
	at org.apache.iceberg.parquet.ParquetReader.init(ParquetReader.java:74)
	at org.apache.iceberg.parquet.ParquetReader.iterator(ParquetReader.java:94)
	at org.apache.iceberg.io.CloseableIterable$7$1.<init>(CloseableIterable.java:205)
	at org.apache.iceberg.io.CloseableIterable$7.iterator(CloseableIterable.java:204)
	at org.apache.iceberg.io.CloseableIterable$7.iterator(CloseableIterable.java:196)
	at org.apache.iceberg.relocated.com.google.common.collect.Lists.newArrayList(Lists.java:139)
	at org.apache.iceberg.PartitionStatsHandlerTestBase.testV2toV3SchemaEvolution(PartitionStatsHandlerTestBase.java:695)
	at java.base/java.lang.reflect.Method.invoke(Method.java:580)
	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)

Contributor: Awesome job adding a unit test for this.
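A minimal sketch of what the default buys us, mirroring the scenario in the callstack above; the table and previousStatsFile are placeholders, and it assumes readPartitionStatsFile is usable the same way the handler itself uses it. Reading a stats file written before dv_count existed with the v3 schema should fill the column from the declared default instead of failing.

```java
Schema v3Schema = PartitionStatsHandler.schema(Partitioning.partitionType(table), 3);

try (CloseableIterable<PartitionStats> oldStats =
    PartitionStatsHandler.readPartitionStatsFile(
        v3Schema, table.io().newInputFile(previousStatsFile.path()))) {
  for (PartitionStats stat : oldStats) {
    // dv_count is absent from the v2-written file, so the initial default (0) is used
    assert stat.dvCount() == 0;
  }
}
```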

/**
* Generates the partition stats file schema based on a combined partition type which considers
* all specs in a table.
*
* <p>Use this only for format versions 1 and 2. For version 3 and above, use {@link
* #schema(StructType, int)}.
*
* @param unifiedPartitionType unified partition schema type. Could be calculated by {@link
*     Partitioning#partitionType(Table)}.
* @return a schema that corresponds to the provided unified partition type.
* @deprecated since 1.10.0, will be removed in 1.11.0. Use {@link #schema(StructType, int)}
* instead.
*/
@Deprecated
public static Schema schema(StructType unifiedPartitionType) {
Preconditions.checkState(!unifiedPartitionType.fields().isEmpty(), "Table must be partitioned");
return v2Schema(unifiedPartitionType);
}

/**
* Generates the partition stats file schema for a given format version based on a combined
* partition type which considers all specs in a table.
*
* @param unifiedPartitionType unified partition schema type. Could be calculated by {@link
*     Partitioning#partitionType(Table)}.
* @param formatVersion table format version; determines which fields the stats schema includes.
* @return a schema that corresponds to the provided unified partition type.
*/
public static Schema schema(StructType unifiedPartitionType, int formatVersion) {
Preconditions.checkState(!unifiedPartitionType.fields().isEmpty(), "Table must be partitioned");
Preconditions.checkState(
formatVersion > 0 && formatVersion <= TableMetadata.SUPPORTED_TABLE_FORMAT_VERSION,
"Invalid format version: %d",
formatVersion);

if (formatVersion <= 2) {
return v2Schema(unifiedPartitionType);
}

return v3Schema(unifiedPartitionType);
}

private static Schema v2Schema(StructType unifiedPartitionType) {
return new Schema(
NestedField.required(PARTITION_FIELD_ID, PARTITION_FIELD_NAME, unifiedPartitionType),
SPEC_ID,
@@ -115,6 +157,35 @@ public static Schema schema(StructType unifiedPartitionType) {
LAST_UPDATED_SNAPSHOT_ID);
}

private static Schema v3Schema(StructType unifiedPartitionType) {
return new Schema(
NestedField.required(PARTITION_FIELD_ID, PARTITION_FIELD_NAME, unifiedPartitionType),
SPEC_ID,
DATA_RECORD_COUNT,
DATA_FILE_COUNT,
TOTAL_DATA_FILE_SIZE_IN_BYTES,
NestedField.required(
Member Author: As per the spec PR (#12098), these fields are required now.
POSITION_DELETE_RECORD_COUNT.fieldId(),
POSITION_DELETE_RECORD_COUNT.name(),
LongType.get()),
NestedField.required(
POSITION_DELETE_FILE_COUNT.fieldId(),
POSITION_DELETE_FILE_COUNT.name(),
IntegerType.get()),
NestedField.required(
EQUALITY_DELETE_RECORD_COUNT.fieldId(),
EQUALITY_DELETE_RECORD_COUNT.name(),
LongType.get()),
NestedField.required(
EQUALITY_DELETE_FILE_COUNT.fieldId(),
EQUALITY_DELETE_FILE_COUNT.name(),
IntegerType.get()),
TOTAL_RECORD_COUNT,
LAST_UPDATED_AT,
LAST_UPDATED_SNAPSHOT_ID,
DV_COUNT);
}
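For callers, the schema is now picked by format version; a short usage sketch, where table is a placeholder and TableUtil.formatVersion is the same helper used in computeAndWriteStatsFile below:

```java
StructType partitionType = Partitioning.partitionType(table);

Schema v2 = PartitionStatsHandler.schema(partitionType, 2); // no dv_count
Schema v3 = PartitionStatsHandler.schema(partitionType, 3); // includes required dv_count

// or derive the version from the table itself
Schema schema = PartitionStatsHandler.schema(partitionType, TableUtil.formatVersion(table));
```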

/**
* Computes the stats incrementally after the snapshot that has a partition stats file till the
* current snapshot and writes the combined result into a {@link PartitionStatisticsFile} after
@@ -184,7 +255,10 @@ public static PartitionStatisticsFile computeAndWriteStatsFile(Table table, long

List<PartitionStats> sortedStats = sortStatsByPartition(stats, partitionType);
return writePartitionStatsFile(
table, snapshot.snapshotId(), schema(partitionType), sortedStats);
table,
snapshot.snapshotId(),
schema(partitionType, TableUtil.formatVersion(table)),
sortedStats);
}

@VisibleForTesting
@@ -263,13 +337,13 @@ private static Collection<PartitionStats> computeAndMergeStatsIncremental(
Table table,
Snapshot snapshot,
StructType partitionType,
PartitionStatisticsFile previousStatsFile)
throws IOException {
PartitionStatisticsFile previousStatsFile) {
PartitionMap<PartitionStats> statsMap = PartitionMap.create(table.specs());
// read previous stats, note that partition field will be read as GenericRecord
try (CloseableIterable<PartitionStats> oldStats =
readPartitionStatsFile(
schema(partitionType), table.io().newInputFile(previousStatsFile.path()))) {
schema(partitionType, TableUtil.formatVersion(table)),
table.io().newInputFile(previousStatsFile.path()))) {
oldStats.forEach(
partitionStats ->
statsMap.put(partitionStats.specId(), partitionStats.partition(), partitionStats));
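End to end, nothing changes for callers, since the handler resolves the schema from the table's format version internally. A hedged usage sketch, assuming the Table#updatePartitionStatistics API for registering the result; table is a placeholder:

```java
PartitionStatisticsFile statsFile =
    PartitionStatsHandler.computeAndWriteStatsFile(
        table, table.currentSnapshot().snapshotId());

if (statsFile != null) {
  table.updatePartitionStatistics().setPartitionStatistics(statsFile).commit();
}
```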