Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support of nested collections #28

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,40 @@ CREATE TEMPORARY TABLE hp (
SELECT * FROM hp;
```

### On Collection Data Types with nested collections

The usage of `ARRAY`, `MULTISET`, `MAP` and `ROW` types with nested collections is shown in the following example.
To specify `expression`, `null-rate`, `length` for elements of `ARRAY` or `MULTISET` use `element` like
```sql
'fields.array-field-name.element.expression' = '#{expression}'
```
To specify `expression`, `null-rate`, `length` for elements of `MAP` use `key` and `value` like
```sql
'fields.array-field-name.key.expression' = '#{key_expression}',
'fields.array-field-name.value.expression' = '#{value_expression}'
```
a full example
```sql
CREATE TEMPORARY TABLE complex_collections (
`locations2character-with-age` MAP<ARRAY<STRING>, ROW<`name` STRING, `age` INT>>,
`multiset-house-points` MULTISET<ROW<`house` STRING, `points` INT>>
) WITH (
'connector' = 'faker',
'fields.locations2character-with-age.key.element.expression' = '#{harry_potter.location}',
'fields.locations2character-with-age.key.element.null-rate' = '0.1',
'fields.locations2character-with-age.key.null-rate' = '0.1',
'fields.locations2character-with-age.value.name.expression' = '#{harry_potter.character}',
'fields.locations2character-with-age.value.name.null-rate' = '0.2',
'fields.locations2character-with-age.value.age.expression' = '#{number.numberBetween ''10'',''100''}',
'fields.locations2character-with-age.length' = '2',
'fields.multiset-house-points.element.house.expression' = '#{harry_potter.house}',
'fields.multiset-house-points.element.points.expression' = '#{number.numberBetween ''10'',''100''}',
'fields.multiset-house-points.length' = '3'
);

SELECT * FROM complex_collections;
```

### "One Of" Columns

Datafaker allows to pick a random value from a list of options via expression ``Options.option``
Expand Down
47 changes: 28 additions & 19 deletions src/main/java/com/github/knaufk/flink/faker/FakerUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,15 @@
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Function;
import org.apache.flink.table.data.DecimalData;
import org.apache.flink.table.data.GenericArrayData;
import org.apache.flink.table.data.GenericMapData;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.types.logical.ArrayType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.MapType;
import org.apache.flink.table.types.logical.MultisetType;
import org.apache.flink.table.types.logical.RowType;

public class FakerUtils {
Expand All @@ -32,9 +31,19 @@ public class FakerUtils {
.appendFraction(ChronoField.NANO_OF_SECOND, 0, 9, true)
.toFormatter(Locale.US);

static Object stringValueToType(String[] stringArray, LogicalType logicalType) {
String value = stringArray.length > 0 ? stringArray[0] : "";

static Object stringValueToType(
FieldInfo fieldInfo, Function<String, String> fakerExpression2Value) {
if (fieldInfo.getNullRate() == 1.0
|| fieldInfo.getNullRate() > 0
&& fieldInfo.getNullRate() > ThreadLocalRandom.current().nextFloat()) {
return null;
}
String[] fakerExpressions = fieldInfo.getExpressions();
String value =
fieldInfo.getNestedFields() == null || fieldInfo.getNestedFields().isEmpty()
? fakerExpression2Value.apply(fakerExpressions[0])
: "";
LogicalType logicalType = fieldInfo.getLogicalType();
switch (logicalType.getTypeRoot()) {
case CHAR:
case VARCHAR:
Expand Down Expand Up @@ -73,41 +82,41 @@ static Object stringValueToType(String[] stringArray, LogicalType logicalType) {
// case INTERVAL_DAY_TIME:
// break;
case ARRAY:
Object[] arrayElements = new Object[stringArray.length];
for (int i = 0; i < stringArray.length; i++)
Object[] arrayElements = new Object[fieldInfo.getLength()];
for (int i = 0; i < fieldInfo.getLength(); i++)
arrayElements[i] =
(stringValueToType(
new String[] {stringArray[i]}, ((ArrayType) logicalType).getElementType()));
fieldInfo.getNestedFields().get("element"), fakerExpression2Value));
return new GenericArrayData(arrayElements);
case MULTISET:
Map<Object, Integer> multisetMap = new HashMap<>();
for (int i = 0; i < stringArray.length; i++) {
for (int i = 0; i < fieldInfo.getLength(); i++) {
Object element =
stringValueToType(
new String[] {stringArray[i]}, ((MultisetType) logicalType).getElementType());
stringValueToType(fieldInfo.getNestedFields().get("element"), fakerExpression2Value);
Integer multiplicity =
multisetMap.containsKey(element) ? (multisetMap.get(element) + 1) : 1;
multisetMap.put(element, multiplicity);
}
return new GenericMapData(multisetMap);
case MAP:
Map<Object, Object> map = new HashMap<>();
for (int i = 0; i < stringArray.length; i += 2) {
for (int i = 0; i < fieldInfo.getLength(); i++) {
Object key =
stringValueToType(
new String[] {stringArray[i]}, ((MapType) logicalType).getKeyType());
stringValueToType(fieldInfo.getNestedFields().get("key"), fakerExpression2Value);
Object val =
stringValueToType(
new String[] {stringArray[i + 1]}, ((MapType) logicalType).getValueType());
stringValueToType(fieldInfo.getNestedFields().get("value"), fakerExpression2Value);
map.put(key, val);
}
return new GenericMapData(map);
case ROW:
GenericRowData row = new GenericRowData(stringArray.length);
GenericRowData row = new GenericRowData(((RowType) logicalType).getFieldCount());
for (int i = 0; i < ((RowType) logicalType).getFieldCount(); i++) {
Object obj =
stringValueToType(
new String[] {stringArray[i]}, ((RowType) logicalType).getTypeAt(i));
fieldInfo
.getNestedFields()
.get(((RowType) logicalType).getFieldNames().get(i) + "_" + i),
fakerExpression2Value);
row.setField(i, obj);
}
return row;
Expand Down
63 changes: 63 additions & 0 deletions src/main/java/com/github/knaufk/flink/faker/FieldInfo.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package com.github.knaufk.flink.faker;

import java.io.Serializable;
import java.util.Arrays;
import java.util.Map;
import org.apache.flink.table.types.logical.LogicalType;

public class FieldInfo implements Serializable {
private final Float nullRate;
private final LogicalType logicalType;
private final String[] expressions;
private final Integer length;
private final Map<String, FieldInfo> nestedFields;

public FieldInfo(
Float nullRate,
LogicalType logicalType,
String[] expressions,
Integer length,
Map<String, FieldInfo> nestedFields) {
this.nullRate = nullRate;
this.logicalType = logicalType;
this.expressions = expressions;
this.length = length;
this.nestedFields = nestedFields;
}

public Float getNullRate() {
return nullRate;
}

public String[] getExpressions() {
return expressions;
}

public Integer getLength() {
return length;
}

public LogicalType getLogicalType() {
return logicalType;
}

public Map<String, FieldInfo> getNestedFields() {
return nestedFields;
}

@Override
public String toString() {
return "FieldInfo{"
+ "nullRate="
+ nullRate
+ ", logicalType="
+ logicalType
+ ", expressions="
+ Arrays.toString(expressions)
+ ", length="
+ length
+ ", nestedFields="
+ nestedFields
+ '}';
}
}
Original file line number Diff line number Diff line change
@@ -1,40 +1,25 @@
package com.github.knaufk.flink.faker;

import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import net.datafaker.Faker;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.util.Collector;

class FlinkFakerGenerator extends RichFlatMapFunction<Long, RowData> {

private Faker faker;
private Random rand;

private String[][] fieldExpressions;
private Float[] fieldNullRates;
private Integer[] fieldCollectionLengths;
private LogicalType[] types;
private long rowsPerSecond;
private final FieldInfo[] fieldInfos;
private final long rowsPerSecond;
private long soFarThisSecond;
private long nextReadTime;
private long nextReadTime;;

public FlinkFakerGenerator(
String[][] fieldExpressions,
Float[] fieldNullRates,
Integer[] fieldCollectionLengths,
LogicalType[] types,
long rowsPerSecond) {
this.fieldExpressions = fieldExpressions;
this.fieldNullRates = fieldNullRates;
this.fieldCollectionLengths = fieldCollectionLengths;
this.types = types;
public FlinkFakerGenerator(FieldInfo[] fieldInfos, long rowsPerSecond) {
this.fieldInfos = fieldInfos;
this.rowsPerSecond = rowsPerSecond;
}

Expand All @@ -43,7 +28,6 @@ public void open(final Configuration parameters) throws Exception {
super.open(parameters);
faker = new Faker();
rand = new Random();

nextReadTime = System.currentTimeMillis();
soFarThisSecond = 0;
}
Expand All @@ -70,24 +54,9 @@ private void rest() throws InterruptedException {

@VisibleForTesting
RowData generateNextRow() {
GenericRowData row = new GenericRowData(fieldExpressions.length);
for (int i = 0; i < fieldExpressions.length; i++) {

float fieldNullRate = fieldNullRates[i];
if (rand.nextFloat() >= fieldNullRate) {
List<String> values = new ArrayList<String>();
for (int j = 0; j < fieldCollectionLengths[i]; j++) {
for (int k = 0; k < fieldExpressions[i].length; k++) {
// loop for multiple expressions of one field (like map, row fields)
values.add(faker.expression(fieldExpressions[i][k]));
}
}

row.setField(
i, FakerUtils.stringValueToType(values.toArray(new String[values.size()]), types[i]));
} else {
row.setField(i, null);
}
GenericRowData row = new GenericRowData(fieldInfos.length);
for (int i = 0; i < fieldInfos.length; i++) {
row.setField(i, FakerUtils.stringValueToType(fieldInfos[i], f -> faker.expression(f)));
}
return row;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,37 +8,22 @@
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.table.types.logical.LogicalType;

public class FlinkFakerLookupFunction extends TableFunction<RowData> {

private String[][] fieldExpressions;
private Float[] fieldNullRates;
private Integer[] fieldCollectionLengths;
private LogicalType[] types;
private int[][] keys;
private final FieldInfo[] fieldInfos;
private List<Integer> keyIndeces;
private Faker faker;
private Random rand;

public FlinkFakerLookupFunction(
String[][] fieldExpressions,
Float[] fieldNullRates,
Integer[] fieldCollectionLengths,
LogicalType[] types,
int[][] keys) {
this.fieldExpressions = fieldExpressions;
this.fieldNullRates = fieldNullRates;
this.fieldCollectionLengths = fieldCollectionLengths;
this.types = types;
public FlinkFakerLookupFunction(FieldInfo[] fieldInfos, int[][] keys) {
this.fieldInfos = fieldInfos;

keyIndeces = new ArrayList<>();
for (int i = 0; i < keys.length; i++) {
// we don't support nested rows for now, so this is actually one-dimensional
keyIndeces.add(keys[i][0]);
}

this.keys = keys;
}

@Override
Expand All @@ -49,27 +34,14 @@ public void open(FunctionContext context) throws Exception {
}

public void eval(Object... keys) {
GenericRowData row = new GenericRowData(fieldExpressions.length);
GenericRowData row = new GenericRowData(fieldInfos.length);
int keyCount = 0;
for (int i = 0; i < fieldExpressions.length; i++) {
for (int i = 0; i < fieldInfos.length; i++) {
if (keyIndeces.contains(i)) {
row.setField(i, keys[keyCount]);
keyCount++;
} else {
float fieldNullRate = fieldNullRates[i];
if (rand.nextFloat() > fieldNullRate) {
List<String> values = new ArrayList<>();
for (int j = 0; j < fieldCollectionLengths[i]; j++) {
for (int k = 0; k < fieldExpressions[i].length; k++) {
// loop for multiple expressions of one field (like map, row fields)
values.add(faker.expression(fieldExpressions[i][k]));
}
}
row.setField(
i, FakerUtils.stringValueToType(values.toArray(new String[values.size()]), types[i]));
} else {
row.setField(i, null);
}
row.setField(i, FakerUtils.stringValueToType(fieldInfos[i], f -> faker.expression(f)));
}
}
collect(row);
Expand Down
Loading