Skip to content

Commit 9f91295

Browse files
authored
Core: Support DV for partition stats (#13425)
1 parent 8a747ab commit 9f91295

File tree

4 files changed

+218
-52
lines changed

4 files changed

+218
-52
lines changed

core/src/main/java/org/apache/iceberg/PartitionStats.java

Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,20 +22,21 @@
2222

2323
public class PartitionStats implements StructLike {
2424

25-
private static final int STATS_COUNT = 12;
25+
private static final int STATS_COUNT = 13;
2626

2727
private StructLike partition;
2828
private int specId;
2929
private long dataRecordCount;
3030
private int dataFileCount;
3131
private long totalDataFileSizeInBytes;
32-
private long positionDeleteRecordCount;
32+
private long positionDeleteRecordCount; // also includes dv record count as per spec
3333
private int positionDeleteFileCount;
3434
private long equalityDeleteRecordCount;
3535
private int equalityDeleteFileCount;
3636
private Long totalRecordCount; // null by default
3737
private Long lastUpdatedAt; // null by default
3838
private Long lastUpdatedSnapshotId; // null by default
39+
private int dvCount;
3940

4041
public PartitionStats(StructLike partition, int specId) {
4142
this.partition = partition;
@@ -90,6 +91,10 @@ public Long lastUpdatedSnapshotId() {
9091
return lastUpdatedSnapshotId;
9192
}
9293

94+
public int dvCount() {
95+
return dvCount;
96+
}
97+
9398
/**
9499
* Updates the partition stats from the data/delete file.
95100
*
@@ -109,7 +114,12 @@ public void liveEntry(ContentFile<?> file, Snapshot snapshot) {
109114
break;
110115
case POSITION_DELETES:
111116
this.positionDeleteRecordCount += file.recordCount();
112-
this.positionDeleteFileCount += 1;
117+
if (file.format() == FileFormat.PUFFIN) {
118+
this.dvCount += 1;
119+
} else {
120+
this.positionDeleteFileCount += 1;
121+
}
122+
113123
break;
114124
case EQUALITY_DELETES:
115125
this.equalityDeleteRecordCount += file.recordCount();
@@ -156,7 +166,12 @@ void deletedEntryForIncrementalCompute(ContentFile<?> file, Snapshot snapshot) {
156166
break;
157167
case POSITION_DELETES:
158168
this.positionDeleteRecordCount -= file.recordCount();
159-
this.positionDeleteFileCount -= 1;
169+
if (file.format() == FileFormat.PUFFIN) {
170+
this.dvCount -= 1;
171+
} else {
172+
this.positionDeleteFileCount -= 1;
173+
}
174+
160175
break;
161176
case EQUALITY_DELETES:
162177
this.equalityDeleteRecordCount -= file.recordCount();
@@ -188,6 +203,7 @@ public void appendStats(PartitionStats entry) {
188203
this.positionDeleteFileCount += entry.positionDeleteFileCount;
189204
this.equalityDeleteRecordCount += entry.equalityDeleteRecordCount;
190205
this.equalityDeleteFileCount += entry.equalityDeleteFileCount;
206+
this.dvCount += entry.dvCount;
191207

192208
if (entry.totalRecordCount != null) {
193209
if (totalRecordCount == null) {
@@ -241,6 +257,8 @@ public <T> T get(int pos, Class<T> javaClass) {
241257
return javaClass.cast(lastUpdatedAt);
242258
case 11:
243259
return javaClass.cast(lastUpdatedSnapshotId);
260+
case 12:
261+
return javaClass.cast(dvCount);
244262
default:
245263
throw new UnsupportedOperationException("Unknown position: " + pos);
246264
}
@@ -289,6 +307,9 @@ public <T> void set(int pos, T value) {
289307
case 11:
290308
this.lastUpdatedSnapshotId = (Long) value;
291309
break;
310+
case 12:
311+
this.dvCount = value == null ? 0 : (int) value;
312+
break;
292313
default:
293314
throw new UnsupportedOperationException("Unknown position: " + pos);
294315
}

core/src/main/java/org/apache/iceberg/PartitionStatsHandler.java

Lines changed: 78 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import java.util.stream.Collectors;
3434
import java.util.stream.StreamSupport;
3535
import org.apache.iceberg.data.GenericRecord;
36+
import org.apache.iceberg.expressions.Literal;
3637
import org.apache.iceberg.io.CloseableIterable;
3738
import org.apache.iceberg.io.FileAppender;
3839
import org.apache.iceberg.io.InputFile;
@@ -42,6 +43,7 @@
4243
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
4344
import org.apache.iceberg.relocated.com.google.common.collect.Queues;
4445
import org.apache.iceberg.types.Comparators;
46+
import org.apache.iceberg.types.Types;
4547
import org.apache.iceberg.types.Types.IntegerType;
4648
import org.apache.iceberg.types.Types.LongType;
4749
import org.apache.iceberg.types.Types.NestedField;
@@ -89,17 +91,57 @@ private PartitionStatsHandler() {}
8991
NestedField.optional(11, "last_updated_at", LongType.get());
9092
public static final NestedField LAST_UPDATED_SNAPSHOT_ID =
9193
NestedField.optional(12, "last_updated_snapshot_id", LongType.get());
94+
// Using default value for v3 field to support v3 reader reading file written by v2
95+
public static final NestedField DV_COUNT =
96+
NestedField.required("dv_count")
97+
.withId(13)
98+
.ofType(Types.IntegerType.get())
99+
.withInitialDefault(Literal.of(0))
100+
.withWriteDefault(Literal.of(0))
101+
.build();
92102

93103
/**
94104
* Generates the partition stats file schema based on a combined partition type which considers
95105
* all specs in a table.
96106
*
107+
* <p>Use this only for format version 1 and 2. For version 3 and above use {@link
108+
* #schema(StructType, int)}
109+
*
97110
* @param unifiedPartitionType unified partition schema type. Could be calculated by {@link
98111
* Partitioning#partitionType(Table)}.
99112
* @return a schema that corresponds to the provided unified partition type.
113+
* @deprecated since 1.10.0, will be removed in 1.11.0. Use {@link #schema(StructType, int)}
114+
* instead.
100115
*/
116+
@Deprecated
101117
public static Schema schema(StructType unifiedPartitionType) {
102118
Preconditions.checkState(!unifiedPartitionType.fields().isEmpty(), "Table must be partitioned");
119+
return v2Schema(unifiedPartitionType);
120+
}
121+
122+
/**
123+
* Generates the partition stats file schema for a given format version based on a combined
124+
* partition type which considers all specs in a table.
125+
*
126+
* @param unifiedPartitionType unified partition schema type. Could be calculated by {@link
127+
* Partitioning#partitionType(Table)}.
128+
* @return a schema that corresponds to the provided unified partition type.
129+
*/
130+
public static Schema schema(StructType unifiedPartitionType, int formatVersion) {
131+
Preconditions.checkState(!unifiedPartitionType.fields().isEmpty(), "Table must be partitioned");
132+
Preconditions.checkState(
133+
formatVersion > 0 && formatVersion <= TableMetadata.SUPPORTED_TABLE_FORMAT_VERSION,
134+
"Invalid format version: %d",
135+
formatVersion);
136+
137+
if (formatVersion <= 2) {
138+
return v2Schema(unifiedPartitionType);
139+
}
140+
141+
return v3Schema(unifiedPartitionType);
142+
}
143+
144+
private static Schema v2Schema(StructType unifiedPartitionType) {
103145
return new Schema(
104146
NestedField.required(PARTITION_FIELD_ID, PARTITION_FIELD_NAME, unifiedPartitionType),
105147
SPEC_ID,
@@ -115,6 +157,35 @@ public static Schema schema(StructType unifiedPartitionType) {
115157
LAST_UPDATED_SNAPSHOT_ID);
116158
}
117159

160+
private static Schema v3Schema(StructType unifiedPartitionType) {
161+
return new Schema(
162+
NestedField.required(PARTITION_FIELD_ID, PARTITION_FIELD_NAME, unifiedPartitionType),
163+
SPEC_ID,
164+
DATA_RECORD_COUNT,
165+
DATA_FILE_COUNT,
166+
TOTAL_DATA_FILE_SIZE_IN_BYTES,
167+
NestedField.required(
168+
POSITION_DELETE_RECORD_COUNT.fieldId(),
169+
POSITION_DELETE_RECORD_COUNT.name(),
170+
LongType.get()),
171+
NestedField.required(
172+
POSITION_DELETE_FILE_COUNT.fieldId(),
173+
POSITION_DELETE_FILE_COUNT.name(),
174+
IntegerType.get()),
175+
NestedField.required(
176+
EQUALITY_DELETE_RECORD_COUNT.fieldId(),
177+
EQUALITY_DELETE_RECORD_COUNT.name(),
178+
LongType.get()),
179+
NestedField.required(
180+
EQUALITY_DELETE_FILE_COUNT.fieldId(),
181+
EQUALITY_DELETE_FILE_COUNT.name(),
182+
IntegerType.get()),
183+
TOTAL_RECORD_COUNT,
184+
LAST_UPDATED_AT,
185+
LAST_UPDATED_SNAPSHOT_ID,
186+
DV_COUNT);
187+
}
188+
118189
/**
119190
* Computes the stats incrementally after the snapshot that has partition stats file till the
120191
* current snapshot and writes the combined result into a {@link PartitionStatisticsFile} after
@@ -190,7 +261,10 @@ public static PartitionStatisticsFile computeAndWriteStatsFile(Table table, long
190261

191262
List<PartitionStats> sortedStats = sortStatsByPartition(stats, partitionType);
192263
return writePartitionStatsFile(
193-
table, snapshot.snapshotId(), schema(partitionType), sortedStats);
264+
table,
265+
snapshot.snapshotId(),
266+
schema(partitionType, TableUtil.formatVersion(table)),
267+
sortedStats);
194268
}
195269

196270
@VisibleForTesting
@@ -269,13 +343,13 @@ private static Collection<PartitionStats> computeAndMergeStatsIncremental(
269343
Table table,
270344
Snapshot snapshot,
271345
StructType partitionType,
272-
PartitionStatisticsFile previousStatsFile)
273-
throws IOException {
346+
PartitionStatisticsFile previousStatsFile) {
274347
PartitionMap<PartitionStats> statsMap = PartitionMap.create(table.specs());
275348
// read previous stats, note that partition field will be read as GenericRecord
276349
try (CloseableIterable<PartitionStats> oldStats =
277350
readPartitionStatsFile(
278-
schema(partitionType), table.io().newInputFile(previousStatsFile.path()))) {
351+
schema(partitionType, TableUtil.formatVersion(table)),
352+
table.io().newInputFile(previousStatsFile.path()))) {
279353
oldStats.forEach(
280354
partitionStats ->
281355
statsMap.put(partitionStats.specId(), partitionStats.partition(), partitionStats));

0 commit comments

Comments
 (0)