Skip to content

Commit a100e6a

Browse files
authored
Avro: Add writers for the internal object model (apache#11919)
1 parent 97b5b39 commit a100e6a

File tree

10 files changed

+570
-129
lines changed

10 files changed

+570
-129
lines changed

api/src/test/java/org/apache/iceberg/util/RandomUtil.java

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,14 @@
2121
import java.math.BigDecimal;
2222
import java.math.BigInteger;
2323
import java.util.Arrays;
24+
import java.util.List;
25+
import java.util.Map;
2426
import java.util.Random;
27+
import java.util.Set;
28+
import java.util.function.Supplier;
29+
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
30+
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
31+
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
2532
import org.apache.iceberg.types.Type;
2633
import org.apache.iceberg.types.Types;
2734

@@ -228,4 +235,54 @@ private static BigInteger randomUnscaled(int precision, Random random) {
228235

229236
return new BigInteger(sb.toString());
230237
}
238+
239+
public static List<Object> generateList(
240+
Random random, Types.ListType list, Supplier<Object> elementResult) {
241+
int numElements = random.nextInt(20);
242+
243+
List<Object> result = Lists.newArrayListWithExpectedSize(numElements);
244+
for (int i = 0; i < numElements; i += 1) {
245+
// return null 5% of the time when the value is optional
246+
if (list.isElementOptional() && random.nextInt(20) == 1) {
247+
result.add(null);
248+
} else {
249+
result.add(elementResult.get());
250+
}
251+
}
252+
253+
return result;
254+
}
255+
256+
public static Map<Object, Object> generateMap(
257+
Random random, Types.MapType map, Supplier<Object> keyResult, Supplier<Object> valueResult) {
258+
int numEntries = random.nextInt(20);
259+
260+
Map<Object, Object> result = Maps.newLinkedHashMap();
261+
Supplier<Object> keyFunc;
262+
if (map.keyType() == Types.StringType.get()) {
263+
keyFunc = () -> keyResult.get().toString();
264+
} else {
265+
keyFunc = keyResult;
266+
}
267+
268+
Set<Object> keySet = Sets.newHashSet();
269+
for (int i = 0; i < numEntries; i += 1) {
270+
Object key = keyFunc.get();
271+
// ensure no collisions
272+
while (keySet.contains(key)) {
273+
key = keyFunc.get();
274+
}
275+
276+
keySet.add(key);
277+
278+
// return null 5% of the time when the value is optional
279+
if (map.isValueOptional() && random.nextInt(20) == 1) {
280+
result.put(key, null);
281+
} else {
282+
result.put(key, valueResult.get());
283+
}
284+
}
285+
286+
return result;
287+
}
231288
}
Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.iceberg.avro;
20+
21+
import java.util.List;
22+
import org.apache.avro.LogicalType;
23+
import org.apache.avro.LogicalTypes;
24+
import org.apache.avro.Schema;
25+
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
26+
27+
abstract class BaseWriteBuilder extends AvroSchemaVisitor<ValueWriter<?>> {
28+
29+
protected abstract ValueWriter<?> createRecordWriter(List<ValueWriter<?>> fields);
30+
31+
protected abstract ValueWriter<?> fixedWriter(int length);
32+
33+
@Override
34+
public ValueWriter<?> record(Schema record, List<String> names, List<ValueWriter<?>> fields) {
35+
return createRecordWriter(fields);
36+
}
37+
38+
@Override
39+
public ValueWriter<?> union(Schema union, List<ValueWriter<?>> options) {
40+
Preconditions.checkArgument(
41+
options.size() == 2, "Cannot create writer for non-option union: %s", union);
42+
if (union.getTypes().get(0).getType() == Schema.Type.NULL) {
43+
return ValueWriters.option(0, options.get(1));
44+
} else if (union.getTypes().get(1).getType() == Schema.Type.NULL) {
45+
return ValueWriters.option(1, options.get(0));
46+
} else {
47+
throw new IllegalArgumentException(
48+
String.format("Cannot create writer for non-option union: %s", union));
49+
}
50+
}
51+
52+
@Override
53+
public ValueWriter<?> array(Schema array, ValueWriter<?> elementWriter) {
54+
if (array.getLogicalType() instanceof LogicalMap) {
55+
ValueWriters.StructWriter<?> keyValueWriter = (ValueWriters.StructWriter<?>) elementWriter;
56+
return ValueWriters.arrayMap(keyValueWriter.writer(0), keyValueWriter.writer(1));
57+
}
58+
59+
return ValueWriters.array(elementWriter);
60+
}
61+
62+
@Override
63+
public ValueWriter<?> map(Schema map, ValueWriter<?> valueWriter) {
64+
return ValueWriters.map(ValueWriters.strings(), valueWriter);
65+
}
66+
67+
@Override
68+
public ValueWriter<?> primitive(Schema primitive) {
69+
LogicalType logicalType = primitive.getLogicalType();
70+
if (logicalType != null) {
71+
switch (logicalType.getName()) {
72+
case "date":
73+
return ValueWriters.ints();
74+
75+
case "time-micros":
76+
return ValueWriters.longs();
77+
78+
case "timestamp-micros":
79+
return ValueWriters.longs();
80+
81+
case "decimal":
82+
LogicalTypes.Decimal decimal = (LogicalTypes.Decimal) logicalType;
83+
return ValueWriters.decimal(decimal.getPrecision(), decimal.getScale());
84+
85+
case "uuid":
86+
return ValueWriters.uuids();
87+
88+
default:
89+
throw new IllegalArgumentException("Unsupported logical type: " + logicalType);
90+
}
91+
}
92+
93+
switch (primitive.getType()) {
94+
case NULL:
95+
return ValueWriters.nulls();
96+
case BOOLEAN:
97+
return ValueWriters.booleans();
98+
case INT:
99+
return ValueWriters.ints();
100+
case LONG:
101+
return ValueWriters.longs();
102+
case FLOAT:
103+
return ValueWriters.floats();
104+
case DOUBLE:
105+
return ValueWriters.doubles();
106+
case STRING:
107+
return ValueWriters.strings();
108+
case FIXED:
109+
return fixedWriter(primitive.getFixedSize());
110+
case BYTES:
111+
return ValueWriters.byteBuffers();
112+
default:
113+
throw new IllegalArgumentException("Unsupported type: " + primitive);
114+
}
115+
}
116+
}

core/src/main/java/org/apache/iceberg/avro/GenericAvroWriter.java

Lines changed: 4 additions & 83 deletions
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,9 @@
2121
import java.io.IOException;
2222
import java.util.List;
2323
import java.util.stream.Stream;
24-
import org.apache.avro.LogicalType;
25-
import org.apache.avro.LogicalTypes;
2624
import org.apache.avro.Schema;
2725
import org.apache.avro.io.Encoder;
2826
import org.apache.iceberg.FieldMetrics;
29-
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
3027

3128
public class GenericAvroWriter<T> implements MetricsAwareDatumWriter<T> {
3229
private ValueWriter<T> writer = null;
@@ -55,92 +52,16 @@ public Stream<FieldMetrics> metrics() {
5552
return writer.metrics();
5653
}
5754

58-
private static class WriteBuilder extends AvroSchemaVisitor<ValueWriter<?>> {
59-
private WriteBuilder() {}
55+
private static class WriteBuilder extends BaseWriteBuilder {
6056

6157
@Override
62-
public ValueWriter<?> record(Schema record, List<String> names, List<ValueWriter<?>> fields) {
58+
protected ValueWriter<?> createRecordWriter(List<ValueWriter<?>> fields) {
6359
return ValueWriters.record(fields);
6460
}
6561

6662
@Override
67-
public ValueWriter<?> union(Schema union, List<ValueWriter<?>> options) {
68-
Preconditions.checkArgument(
69-
options.contains(ValueWriters.nulls()),
70-
"Cannot create writer for non-option union: %s",
71-
union);
72-
Preconditions.checkArgument(
73-
options.size() == 2, "Cannot create writer for non-option union: %s", union);
74-
if (union.getTypes().get(0).getType() == Schema.Type.NULL) {
75-
return ValueWriters.option(0, options.get(1));
76-
} else {
77-
return ValueWriters.option(1, options.get(0));
78-
}
79-
}
80-
81-
@Override
82-
public ValueWriter<?> array(Schema array, ValueWriter<?> elementWriter) {
83-
if (array.getLogicalType() instanceof LogicalMap) {
84-
ValueWriters.StructWriter<?> keyValueWriter = (ValueWriters.StructWriter<?>) elementWriter;
85-
return ValueWriters.arrayMap(keyValueWriter.writer(0), keyValueWriter.writer(1));
86-
}
87-
88-
return ValueWriters.array(elementWriter);
89-
}
90-
91-
@Override
92-
public ValueWriter<?> map(Schema map, ValueWriter<?> valueWriter) {
93-
return ValueWriters.map(ValueWriters.strings(), valueWriter);
94-
}
95-
96-
@Override
97-
public ValueWriter<?> primitive(Schema primitive) {
98-
LogicalType logicalType = primitive.getLogicalType();
99-
if (logicalType != null) {
100-
switch (logicalType.getName()) {
101-
case "date":
102-
return ValueWriters.ints();
103-
104-
case "time-micros":
105-
return ValueWriters.longs();
106-
107-
case "timestamp-micros":
108-
return ValueWriters.longs();
109-
110-
case "decimal":
111-
LogicalTypes.Decimal decimal = (LogicalTypes.Decimal) logicalType;
112-
return ValueWriters.decimal(decimal.getPrecision(), decimal.getScale());
113-
114-
case "uuid":
115-
return ValueWriters.uuids();
116-
117-
default:
118-
throw new IllegalArgumentException("Unsupported logical type: " + logicalType);
119-
}
120-
}
121-
122-
switch (primitive.getType()) {
123-
case NULL:
124-
return ValueWriters.nulls();
125-
case BOOLEAN:
126-
return ValueWriters.booleans();
127-
case INT:
128-
return ValueWriters.ints();
129-
case LONG:
130-
return ValueWriters.longs();
131-
case FLOAT:
132-
return ValueWriters.floats();
133-
case DOUBLE:
134-
return ValueWriters.doubles();
135-
case STRING:
136-
return ValueWriters.strings();
137-
case FIXED:
138-
return ValueWriters.genericFixed(primitive.getFixedSize());
139-
case BYTES:
140-
return ValueWriters.byteBuffers();
141-
default:
142-
throw new IllegalArgumentException("Unsupported type: " + primitive);
143-
}
63+
protected ValueWriter<?> fixedWriter(int length) {
64+
return ValueWriters.genericFixed(length);
14465
}
14566
}
14667
}

core/src/main/java/org/apache/iceberg/avro/InternalReader.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -205,7 +205,6 @@ public ValueReader<?> primitive(Pair<Integer, Type> partner, Schema primitive) {
205205
case STRING:
206206
return ValueReaders.strings();
207207
case FIXED:
208-
return ValueReaders.fixed(primitive);
209208
case BYTES:
210209
return ValueReaders.byteBuffers();
211210
case ENUM:
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.iceberg.avro;
20+
21+
import java.io.IOException;
22+
import java.util.List;
23+
import java.util.stream.Stream;
24+
import org.apache.avro.Schema;
25+
import org.apache.avro.io.Encoder;
26+
import org.apache.iceberg.FieldMetrics;
27+
import org.apache.iceberg.types.Type;
28+
29+
/**
30+
* A Writer that consumes Iceberg's internal in-memory object model.
31+
*
32+
* <p>Iceberg's internal in-memory object model produces the types defined in {@link
33+
* Type.TypeID#javaClass()}.
34+
*/
35+
public class InternalWriter<T> implements MetricsAwareDatumWriter<T> {
36+
private ValueWriter<T> writer = null;
37+
38+
public static <D> InternalWriter<D> create(Schema schema) {
39+
return new InternalWriter<>(schema);
40+
}
41+
42+
InternalWriter(Schema schema) {
43+
setSchema(schema);
44+
}
45+
46+
@Override
47+
@SuppressWarnings("unchecked")
48+
public void setSchema(Schema schema) {
49+
this.writer = (ValueWriter<T>) AvroSchemaVisitor.visit(schema, new WriteBuilder());
50+
}
51+
52+
@Override
53+
public void write(T datum, Encoder out) throws IOException {
54+
writer.write(datum, out);
55+
}
56+
57+
@Override
58+
public Stream<FieldMetrics> metrics() {
59+
return writer.metrics();
60+
}
61+
62+
private static class WriteBuilder extends BaseWriteBuilder {
63+
64+
@Override
65+
protected ValueWriter<?> createRecordWriter(List<ValueWriter<?>> fields) {
66+
return ValueWriters.struct(fields);
67+
}
68+
69+
@Override
70+
protected ValueWriter<?> fixedWriter(int length) {
71+
return ValueWriters.fixedBuffers(length);
72+
}
73+
}
74+
}

0 commit comments

Comments
 (0)