Skip to content

Commit 8e456ae

Browse files
authored
Parquet: Clean up Parquet generic and internal readers (apache#12102)
1 parent: 61241ed · commit: 8e456ae

File tree

8 files changed: 187 additions (+187) and 172 deletions (-172).

flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java

Lines changed: 3 additions & 10 deletions
Original file line number | Diff line number | Diff line change
@@ -116,7 +116,6 @@ public ParquetValueReader<RowData> struct(
116116
expected != null ? expected.fields() : ImmutableList.of();
117117
List<ParquetValueReader<?>> reorderedFields =
118118
Lists.newArrayListWithExpectedSize(expectedFields.size());
119-
List<Type> types = Lists.newArrayListWithExpectedSize(expectedFields.size());
120119
// Defaulting to parent max definition level
121120
int defaultMaxDefinitionLevel = type.getMaxDefinitionLevel(currentPath());
122121
for (Types.NestedField field : expectedFields) {
@@ -128,32 +127,26 @@ public ParquetValueReader<RowData> struct(
128127
maxDefinitionLevelsById.getOrDefault(id, defaultMaxDefinitionLevel);
129128
reorderedFields.add(
130129
ParquetValueReaders.constant(idToConstant.get(id), fieldMaxDefinitionLevel));
131-
types.add(null);
132130
} else if (id == MetadataColumns.ROW_POSITION.fieldId()) {
133131
reorderedFields.add(ParquetValueReaders.position());
134-
types.add(null);
135132
} else if (id == MetadataColumns.IS_DELETED.fieldId()) {
136133
reorderedFields.add(ParquetValueReaders.constant(false));
137-
types.add(null);
138134
} else if (reader != null) {
139135
reorderedFields.add(reader);
140-
types.add(typesById.get(id));
141136
} else if (field.initialDefault() != null) {
142137
reorderedFields.add(
143138
ParquetValueReaders.constant(
144139
RowDataUtil.convertConstant(field.type(), field.initialDefault()),
145140
maxDefinitionLevelsById.getOrDefault(id, defaultMaxDefinitionLevel)));
146-
types.add(typesById.get(id));
147141
} else if (field.isOptional()) {
148142
reorderedFields.add(ParquetValueReaders.nulls());
149-
types.add(null);
150143
} else {
151144
throw new IllegalArgumentException(
152145
String.format("Missing required field: %s", field.name()));
153146
}
154147
}
155148

156-
return new RowDataReader(types, reorderedFields);
149+
return new RowDataReader(reorderedFields);
157150
}
158151

159152
@Override
@@ -662,8 +655,8 @@ private static class RowDataReader
662655
extends ParquetValueReaders.StructReader<RowData, GenericRowData> {
663656
private final int numFields;
664657

665-
RowDataReader(List<Type> types, List<ParquetValueReader<?>> readers) {
666-
super(types, readers);
658+
RowDataReader(List<ParquetValueReader<?>> readers) {
659+
super(readers);
667660
this.numFields = readers.size();
668661
}
669662

parquet/src/main/java/org/apache/iceberg/data/parquet/BaseParquetReaders.java

Lines changed: 66 additions & 64 deletions
Original file line number | Diff line number | Diff line change
@@ -27,15 +27,25 @@
2727
import org.apache.iceberg.parquet.ParquetValueReader;
2828
import org.apache.iceberg.parquet.ParquetValueReaders;
2929
import org.apache.iceberg.parquet.TypeWithSchemaVisitor;
30+
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
3031
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
3132
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
3233
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
3334
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
35+
import org.apache.iceberg.types.Type.TypeID;
3436
import org.apache.iceberg.types.Types;
3537
import org.apache.parquet.column.ColumnDescriptor;
3638
import org.apache.parquet.schema.GroupType;
3739
import org.apache.parquet.schema.LogicalTypeAnnotation;
40+
import org.apache.parquet.schema.LogicalTypeAnnotation.DateLogicalTypeAnnotation;
3841
import org.apache.parquet.schema.LogicalTypeAnnotation.DecimalLogicalTypeAnnotation;
42+
import org.apache.parquet.schema.LogicalTypeAnnotation.EnumLogicalTypeAnnotation;
43+
import org.apache.parquet.schema.LogicalTypeAnnotation.IntLogicalTypeAnnotation;
44+
import org.apache.parquet.schema.LogicalTypeAnnotation.JsonLogicalTypeAnnotation;
45+
import org.apache.parquet.schema.LogicalTypeAnnotation.LogicalTypeAnnotationVisitor;
46+
import org.apache.parquet.schema.LogicalTypeAnnotation.StringLogicalTypeAnnotation;
47+
import org.apache.parquet.schema.LogicalTypeAnnotation.TimeLogicalTypeAnnotation;
48+
import org.apache.parquet.schema.LogicalTypeAnnotation.TimestampLogicalTypeAnnotation;
3949
import org.apache.parquet.schema.MessageType;
4050
import org.apache.parquet.schema.PrimitiveType;
4151
import org.apache.parquet.schema.Type;
@@ -78,8 +88,12 @@ protected ParquetValueReader<?> dateReader(ColumnDescriptor desc) {
7888
return new GenericParquetReaders.DateReader(desc);
7989
}
8090

81-
protected ParquetValueReader<?> timeReader(
82-
ColumnDescriptor desc, LogicalTypeAnnotation.TimeUnit unit) {
91+
protected ParquetValueReader<?> timeReader(ColumnDescriptor desc) {
92+
LogicalTypeAnnotation time = desc.getPrimitiveType().getLogicalTypeAnnotation();
93+
Preconditions.checkArgument(
94+
time instanceof TimeLogicalTypeAnnotation, "Invalid time logical type: " + time);
95+
96+
LogicalTypeAnnotation.TimeUnit unit = ((TimeLogicalTypeAnnotation) time).getUnit();
8397
switch (unit) {
8498
case MICROS:
8599
return new GenericParquetReaders.TimeReader(desc);
@@ -90,12 +104,17 @@ protected ParquetValueReader<?> timeReader(
90104
}
91105
}
92106

93-
protected ParquetValueReader<?> timestampReader(
94-
ColumnDescriptor desc, LogicalTypeAnnotation.TimeUnit unit, boolean isAdjustedToUTC) {
107+
protected ParquetValueReader<?> timestampReader(ColumnDescriptor desc, boolean isAdjustedToUTC) {
95108
if (desc.getPrimitiveType().getPrimitiveTypeName() == PrimitiveType.PrimitiveTypeName.INT96) {
96109
return new GenericParquetReaders.TimestampInt96Reader(desc);
97110
}
98111

112+
LogicalTypeAnnotation timestamp = desc.getPrimitiveType().getLogicalTypeAnnotation();
113+
Preconditions.checkArgument(
114+
timestamp instanceof TimestampLogicalTypeAnnotation,
115+
"Invalid timestamp logical type: " + timestamp);
116+
117+
LogicalTypeAnnotation.TimeUnit unit = ((TimestampLogicalTypeAnnotation) timestamp).getUnit();
99118
switch (unit) {
100119
case MICROS:
101120
return isAdjustedToUTC
@@ -148,96 +167,79 @@ public ParquetValueReader<?> struct(
148167
}
149168
}
150169

151-
private class LogicalTypeAnnotationParquetValueReaderVisitor
152-
implements LogicalTypeAnnotation.LogicalTypeAnnotationVisitor<ParquetValueReader<?>> {
170+
private class LogicalTypeReadBuilder
171+
implements LogicalTypeAnnotationVisitor<ParquetValueReader<?>> {
153172

154173
private final ColumnDescriptor desc;
155174
private final org.apache.iceberg.types.Type.PrimitiveType expected;
156-
private final PrimitiveType primitive;
157175

158-
LogicalTypeAnnotationParquetValueReaderVisitor(
159-
ColumnDescriptor desc,
160-
org.apache.iceberg.types.Type.PrimitiveType expected,
161-
PrimitiveType primitive) {
176+
LogicalTypeReadBuilder(
177+
ColumnDescriptor desc, org.apache.iceberg.types.Type.PrimitiveType expected) {
162178
this.desc = desc;
163179
this.expected = expected;
164-
this.primitive = primitive;
165180
}
166181

167182
@Override
168-
public Optional<ParquetValueReader<?>> visit(
169-
LogicalTypeAnnotation.StringLogicalTypeAnnotation stringLogicalType) {
170-
return Optional.of(new ParquetValueReaders.StringReader(desc));
183+
public Optional<ParquetValueReader<?>> visit(StringLogicalTypeAnnotation stringLogicalType) {
184+
return Optional.of(ParquetValueReaders.strings(desc));
171185
}
172186

173187
@Override
174-
public Optional<ParquetValueReader<?>> visit(
175-
LogicalTypeAnnotation.EnumLogicalTypeAnnotation enumLogicalType) {
176-
return Optional.of(new ParquetValueReaders.StringReader(desc));
188+
public Optional<ParquetValueReader<?>> visit(EnumLogicalTypeAnnotation enumLogicalType) {
189+
return Optional.of(ParquetValueReaders.strings(desc));
177190
}
178191

179192
@Override
180193
public Optional<ParquetValueReader<?>> visit(DecimalLogicalTypeAnnotation decimalLogicalType) {
181-
switch (primitive.getPrimitiveTypeName()) {
182-
case BINARY:
183-
case FIXED_LEN_BYTE_ARRAY:
184-
return Optional.of(
185-
new ParquetValueReaders.BinaryAsDecimalReader(desc, decimalLogicalType.getScale()));
186-
case INT64:
187-
return Optional.of(
188-
new ParquetValueReaders.LongAsDecimalReader(desc, decimalLogicalType.getScale()));
189-
case INT32:
190-
return Optional.of(
191-
new ParquetValueReaders.IntegerAsDecimalReader(desc, decimalLogicalType.getScale()));
192-
default:
193-
throw new UnsupportedOperationException(
194-
"Unsupported base type for decimal: " + primitive.getPrimitiveTypeName());
195-
}
194+
return Optional.of(ParquetValueReaders.bigDecimals(desc));
196195
}
197196

198197
@Override
199-
public Optional<ParquetValueReader<?>> visit(
200-
LogicalTypeAnnotation.DateLogicalTypeAnnotation dateLogicalType) {
198+
public Optional<ParquetValueReader<?>> visit(DateLogicalTypeAnnotation dateLogicalType) {
201199
return Optional.of(dateReader(desc));
202200
}
203201

204202
@Override
205-
public Optional<ParquetValueReader<?>> visit(
206-
LogicalTypeAnnotation.TimeLogicalTypeAnnotation timeLogicalType) {
207-
return Optional.of(timeReader(desc, timeLogicalType.getUnit()));
203+
public Optional<ParquetValueReader<?>> visit(TimeLogicalTypeAnnotation timeLogicalType) {
204+
return Optional.of(timeReader(desc));
208205
}
209206

210207
@Override
211208
public Optional<ParquetValueReader<?>> visit(
212-
LogicalTypeAnnotation.TimestampLogicalTypeAnnotation timestampLogicalType) {
209+
TimestampLogicalTypeAnnotation timestampLogicalType) {
213210
return Optional.of(
214-
timestampReader(
215-
desc,
216-
timestampLogicalType.getUnit(),
217-
((Types.TimestampType) expected).shouldAdjustToUTC()));
211+
timestampReader(desc, ((Types.TimestampType) expected).shouldAdjustToUTC()));
218212
}
219213

220214
@Override
221-
public Optional<ParquetValueReader<?>> visit(
222-
LogicalTypeAnnotation.IntLogicalTypeAnnotation intLogicalType) {
215+
public Optional<ParquetValueReader<?>> visit(IntLogicalTypeAnnotation intLogicalType) {
223216
if (intLogicalType.getBitWidth() == 64) {
217+
Preconditions.checkArgument(
218+
intLogicalType.isSigned(), "Cannot read UINT64 as a long value");
219+
224220
return Optional.of(new ParquetValueReaders.UnboxedReader<>(desc));
225221
}
226-
return (expected.typeId() == org.apache.iceberg.types.Type.TypeID.LONG)
227-
? Optional.of(new ParquetValueReaders.IntAsLongReader(desc))
228-
: Optional.of(new ParquetValueReaders.UnboxedReader<>(desc));
222+
223+
if (expected.typeId() == TypeID.LONG) {
224+
return Optional.of(new ParquetValueReaders.IntAsLongReader(desc));
225+
}
226+
227+
Preconditions.checkArgument(
228+
intLogicalType.isSigned() || intLogicalType.getBitWidth() < 32,
229+
"Cannot read UINT32 as an int value");
230+
231+
return Optional.of(new ParquetValueReaders.UnboxedReader<>(desc));
229232
}
230233

231234
@Override
232-
public Optional<ParquetValueReader<?>> visit(
233-
LogicalTypeAnnotation.JsonLogicalTypeAnnotation jsonLogicalType) {
234-
return Optional.of(new ParquetValueReaders.StringReader(desc));
235+
public Optional<ParquetValueReader<?>> visit(JsonLogicalTypeAnnotation jsonLogicalType) {
236+
return Optional.of(ParquetValueReaders.strings(desc));
235237
}
236238

237239
@Override
238240
public Optional<ParquetValueReader<?>> visit(
239241
LogicalTypeAnnotation.BsonLogicalTypeAnnotation bsonLogicalType) {
240-
return Optional.of(new ParquetValueReaders.BytesReader(desc));
242+
return Optional.of(ParquetValueReaders.byteBuffers(desc));
241243
}
242244

243245
@Override
@@ -388,7 +390,7 @@ public ParquetValueReader<?> primitive(
388390
if (primitive.getLogicalTypeAnnotation() != null) {
389391
return primitive
390392
.getLogicalTypeAnnotation()
391-
.accept(new LogicalTypeAnnotationParquetValueReaderVisitor(desc, expected, primitive))
393+
.accept(new LogicalTypeReadBuilder(desc, expected))
392394
.orElseThrow(
393395
() ->
394396
new UnsupportedOperationException(
@@ -399,31 +401,31 @@ public ParquetValueReader<?> primitive(
399401
case FIXED_LEN_BYTE_ARRAY:
400402
return fixedReader(desc);
401403
case BINARY:
402-
if (expected.typeId() == org.apache.iceberg.types.Type.TypeID.STRING) {
403-
return new ParquetValueReaders.StringReader(desc);
404+
if (expected.typeId() == TypeID.STRING) {
405+
return ParquetValueReaders.strings(desc);
404406
} else {
405-
return new ParquetValueReaders.BytesReader(desc);
407+
return ParquetValueReaders.byteBuffers(desc);
406408
}
407409
case INT32:
408-
if (expected.typeId() == org.apache.iceberg.types.Type.TypeID.LONG) {
409-
return new ParquetValueReaders.IntAsLongReader(desc);
410+
if (expected.typeId() == TypeID.LONG) {
411+
return ParquetValueReaders.intsAsLongs(desc);
410412
} else {
411-
return new ParquetValueReaders.UnboxedReader<>(desc);
413+
return ParquetValueReaders.unboxed(desc);
412414
}
413415
case FLOAT:
414-
if (expected.typeId() == org.apache.iceberg.types.Type.TypeID.DOUBLE) {
415-
return new ParquetValueReaders.FloatAsDoubleReader(desc);
416+
if (expected.typeId() == TypeID.DOUBLE) {
417+
return ParquetValueReaders.floatsAsDoubles(desc);
416418
} else {
417-
return new ParquetValueReaders.UnboxedReader<>(desc);
419+
return ParquetValueReaders.unboxed(desc);
418420
}
419421
case BOOLEAN:
420422
case INT64:
421423
case DOUBLE:
422-
return new ParquetValueReaders.UnboxedReader<>(desc);
424+
return ParquetValueReaders.unboxed(desc);
423425
case INT96:
424426
// Impala & Spark used to write timestamps as INT96 without a logical type. For backwards
425427
// compatibility we try to read INT96 as timestamps.
426-
return timestampReader(desc, LogicalTypeAnnotation.TimeUnit.NANOS, true);
428+
return timestampReader(desc, true);
427429
default:
428430
throw new UnsupportedOperationException("Unsupported type: " + primitive);
429431
}

parquet/src/main/java/org/apache/iceberg/data/parquet/GenericParquetReaders.java

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -59,7 +59,7 @@ public static ParquetValueReader<Record> buildReader(
5959
@Override
6060
protected ParquetValueReader<Record> createStructReader(
6161
List<Type> types, List<ParquetValueReader<?>> fieldReaders, StructType structType) {
62-
return ParquetValueReaders.recordReader(types, fieldReaders, structType);
62+
return ParquetValueReaders.recordReader(fieldReaders, structType);
6363
}
6464

6565
@Override

parquet/src/main/java/org/apache/iceberg/data/parquet/InternalReader.java

Lines changed: 5 additions & 22 deletions
Original file line number | Diff line number | Diff line change
@@ -26,9 +26,7 @@
2626
import org.apache.iceberg.parquet.ParquetValueReaders;
2727
import org.apache.iceberg.types.Types.StructType;
2828
import org.apache.parquet.column.ColumnDescriptor;
29-
import org.apache.parquet.schema.LogicalTypeAnnotation;
3029
import org.apache.parquet.schema.MessageType;
31-
import org.apache.parquet.schema.PrimitiveType;
3230
import org.apache.parquet.schema.Type;
3331

3432
public class InternalReader<T extends StructLike> extends BaseParquetReaders<T> {
@@ -53,8 +51,7 @@ public static <T extends StructLike> ParquetValueReader<T> create(
5351
@SuppressWarnings("unchecked")
5452
protected ParquetValueReader<T> createStructReader(
5553
List<Type> types, List<ParquetValueReader<?>> fieldReaders, StructType structType) {
56-
return (ParquetValueReader<T>)
57-
ParquetValueReaders.recordReader(types, fieldReaders, structType);
54+
return (ParquetValueReader<T>) ParquetValueReaders.recordReader(fieldReaders, structType);
5855
}
5956

6057
@Override
@@ -68,26 +65,12 @@ protected ParquetValueReader<?> dateReader(ColumnDescriptor desc) {
6865
}
6966

7067
@Override
71-
protected ParquetValueReader<?> timeReader(
72-
ColumnDescriptor desc, LogicalTypeAnnotation.TimeUnit unit) {
73-
if (unit == LogicalTypeAnnotation.TimeUnit.MILLIS) {
74-
return ParquetValueReaders.millisAsTimes(desc);
75-
}
76-
77-
return new ParquetValueReaders.UnboxedReader<>(desc);
68+
protected ParquetValueReader<?> timeReader(ColumnDescriptor desc) {
69+
return ParquetValueReaders.times(desc);
7870
}
7971

8072
@Override
81-
protected ParquetValueReader<?> timestampReader(
82-
ColumnDescriptor desc, LogicalTypeAnnotation.TimeUnit unit, boolean isAdjustedToUTC) {
83-
if (desc.getPrimitiveType().getPrimitiveTypeName() == PrimitiveType.PrimitiveTypeName.INT96) {
84-
return ParquetValueReaders.int96Timestamps(desc);
85-
}
86-
87-
if (unit == LogicalTypeAnnotation.TimeUnit.MILLIS) {
88-
return ParquetValueReaders.millisAsTimestamps(desc);
89-
}
90-
91-
return new ParquetValueReaders.UnboxedReader<>(desc);
73+
protected ParquetValueReader<?> timestampReader(ColumnDescriptor desc, boolean isAdjustedToUTC) {
74+
return ParquetValueReaders.timestamps(desc);
9275
}
9376
}

parquet/src/main/java/org/apache/iceberg/parquet/ParquetAvroValueReaders.java

Lines changed: 3 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -100,20 +100,17 @@ public ParquetValueReader<?> struct(
100100
expected != null ? expected.fields() : ImmutableList.of();
101101
List<ParquetValueReader<?>> reorderedFields =
102102
Lists.newArrayListWithExpectedSize(expectedFields.size());
103-
List<Type> types = Lists.newArrayListWithExpectedSize(expectedFields.size());
104103
for (Types.NestedField field : expectedFields) {
105104
int id = field.fieldId();
106105
ParquetValueReader<?> reader = readersById.get(id);
107106
if (reader != null) {
108107
reorderedFields.add(reader);
109-
types.add(typesById.get(id));
110108
} else {
111109
reorderedFields.add(ParquetValueReaders.nulls());
112-
types.add(null);
113110
}
114111
}
115112

116-
return new RecordReader(types, reorderedFields, avroSchema);
113+
return new RecordReader(reorderedFields, avroSchema);
117114
}
118115

119116
@Override
@@ -346,8 +343,8 @@ public long readLong() {
346343
static class RecordReader extends StructReader<Record, Record> {
347344
private final Schema schema;
348345

349-
RecordReader(List<Type> types, List<ParquetValueReader<?>> readers, Schema schema) {
350-
super(types, readers);
346+
RecordReader(List<ParquetValueReader<?>> readers, Schema schema) {
347+
super(readers);
351348
this.schema = schema;
352349
}
353350

parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueReader.java

Lines changed: 3 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -33,7 +33,9 @@ public interface ParquetValueReader<T> {
3333
* instead.
3434
*/
3535
@Deprecated
36-
void setPageSource(PageReadStore pageStore, long rowPosition);
36+
default void setPageSource(PageReadStore pageStore, long rowPosition) {
37+
setPageSource(pageStore);
38+
}
3739

3840
default void setPageSource(PageReadStore pageStore) {
3941
throw new UnsupportedOperationException(

0 commit comments

Comments
 (0)