Skip to content

Commit

Permalink
Add support of nested collections
Browse files Browse the repository at this point in the history
  • Loading branch information
snuyanzin committed May 25, 2022
1 parent db09af1 commit b7e7012
Show file tree
Hide file tree
Showing 11 changed files with 360 additions and 210 deletions.
34 changes: 34 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,40 @@ CREATE TEMPORARY TABLE hp (
SELECT * FROM hp;
```

### On Collection Data Types with nested collections

The usage of `ARRAY`, `MULTISET`, `MAP` and `ROW` types with nested collections is shown in the following example.
To specify `expression`, `null-rate`, `length` for elements of `ARRAY` or `MULTISET` use `element` like
```sql
'fields.array-field-name.element.expression' = '#{expression}'
```
To specify `expression`, `null-rate`, `length` for elements of `MAP` use `key` and `value` like
```sql
'fields.array-field-name.key.expression' = '#{key_expression}',
'fields.array-field-name.value.expression' = '#{value_expression}'
```
a full example
```sql
CREATE TEMPORARY TABLE complex_collections (
`locations2character-with-age` MAP<ARRAY<STRING>, ROW<`name` STRING, `age` INT>>,
`multiset-house-points` MULTISET<ROW<`house` STRING, `points` INT>>
) WITH (
'connector' = 'faker',
'fields.locations2character-with-age.key.element.expression' = '#{harry_potter.location}',
'fields.locations2character-with-age.key.element.null-rate' = '0.1',
'fields.locations2character-with-age.key.null-rate' = '0.1',
'fields.locations2character-with-age.value.name.expression' = '#{harry_potter.character}',
'fields.locations2character-with-age.value.name.null-rate' = '0.2',
'fields.locations2character-with-age.value.age.expression' = '#{number.numberBetween ''10'',''100''}',
'fields.locations2character-with-age.length' = '2',
'fields.multiset-house-points.element.house.expression' = '#{harry_potter.house}',
'fields.multiset-house-points.element.points.expression' = '#{number.numberBetween ''10'',''100''}',
'fields.multiset-house-points.length' = '3'
);

SELECT * FROM complex_collections;
```

### "One Of" Columns

Datafaker allows to pick a random value from a list of options via expression ``Options.option``
Expand Down
47 changes: 28 additions & 19 deletions src/main/java/com/github/knaufk/flink/faker/FakerUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,15 @@
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Function;
import org.apache.flink.table.data.DecimalData;
import org.apache.flink.table.data.GenericArrayData;
import org.apache.flink.table.data.GenericMapData;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.types.logical.ArrayType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.MapType;
import org.apache.flink.table.types.logical.MultisetType;
import org.apache.flink.table.types.logical.RowType;

public class FakerUtils {
Expand All @@ -32,9 +31,19 @@ public class FakerUtils {
.appendFraction(ChronoField.NANO_OF_SECOND, 0, 9, true)
.toFormatter(Locale.US);

static Object stringValueToType(String[] stringArray, LogicalType logicalType) {
String value = stringArray.length > 0 ? stringArray[0] : "";

static Object stringValueToType(
FieldInfo fieldInfo, Function<String, String> fakerExpression2Value) {
if (fieldInfo.getNullRate() == 1.0
|| fieldInfo.getNullRate() > 0
&& fieldInfo.getNullRate() > ThreadLocalRandom.current().nextFloat()) {
return null;
}
String[] fakerExpressions = fieldInfo.getExpressions();
String value =
fieldInfo.getNestedFields() == null || fieldInfo.getNestedFields().isEmpty()
? fakerExpression2Value.apply(fakerExpressions[0])
: "";
LogicalType logicalType = fieldInfo.getLogicalType();
switch (logicalType.getTypeRoot()) {
case CHAR:
case VARCHAR:
Expand Down Expand Up @@ -73,41 +82,41 @@ static Object stringValueToType(String[] stringArray, LogicalType logicalType) {
// case INTERVAL_DAY_TIME:
// break;
case ARRAY:
Object[] arrayElements = new Object[stringArray.length];
for (int i = 0; i < stringArray.length; i++)
Object[] arrayElements = new Object[fieldInfo.getLength()];
for (int i = 0; i < fieldInfo.getLength(); i++)
arrayElements[i] =
(stringValueToType(
new String[] {stringArray[i]}, ((ArrayType) logicalType).getElementType()));
fieldInfo.getNestedFields().get("element"), fakerExpression2Value));
return new GenericArrayData(arrayElements);
case MULTISET:
Map<Object, Integer> multisetMap = new HashMap<>();
for (int i = 0; i < stringArray.length; i++) {
for (int i = 0; i < fieldInfo.getLength(); i++) {
Object element =
stringValueToType(
new String[] {stringArray[i]}, ((MultisetType) logicalType).getElementType());
stringValueToType(fieldInfo.getNestedFields().get("element"), fakerExpression2Value);
Integer multiplicity =
multisetMap.containsKey(element) ? (multisetMap.get(element) + 1) : 1;
multisetMap.put(element, multiplicity);
}
return new GenericMapData(multisetMap);
case MAP:
Map<Object, Object> map = new HashMap<>();
for (int i = 0; i < stringArray.length; i += 2) {
for (int i = 0; i < fieldInfo.getLength(); i++) {
Object key =
stringValueToType(
new String[] {stringArray[i]}, ((MapType) logicalType).getKeyType());
stringValueToType(fieldInfo.getNestedFields().get("key"), fakerExpression2Value);
Object val =
stringValueToType(
new String[] {stringArray[i + 1]}, ((MapType) logicalType).getValueType());
stringValueToType(fieldInfo.getNestedFields().get("value"), fakerExpression2Value);
map.put(key, val);
}
return new GenericMapData(map);
case ROW:
GenericRowData row = new GenericRowData(stringArray.length);
GenericRowData row = new GenericRowData(((RowType) logicalType).getFieldCount());
for (int i = 0; i < ((RowType) logicalType).getFieldCount(); i++) {
Object obj =
stringValueToType(
new String[] {stringArray[i]}, ((RowType) logicalType).getTypeAt(i));
fieldInfo
.getNestedFields()
.get(((RowType) logicalType).getFieldNames().get(i) + "_" + i),
fakerExpression2Value);
row.setField(i, obj);
}
return row;
Expand Down
63 changes: 63 additions & 0 deletions src/main/java/com/github/knaufk/flink/faker/FieldInfo.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package com.github.knaufk.flink.faker;

import java.io.Serializable;
import java.util.Arrays;
import java.util.Map;
import org.apache.flink.table.types.logical.LogicalType;

public class FieldInfo implements Serializable {
private final Float nullRate;
private final LogicalType logicalType;
private final String[] expressions;
private final Integer length;
private final Map<String, FieldInfo> nestedFields;

public FieldInfo(
Float nullRate,
LogicalType logicalType,
String[] expressions,
Integer length,
Map<String, FieldInfo> nestedFields) {
this.nullRate = nullRate;
this.logicalType = logicalType;
this.expressions = expressions;
this.length = length;
this.nestedFields = nestedFields;
}

public Float getNullRate() {
return nullRate;
}

public String[] getExpressions() {
return expressions;
}

public Integer getLength() {
return length;
}

public LogicalType getLogicalType() {
return logicalType;
}

public Map<String, FieldInfo> getNestedFields() {
return nestedFields;
}

@Override
public String toString() {
return "FieldInfo{"
+ "nullRate="
+ nullRate
+ ", logicalType="
+ logicalType
+ ", expressions="
+ Arrays.toString(expressions)
+ ", length="
+ length
+ ", nestedFields="
+ nestedFields
+ '}';
}
}
Original file line number Diff line number Diff line change
@@ -1,40 +1,25 @@
package com.github.knaufk.flink.faker;

import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import net.datafaker.Faker;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.util.Collector;

class FlinkFakerGenerator extends RichFlatMapFunction<Long, RowData> {

private Faker faker;
private Random rand;

private String[][] fieldExpressions;
private Float[] fieldNullRates;
private Integer[] fieldCollectionLengths;
private LogicalType[] types;
private long rowsPerSecond;
private final FieldInfo[] fieldInfos;
private final long rowsPerSecond;
private long soFarThisSecond;
private long nextReadTime;
private long nextReadTime;;

public FlinkFakerGenerator(
String[][] fieldExpressions,
Float[] fieldNullRates,
Integer[] fieldCollectionLengths,
LogicalType[] types,
long rowsPerSecond) {
this.fieldExpressions = fieldExpressions;
this.fieldNullRates = fieldNullRates;
this.fieldCollectionLengths = fieldCollectionLengths;
this.types = types;
public FlinkFakerGenerator(FieldInfo[] fieldInfos, long rowsPerSecond) {
this.fieldInfos = fieldInfos;
this.rowsPerSecond = rowsPerSecond;
}

Expand All @@ -43,7 +28,6 @@ public void open(final Configuration parameters) throws Exception {
super.open(parameters);
faker = new Faker();
rand = new Random();

nextReadTime = System.currentTimeMillis();
soFarThisSecond = 0;
}
Expand All @@ -70,24 +54,9 @@ private void rest() throws InterruptedException {

@VisibleForTesting
RowData generateNextRow() {
GenericRowData row = new GenericRowData(fieldExpressions.length);
for (int i = 0; i < fieldExpressions.length; i++) {

float fieldNullRate = fieldNullRates[i];
if (rand.nextFloat() >= fieldNullRate) {
List<String> values = new ArrayList<String>();
for (int j = 0; j < fieldCollectionLengths[i]; j++) {
for (int k = 0; k < fieldExpressions[i].length; k++) {
// loop for multiple expressions of one field (like map, row fields)
values.add(faker.expression(fieldExpressions[i][k]));
}
}

row.setField(
i, FakerUtils.stringValueToType(values.toArray(new String[values.size()]), types[i]));
} else {
row.setField(i, null);
}
GenericRowData row = new GenericRowData(fieldInfos.length);
for (int i = 0; i < fieldInfos.length; i++) {
row.setField(i, FakerUtils.stringValueToType(fieldInfos[i], f -> faker.expression(f)));
}
return row;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,37 +8,22 @@
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.table.types.logical.LogicalType;

public class FlinkFakerLookupFunction extends TableFunction<RowData> {

private String[][] fieldExpressions;
private Float[] fieldNullRates;
private Integer[] fieldCollectionLengths;
private LogicalType[] types;
private int[][] keys;
private final FieldInfo[] fieldInfos;
private List<Integer> keyIndeces;
private Faker faker;
private Random rand;

public FlinkFakerLookupFunction(
String[][] fieldExpressions,
Float[] fieldNullRates,
Integer[] fieldCollectionLengths,
LogicalType[] types,
int[][] keys) {
this.fieldExpressions = fieldExpressions;
this.fieldNullRates = fieldNullRates;
this.fieldCollectionLengths = fieldCollectionLengths;
this.types = types;
public FlinkFakerLookupFunction(FieldInfo[] fieldInfos, int[][] keys) {
this.fieldInfos = fieldInfos;

keyIndeces = new ArrayList<>();
for (int i = 0; i < keys.length; i++) {
// we don't support nested rows for now, so this is actually one-dimensional
keyIndeces.add(keys[i][0]);
}

this.keys = keys;
}

@Override
Expand All @@ -49,27 +34,14 @@ public void open(FunctionContext context) throws Exception {
}

public void eval(Object... keys) {
GenericRowData row = new GenericRowData(fieldExpressions.length);
GenericRowData row = new GenericRowData(fieldInfos.length);
int keyCount = 0;
for (int i = 0; i < fieldExpressions.length; i++) {
for (int i = 0; i < fieldInfos.length; i++) {
if (keyIndeces.contains(i)) {
row.setField(i, keys[keyCount]);
keyCount++;
} else {
float fieldNullRate = fieldNullRates[i];
if (rand.nextFloat() > fieldNullRate) {
List<String> values = new ArrayList<>();
for (int j = 0; j < fieldCollectionLengths[i]; j++) {
for (int k = 0; k < fieldExpressions[i].length; k++) {
// loop for multiple expressions of one field (like map, row fields)
values.add(faker.expression(fieldExpressions[i][k]));
}
}
row.setField(
i, FakerUtils.stringValueToType(values.toArray(new String[values.size()]), types[i]));
} else {
row.setField(i, null);
}
row.setField(i, FakerUtils.stringValueToType(fieldInfos[i], f -> faker.expression(f)));
}
}
collect(row);
Expand Down
Loading

0 comments on commit b7e7012

Please sign in to comment.