-
Notifications
You must be signed in to change notification settings - Fork 3k
Core: Support DV for partition stats #13425
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
f05a6ed
465cb22
271a970
9978fd5
f9ca9a6
36b85a5
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -89,11 +89,16 @@ private PartitionStatsHandler() {} | |
| NestedField.optional(11, "last_updated_at", LongType.get()); | ||
| public static final NestedField LAST_UPDATED_SNAPSHOT_ID = | ||
| NestedField.optional(12, "last_updated_snapshot_id", LongType.get()); | ||
| public static final NestedField DV_COUNT = | ||
| NestedField.required(13, "dv_count", IntegerType.get()); | ||
|
||
|
|
||
| /** | ||
| * Generates the partition stats file schema based on a combined partition type which considers | ||
| * all specs in a table. | ||
| * | ||
| * <p>Use this only for format version 1 and 2. For version 3 and above use {@link | ||
| * #schemaV3Plus(StructType)} | ||
| * | ||
| * @param unifiedPartitionType unified partition schema type. Could be calculated by {@link | ||
| * Partitioning#partitionType(Table)}. | ||
| * @return a schema that corresponds to the provided unified partition type. | ||
|
|
@@ -115,6 +120,53 @@ public static Schema schema(StructType unifiedPartitionType) { | |
| LAST_UPDATED_SNAPSHOT_ID); | ||
| } | ||
|
|
||
| /** | ||
| * Generates the partition stats file schema based on a combined partition type which considers | ||
| * all specs in a table. (For format version 3 and above) | ||
| * | ||
| * @param unifiedPartitionType unified partition schema type. Could be calculated by {@link | ||
| * Partitioning#partitionType(Table)}. | ||
| * @return a schema that corresponds to the provided unified partition type. | ||
| */ | ||
| public static Schema schemaV3Plus(StructType unifiedPartitionType) { | ||
ajantha-bhat marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| Preconditions.checkState(!unifiedPartitionType.fields().isEmpty(), "Table must be partitioned"); | ||
| return new Schema( | ||
stevenzwu marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| NestedField.required(PARTITION_FIELD_ID, PARTITION_FIELD_NAME, unifiedPartitionType), | ||
| SPEC_ID, | ||
| DATA_RECORD_COUNT, | ||
| DATA_FILE_COUNT, | ||
| TOTAL_DATA_FILE_SIZE_IN_BYTES, | ||
| NestedField.required( | ||
|
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. As per the spec PR (these fields are required now) |
||
| POSITION_DELETE_RECORD_COUNT.fieldId(), | ||
| POSITION_DELETE_RECORD_COUNT.name(), | ||
| LongType.get()), | ||
| NestedField.required( | ||
| POSITION_DELETE_FILE_COUNT.fieldId(), | ||
| POSITION_DELETE_FILE_COUNT.name(), | ||
| IntegerType.get()), | ||
| NestedField.required( | ||
| EQUALITY_DELETE_RECORD_COUNT.fieldId(), | ||
| EQUALITY_DELETE_RECORD_COUNT.name(), | ||
| LongType.get()), | ||
| NestedField.required( | ||
| EQUALITY_DELETE_FILE_COUNT.fieldId(), | ||
| EQUALITY_DELETE_FILE_COUNT.name(), | ||
| IntegerType.get()), | ||
| TOTAL_RECORD_COUNT, | ||
| LAST_UPDATED_AT, | ||
| LAST_UPDATED_SNAPSHOT_ID, | ||
| DV_COUNT); | ||
| } | ||
|
|
||
| static Schema schemaForVersion(Table table, StructType partitionType) { | ||
| int formatVersion = ((HasTableOperations) table).operations().current().formatVersion(); | ||
ajantha-bhat marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| if (formatVersion <= 2) { | ||
| return schema(partitionType); | ||
| } | ||
|
|
||
| return schemaV3Plus(partitionType); | ||
| } | ||
|
|
||
| /** | ||
| * Computes the stats incrementally after the snapshot that has partition stats file till the | ||
| * current snapshot and writes the combined result into a {@link PartitionStatisticsFile} after | ||
|
|
@@ -184,7 +236,7 @@ public static PartitionStatisticsFile computeAndWriteStatsFile(Table table, long | |
|
|
||
| List<PartitionStats> sortedStats = sortStatsByPartition(stats, partitionType); | ||
| return writePartitionStatsFile( | ||
| table, snapshot.snapshotId(), schema(partitionType), sortedStats); | ||
| table, snapshot.snapshotId(), schemaForVersion(table, partitionType), sortedStats); | ||
| } | ||
|
|
||
| @VisibleForTesting | ||
|
|
@@ -226,7 +278,12 @@ public static CloseableIterable<PartitionStats> readPartitionStatsFile( | |
|
|
||
| CloseableIterable<StructLike> records = | ||
| InternalData.read(fileFormat, inputFile).project(schema).build(); | ||
| return CloseableIterable.transform(records, PartitionStatsHandler::recordToPartitionStats); | ||
|
|
||
| if (schema.findField(DV_COUNT.name()) == null) { | ||
| return CloseableIterable.transform(records, PartitionStatsHandler::recordToPartitionStats); | ||
| } else { | ||
| return CloseableIterable.transform(records, PartitionStatsHandler::recordToPartitionStatsV3); | ||
ajantha-bhat marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| } | ||
| } | ||
|
|
||
| private static OutputFile newPartitionStatsFile( | ||
|
|
@@ -252,6 +309,21 @@ private static PartitionStats recordToPartitionStats(StructLike record) { | |
| new PartitionStats( | ||
| record.get(pos++, StructLike.class), // partition | ||
| record.get(pos++, Integer.class)); // spec id | ||
|
|
||
| for (; pos < record.size(); pos++) { | ||
| stats.set(pos, record.get(pos, Object.class)); | ||
| } | ||
|
|
||
| return stats; | ||
| } | ||
|
|
||
| private static PartitionStats recordToPartitionStatsV3(StructLike record) { | ||
| int pos = 0; | ||
| PartitionStats stats = | ||
| new PartitionStatsV3( | ||
| record.get(pos++, StructLike.class), // partition | ||
| record.get(pos++, Integer.class)); // spec id | ||
|
|
||
| for (; pos < record.size(); pos++) { | ||
| stats.set(pos, record.get(pos, Object.class)); | ||
| } | ||
|
|
@@ -263,13 +335,13 @@ private static Collection<PartitionStats> computeAndMergeStatsIncremental( | |
| Table table, | ||
| Snapshot snapshot, | ||
| StructType partitionType, | ||
| PartitionStatisticsFile previousStatsFile) | ||
| throws IOException { | ||
| PartitionStatisticsFile previousStatsFile) { | ||
| PartitionMap<PartitionStats> statsMap = PartitionMap.create(table.specs()); | ||
| // read previous stats, note that partition field will be read as GenericRecord | ||
| try (CloseableIterable<PartitionStats> oldStats = | ||
| readPartitionStatsFile( | ||
| schema(partitionType), table.io().newInputFile(previousStatsFile.path()))) { | ||
| schemaForVersion(table, partitionType), | ||
| table.io().newInputFile(previousStatsFile.path()))) { | ||
| oldStats.forEach( | ||
| partitionStats -> | ||
| statsMap.put(partitionStats.specId(), partitionStats.partition(), partitionStats)); | ||
|
|
@@ -347,6 +419,7 @@ private static PartitionMap<PartitionStats> computeStatsDiff( | |
|
|
||
| private static PartitionMap<PartitionStats> computeStats( | ||
| Table table, List<ManifestFile> manifests, boolean incremental) { | ||
| int version = ((HasTableOperations) table).operations().current().formatVersion(); | ||
| StructType partitionType = Partitioning.partitionType(table); | ||
| Queue<PartitionMap<PartitionStats>> statsByManifest = Queues.newConcurrentLinkedQueue(); | ||
| Tasks.foreach(manifests) | ||
|
|
@@ -356,7 +429,7 @@ private static PartitionMap<PartitionStats> computeStats( | |
| .run( | ||
| manifest -> | ||
| statsByManifest.add( | ||
| collectStatsForManifest(table, manifest, partitionType, incremental))); | ||
| collectStatsForManifest(table, version, manifest, partitionType, incremental))); | ||
|
|
||
| PartitionMap<PartitionStats> statsMap = PartitionMap.create(table.specs()); | ||
| for (PartitionMap<PartitionStats> stats : statsByManifest) { | ||
|
|
@@ -367,7 +440,11 @@ private static PartitionMap<PartitionStats> computeStats( | |
| } | ||
|
|
||
| private static PartitionMap<PartitionStats> collectStatsForManifest( | ||
| Table table, ManifestFile manifest, StructType partitionType, boolean incremental) { | ||
| Table table, | ||
| int version, | ||
|
||
| ManifestFile manifest, | ||
| StructType partitionType, | ||
| boolean incremental) { | ||
| List<String> projection = BaseScan.scanColumns(manifest.content()); | ||
| try (ManifestReader<?> reader = ManifestFiles.open(manifest, table.io()).select(projection)) { | ||
| PartitionMap<PartitionStats> statsMap = PartitionMap.create(table.specs()); | ||
|
|
@@ -385,7 +462,10 @@ private static PartitionMap<PartitionStats> collectStatsForManifest( | |
| statsMap.computeIfAbsent( | ||
| specId, | ||
| ((PartitionData) file.partition()).copy(), | ||
| () -> new PartitionStats(key, specId)); | ||
| () -> | ||
| version > 2 | ||
| ? new PartitionStatsV3(key, specId) | ||
| : new PartitionStats(key, specId)); | ||
| if (entry.isLive()) { | ||
| // Live can have both added and existing entries. Consider only added entries for | ||
| // incremental compute as existing entries was already included in previous compute. | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,86 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, | ||
| * software distributed under the License is distributed on an | ||
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| * KIND, either express or implied. See the License for the | ||
| * specific language governing permissions and limitations | ||
| * under the License. | ||
| */ | ||
| package org.apache.iceberg; | ||
|
|
||
| public class PartitionStatsV3 extends PartitionStats { | ||
|
|
||
| private int dvCount; | ||
|
|
||
| public PartitionStatsV3(StructLike partition, int specId) { | ||
| super(partition, specId); | ||
| } | ||
|
|
||
| public int dvCount() { | ||
| return dvCount; | ||
| } | ||
|
|
||
| @Override | ||
| public void liveEntry(ContentFile<?> file, Snapshot snapshot) { | ||
| super.liveEntry(file, snapshot); | ||
| if (file.content() == FileContent.POSITION_DELETES && file.format() == FileFormat.PUFFIN) { | ||
| this.dvCount += 1; | ||
| // revert the changes for position delete file count increment from the parent method | ||
| this.set(6, positionDeleteFileCount() - 1); | ||
| } | ||
| } | ||
|
|
||
| @Override | ||
| void deletedEntryForIncrementalCompute(ContentFile<?> file, Snapshot snapshot) { | ||
| super.deletedEntryForIncrementalCompute(file, snapshot); | ||
| if (file.content() == FileContent.POSITION_DELETES && file.format() == FileFormat.PUFFIN) { | ||
| this.dvCount -= 1; | ||
| // revert the changes for position delete file count decrement from the parent method | ||
| this.set(6, positionDeleteFileCount() + 1); | ||
| } | ||
| } | ||
|
|
||
| @Override | ||
| @Deprecated // will become package-private | ||
| public void appendStats(PartitionStats entry) { | ||
| super.appendStats(entry); | ||
|
|
||
| if (entry instanceof PartitionStatsV3) { | ||
| this.dvCount += ((PartitionStatsV3) entry).dvCount; | ||
| } | ||
| } | ||
|
|
||
| @Override | ||
| public int size() { | ||
| // includes dv counter | ||
| return STATS_COUNT + 1; | ||
| } | ||
|
|
||
| @Override | ||
| public <T> T get(int pos, Class<T> javaClass) { | ||
| if (pos == STATS_COUNT) { | ||
| return javaClass.cast(dvCount); | ||
| } else { | ||
| return super.get(pos, javaClass); | ||
| } | ||
| } | ||
|
|
||
| @Override | ||
| public <T> void set(int pos, T value) { | ||
| if (pos == STATS_COUNT) { | ||
| this.dvCount = value == null ? 0 : (int) value; | ||
| } else { | ||
| super.set(pos, value); | ||
| } | ||
| } | ||
| } |
Uh oh!
There was an error while loading. Please reload this page.