-
Notifications
You must be signed in to change notification settings - Fork 3k
Core: Support DV for partition stats #13425
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 4 commits
f05a6ed
465cb22
271a970
9978fd5
f9ca9a6
36b85a5
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -22,20 +22,21 @@ | |
|
|
||
| public class PartitionStats implements StructLike { | ||
|
|
||
| private static final int STATS_COUNT = 12; | ||
| static final int STATS_COUNT = 13; | ||
|
|
||
| private StructLike partition; | ||
| private int specId; | ||
| private long dataRecordCount; | ||
| private int dataFileCount; | ||
| private long totalDataFileSizeInBytes; | ||
| private long positionDeleteRecordCount; | ||
| private long positionDeleteRecordCount; // also includes dv record count as per spec | ||
| private int positionDeleteFileCount; | ||
| private long equalityDeleteRecordCount; | ||
| private int equalityDeleteFileCount; | ||
| private Long totalRecordCount; // null by default | ||
| private Long lastUpdatedAt; // null by default | ||
| private Long lastUpdatedSnapshotId; // null by default | ||
| private int dvCount; | ||
|
|
||
| public PartitionStats(StructLike partition, int specId) { | ||
| this.partition = partition; | ||
|
|
@@ -90,6 +91,10 @@ public Long lastUpdatedSnapshotId() { | |
| return lastUpdatedSnapshotId; | ||
| } | ||
|
|
||
| public int dvCount() { | ||
| return dvCount; | ||
| } | ||
|
|
||
| /** | ||
| * Updates the partition stats from the data/delete file. | ||
| * | ||
|
|
@@ -109,7 +114,12 @@ public void liveEntry(ContentFile<?> file, Snapshot snapshot) { | |
| break; | ||
| case POSITION_DELETES: | ||
| this.positionDeleteRecordCount += file.recordCount(); | ||
| this.positionDeleteFileCount += 1; | ||
| if (file.format() == FileFormat.PUFFIN) { | ||
| this.dvCount += 1; | ||
| } else { | ||
| this.positionDeleteFileCount += 1; | ||
nastra marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| } | ||
|
|
||
| break; | ||
| case EQUALITY_DELETES: | ||
| this.equalityDeleteRecordCount += file.recordCount(); | ||
|
|
@@ -156,7 +166,12 @@ void deletedEntryForIncrementalCompute(ContentFile<?> file, Snapshot snapshot) { | |
| break; | ||
| case POSITION_DELETES: | ||
| this.positionDeleteRecordCount -= file.recordCount(); | ||
| this.positionDeleteFileCount -= 1; | ||
| if (file.format() == FileFormat.PUFFIN) { | ||
| this.dvCount -= 1; | ||
| } else { | ||
| this.positionDeleteFileCount -= 1; | ||
nastra marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| } | ||
|
|
||
| break; | ||
| case EQUALITY_DELETES: | ||
| this.equalityDeleteRecordCount -= file.recordCount(); | ||
|
|
@@ -200,6 +215,8 @@ public void appendStats(PartitionStats entry) { | |
| if (entry.lastUpdatedAt != null) { | ||
| updateSnapshotInfo(entry.lastUpdatedSnapshotId, entry.lastUpdatedAt); | ||
| } | ||
|
|
||
| this.dvCount += entry.dvCount; | ||
ajantha-bhat marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| } | ||
|
|
||
| private void updateSnapshotInfo(long snapshotId, long updatedAt) { | ||
|
|
@@ -241,6 +258,8 @@ public <T> T get(int pos, Class<T> javaClass) { | |
| return javaClass.cast(lastUpdatedAt); | ||
| case 11: | ||
| return javaClass.cast(lastUpdatedSnapshotId); | ||
| case 12: | ||
| return javaClass.cast(dvCount); | ||
| default: | ||
| throw new UnsupportedOperationException("Unknown position: " + pos); | ||
| } | ||
|
|
@@ -289,6 +308,9 @@ public <T> void set(int pos, T value) { | |
| case 11: | ||
| this.lastUpdatedSnapshotId = (Long) value; | ||
| break; | ||
| case 12: | ||
| this.dvCount = value == null ? 0 : (int) value; | ||
|
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Defaulting to 0, just like the other counters (pos/eq deletes), which also default to 0. |
||
| break; | ||
| default: | ||
| throw new UnsupportedOperationException("Unknown position: " + pos); | ||
| } | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -89,17 +89,51 @@ private PartitionStatsHandler() {} | |
| NestedField.optional(11, "last_updated_at", LongType.get()); | ||
| public static final NestedField LAST_UPDATED_SNAPSHOT_ID = | ||
| NestedField.optional(12, "last_updated_snapshot_id", LongType.get()); | ||
| public static final NestedField DV_COUNT = | ||
| NestedField.required(13, "dv_count", IntegerType.get()); | ||
|
||
|
|
||
| /** | ||
| * Generates the partition stats file schema based on a combined partition type which considers | ||
| * all specs in a table. | ||
| * | ||
| * <p>Use this only for format version 1 and 2. For version 3 and above use {@link | ||
| * #schema(StructType, int)} | ||
| * | ||
| * @param unifiedPartitionType unified partition schema type. Could be calculated by {@link | ||
| * Partitioning#partitionType(Table)}. | ||
| * @return a schema that corresponds to the provided unified partition type. | ||
| * @deprecated since 1.10.0, will be removed in 1.11.0. Use {@link #schema(StructType, int)} | ||
| * instead. | ||
| */ | ||
| @Deprecated | ||
| public static Schema schema(StructType unifiedPartitionType) { | ||
| Preconditions.checkState(!unifiedPartitionType.fields().isEmpty(), "Table must be partitioned"); | ||
| return v2Schema(unifiedPartitionType); | ||
| } | ||
|
|
||
| /** | ||
| * Generates the partition stats file schema for a given format version based on a combined | ||
| * partition type which considers all specs in a table. | ||
| * | ||
| * @param unifiedPartitionType unified partition schema type. Could be calculated by {@link | ||
| * Partitioning#partitionType(Table)}. | ||
| * @param formatVersion table format version; versions 1 and 2 produce the v2 schema, any higher | ||
| * supported version produces the v3 schema. | ||
| * @return a schema that corresponds to the provided unified partition type. | ||
| * @throws IllegalStateException if the partition type has no fields, or if the format version is | ||
| * not positive or exceeds {@link TableMetadata#SUPPORTED_TABLE_FORMAT_VERSION}. | ||
| */ | ||
| public static Schema schema(StructType unifiedPartitionType, int formatVersion) { | ||
| Preconditions.checkState(!unifiedPartitionType.fields().isEmpty(), "Table must be partitioned"); | ||
| // Preconditions templates only substitute %s; any other specifier is left verbatim in the message. | ||
| Preconditions.checkState( | ||
| formatVersion > 0 && formatVersion <= TableMetadata.SUPPORTED_TABLE_FORMAT_VERSION, | ||
| "Invalid format version: %s", | ||
| formatVersion); | ||
|
|
||
| if (formatVersion <= 2) { | ||
| return v2Schema(unifiedPartitionType); | ||
| } | ||
|
|
||
||
| return v3Schema(unifiedPartitionType); | ||
| } | ||
|
|
||
| private static Schema v2Schema(StructType unifiedPartitionType) { | ||
| return new Schema( | ||
| NestedField.required(PARTITION_FIELD_ID, PARTITION_FIELD_NAME, unifiedPartitionType), | ||
| SPEC_ID, | ||
|
|
@@ -115,6 +149,35 @@ public static Schema schema(StructType unifiedPartitionType) { | |
| LAST_UPDATED_SNAPSHOT_ID); | ||
| } | ||
|
|
||
| private static Schema v3Schema(StructType unifiedPartitionType) { | ||
| return new Schema( | ||
stevenzwu marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| NestedField.required(PARTITION_FIELD_ID, PARTITION_FIELD_NAME, unifiedPartitionType), | ||
| SPEC_ID, | ||
| DATA_RECORD_COUNT, | ||
| DATA_FILE_COUNT, | ||
| TOTAL_DATA_FILE_SIZE_IN_BYTES, | ||
| NestedField.required( | ||
|
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. As per the spec PR, these fields are now required. |
||
| POSITION_DELETE_RECORD_COUNT.fieldId(), | ||
| POSITION_DELETE_RECORD_COUNT.name(), | ||
| LongType.get()), | ||
| NestedField.required( | ||
| POSITION_DELETE_FILE_COUNT.fieldId(), | ||
| POSITION_DELETE_FILE_COUNT.name(), | ||
| IntegerType.get()), | ||
| NestedField.required( | ||
| EQUALITY_DELETE_RECORD_COUNT.fieldId(), | ||
| EQUALITY_DELETE_RECORD_COUNT.name(), | ||
| LongType.get()), | ||
| NestedField.required( | ||
| EQUALITY_DELETE_FILE_COUNT.fieldId(), | ||
| EQUALITY_DELETE_FILE_COUNT.name(), | ||
| IntegerType.get()), | ||
| TOTAL_RECORD_COUNT, | ||
| LAST_UPDATED_AT, | ||
| LAST_UPDATED_SNAPSHOT_ID, | ||
| DV_COUNT); | ||
| } | ||
|
|
||
| /** | ||
| * Computes the stats incrementally after the snapshot that has partition stats file till the | ||
| * current snapshot and writes the combined result into a {@link PartitionStatisticsFile} after | ||
|
|
@@ -184,7 +247,10 @@ public static PartitionStatisticsFile computeAndWriteStatsFile(Table table, long | |
|
|
||
| List<PartitionStats> sortedStats = sortStatsByPartition(stats, partitionType); | ||
| return writePartitionStatsFile( | ||
| table, snapshot.snapshotId(), schema(partitionType), sortedStats); | ||
| table, | ||
| snapshot.snapshotId(), | ||
| schema(partitionType, TableUtil.formatVersion(table)), | ||
| sortedStats); | ||
| } | ||
|
|
||
| @VisibleForTesting | ||
|
|
@@ -263,13 +329,13 @@ private static Collection<PartitionStats> computeAndMergeStatsIncremental( | |
| Table table, | ||
| Snapshot snapshot, | ||
| StructType partitionType, | ||
| PartitionStatisticsFile previousStatsFile) | ||
| throws IOException { | ||
| PartitionStatisticsFile previousStatsFile) { | ||
| PartitionMap<PartitionStats> statsMap = PartitionMap.create(table.specs()); | ||
| // read previous stats, note that partition field will be read as GenericRecord | ||
| try (CloseableIterable<PartitionStats> oldStats = | ||
| readPartitionStatsFile( | ||
| schema(partitionType), table.io().newInputFile(previousStatsFile.path()))) { | ||
| schema(partitionType, TableUtil.formatVersion(table)), | ||
| table.io().newInputFile(previousStatsFile.path()))) { | ||
| oldStats.forEach( | ||
| partitionStats -> | ||
| statsMap.put(partitionStats.specId(), partitionStats.partition(), partitionStats)); | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.