Skip to content

Commit

Permalink
Chore/serde restructure (#85)
Browse files Browse the repository at this point in the history
* refactor: move protohandler codebase to common and use new source sink APIs
  • Loading branch information
arujit committed Feb 2, 2022
1 parent 04085a9 commit 1b3c688
Show file tree
Hide file tree
Showing 102 changed files with 972 additions and 1,147 deletions.
4 changes: 3 additions & 1 deletion dagger-common/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,11 @@ dependencies {
compileOnly group: 'org.apache.flink', name: 'flink-connector-kafka_2.11', version: flinkVersion

dependenciesCommonJar 'org.apache.flink:flink-metrics-dropwizard:' + flinkVersion
dependenciesCommonJar 'org.apache.flink:flink-json:' + flinkVersion
dependenciesCommonJar 'com.jayway.jsonpath:json-path:2.4.0'
dependenciesCommonJar 'com.gojek:stencil:2.0.15'

testImplementation 'junit:junit:4.12'
testImplementation 'junit:junit:4.13'
testImplementation 'org.jmockit:jmockit:1.25'
testImplementation 'org.mockito:mockito-core:2.25.1'
testImplementation 'io.grpc:grpc-protobuf:1.18.0'
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package io.odpf.dagger.core.exception;
package io.odpf.dagger.common.exceptions.serde;

/**
* The class Exception for unsupported protobuf data type.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package io.odpf.dagger.core.exception;
package io.odpf.dagger.common.exceptions.serde;

/**
* The class Exception if Enum field not found in proto descriptor.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package io.odpf.dagger.core.exception;
package io.odpf.dagger.common.exceptions.serde;

/**
* The class Exception if there is an Invalid Data type.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
package io.odpf.dagger.core.protohandler;
package io.odpf.dagger.common.protohandler;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;

import io.odpf.dagger.core.exception.EnumFieldNotFoundException;
import com.google.protobuf.Descriptors;
import com.google.protobuf.DynamicMessage;
import io.odpf.dagger.common.exceptions.serde.EnumFieldNotFoundException;

/**
* The type Enum proto handler.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package io.odpf.dagger.core.protohandler;
package io.odpf.dagger.common.protohandler;

import com.google.protobuf.Descriptors;
import com.google.protobuf.DynamicMessage;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package io.odpf.dagger.core.protohandler;
package io.odpf.dagger.common.protohandler;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.formats.json.JsonRowSerializationSchema;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package io.odpf.dagger.core.protohandler;
package io.odpf.dagger.common.protohandler;

import org.apache.flink.api.common.typeinfo.TypeInformation;

import io.odpf.dagger.core.exception.InvalidDataTypeException;
import io.odpf.dagger.core.protohandler.typehandler.PrimitiveTypeHandler;
import io.odpf.dagger.core.protohandler.typehandler.PrimitiveTypeHandlerFactory;
import io.odpf.dagger.common.exceptions.serde.InvalidDataTypeException;
import io.odpf.dagger.common.protohandler.typehandler.PrimitiveTypeHandlerFactory;
import io.odpf.dagger.common.protohandler.typehandler.PrimitiveTypeHandler;
import com.google.protobuf.Descriptors;
import com.google.protobuf.DynamicMessage;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package io.odpf.dagger.core.protohandler;
package io.odpf.dagger.common.protohandler;

import org.apache.flink.api.common.typeinfo.TypeInformation;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package io.odpf.dagger.core.protohandler;
package io.odpf.dagger.common.protohandler;

import com.google.protobuf.Descriptors;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package io.odpf.dagger.core.protohandler;
package io.odpf.dagger.common.protohandler;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package io.odpf.dagger.core.protohandler;
package io.odpf.dagger.common.protohandler;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package io.odpf.dagger.core.protohandler;
package io.odpf.dagger.common.protohandler;

import org.apache.flink.api.common.typeinfo.TypeInformation;

import io.odpf.dagger.core.protohandler.typehandler.PrimitiveTypeHandler;
import io.odpf.dagger.core.protohandler.typehandler.PrimitiveTypeHandlerFactory;
import io.odpf.dagger.common.protohandler.typehandler.PrimitiveTypeHandlerFactory;
import io.odpf.dagger.common.protohandler.typehandler.PrimitiveTypeHandler;
import com.google.gson.Gson;
import com.google.protobuf.Descriptors.FieldDescriptor;
import com.google.protobuf.DynamicMessage;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package io.odpf.dagger.core.protohandler;
package io.odpf.dagger.common.protohandler;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package io.odpf.dagger.core.protohandler;
package io.odpf.dagger.common.protohandler;

import com.google.protobuf.Descriptors;
import com.google.protobuf.Descriptors.FieldDescriptor;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package io.odpf.dagger.core.protohandler;
package io.odpf.dagger.common.protohandler;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package io.odpf.dagger.core.protohandler;
package io.odpf.dagger.common.protohandler;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.types.Row;
Expand All @@ -9,6 +9,8 @@

import java.text.SimpleDateFormat;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeParseException;
import java.util.TimeZone;

Expand Down Expand Up @@ -44,6 +46,11 @@ public DynamicMessage.Builder transformForKafka(DynamicMessage.Builder builder,
if (field instanceof java.sql.Timestamp) {
timestamp = convertSqlTimestamp((java.sql.Timestamp) field);
}

if (field instanceof LocalDateTime) {
timestamp = convertLocalDateTime((LocalDateTime) field);
}

if (field instanceof Row) {
Row timeField = (Row) field;
if (timeField.getArity() == 2) {
Expand All @@ -70,6 +77,12 @@ public DynamicMessage.Builder transformForKafka(DynamicMessage.Builder builder,
return builder;
}

private Timestamp convertLocalDateTime(LocalDateTime timeField) {
return Timestamp.newBuilder()
.setSeconds(timeField.toEpochSecond(ZoneOffset.UTC))
.build();
}

@Override
public Object transformFromPostProcessor(Object field) {
return isValid(field) ? field.toString() : null;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package io.odpf.dagger.core.protohandler;
package io.odpf.dagger.common.protohandler;

import io.odpf.dagger.common.exceptions.DescriptorNotFoundException;
import com.google.protobuf.Descriptors;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package io.odpf.dagger.core.protohandler.typehandler;
package io.odpf.dagger.common.protohandler.typehandler;

import com.google.common.primitives.Booleans;
import com.google.protobuf.Descriptors;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package io.odpf.dagger.core.protohandler.typehandler;
package io.odpf.dagger.common.protohandler.typehandler;

import com.google.protobuf.ByteString;
import com.google.protobuf.Descriptors;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package io.odpf.dagger.core.protohandler.typehandler;
package io.odpf.dagger.common.protohandler.typehandler;

import com.google.common.primitives.Doubles;
import com.google.protobuf.Descriptors;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package io.odpf.dagger.core.protohandler.typehandler;
package io.odpf.dagger.common.protohandler.typehandler;

import com.google.common.primitives.Floats;
import com.google.protobuf.Descriptors;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package io.odpf.dagger.core.protohandler.typehandler;
package io.odpf.dagger.common.protohandler.typehandler;

import com.google.common.primitives.Ints;
import com.google.protobuf.Descriptors;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package io.odpf.dagger.core.protohandler.typehandler;
package io.odpf.dagger.common.protohandler.typehandler;

import com.google.protobuf.Descriptors;
import com.google.protobuf.Descriptors.FieldDescriptor.JavaType;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package io.odpf.dagger.core.protohandler.typehandler;
package io.odpf.dagger.common.protohandler.typehandler;

import org.apache.flink.api.common.typeinfo.TypeInformation;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package io.odpf.dagger.core.protohandler.typehandler;
package io.odpf.dagger.common.protohandler.typehandler;

import io.odpf.dagger.core.exception.DataTypeNotSupportedException;
import io.odpf.dagger.common.exceptions.serde.DataTypeNotSupportedException;
import com.google.protobuf.Descriptors;

import java.util.Arrays;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package io.odpf.dagger.core.protohandler.typehandler;
package io.odpf.dagger.common.protohandler.typehandler;

import com.google.protobuf.Descriptors;
import com.google.protobuf.Descriptors.FieldDescriptor.JavaType;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package io.odpf.dagger.common.watermark;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.types.Row;

public class NoWatermark implements WatermarkStrategyDefinition {
@Override
public WatermarkStrategy<Row> getWatermarkStrategy(long waterMarkDelayInMs) {
return WatermarkStrategy.noWatermarks();
}
}
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package io.odpf.dagger.common.watermark;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
import org.apache.flink.types.Row;

import java.io.Serializable;
Expand All @@ -23,9 +21,5 @@ public DataStream<Row> assignTimeStampAndWatermark(DataStream<Row> inputStream,
return inputStream
.assignTimestampsAndWatermarks(watermarkStrategyDefinition.getWatermarkStrategy(watermarkDelayMs));
}

public FlinkKafkaConsumerBase consumerAssignTimeStampAndWatermark(FlinkKafkaConsumer<Row> flinkKafkaConsumer, long watermarkDelayMs, boolean enablePerPartitionWatermark) {
return enablePerPartitionWatermark ? flinkKafkaConsumer
.assignTimestampsAndWatermarks(watermarkStrategyDefinition.getWatermarkStrategy(watermarkDelayMs)) : flinkKafkaConsumer;
}
}

Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
package io.odpf.dagger.core.protohandler;
package io.odpf.dagger.common.protohandler;

import com.google.protobuf.Descriptors;
import com.google.protobuf.DynamicMessage;
import io.odpf.dagger.consumer.TestBookingLogMessage;
import io.odpf.dagger.consumer.TestRepeatedEnumMessage;
import io.odpf.dagger.consumer.TestServiceType;
import io.odpf.dagger.core.exception.EnumFieldNotFoundException;
import io.odpf.dagger.common.exceptions.serde.EnumFieldNotFoundException;
import org.apache.flink.api.common.typeinfo.Types;

import org.junit.Assert;
import org.junit.Test;

import static org.junit.Assert.*;
Expand Down Expand Up @@ -71,7 +73,7 @@ public void shouldThrowExceptionIfFieldNotFoundInGivenEnumFieldTypeDescriptor()
EnumProtoHandler enumProtoHandler = new EnumProtoHandler(enumFieldDescriptor);
DynamicMessage.Builder builder = DynamicMessage.newBuilder(enumFieldDescriptor.getContainingType());

EnumFieldNotFoundException exception = assertThrows(EnumFieldNotFoundException.class, () -> enumProtoHandler.transformForKafka(builder, "test"));
EnumFieldNotFoundException exception = Assert.assertThrows(EnumFieldNotFoundException.class, () -> enumProtoHandler.transformForKafka(builder, "test"));
assertEquals("field: test not found in io.odpf.dagger.consumer.TestBookingLogMessage.service_type", exception.getMessage());
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,28 @@
package io.odpf.dagger.core.protohandler;
package io.odpf.dagger.common.protohandler;

import com.google.protobuf.*;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.types.Row;

import com.google.protobuf.Descriptors;
import com.google.protobuf.DynamicMessage;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.MapEntry;
import com.google.protobuf.WireFormat;
import io.odpf.dagger.consumer.TestBookingLogMessage;
import io.odpf.dagger.consumer.TestComplexMap;
import io.odpf.dagger.consumer.TestMessage;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.types.Row;
import org.junit.Assert;
import org.junit.Test;

import java.util.*;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static org.junit.Assert.*;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

public class MapProtoHandlerTest {

Expand Down Expand Up @@ -111,7 +123,7 @@ public void shouldThrowExceptionIfRowsPassedAreNotOfArityTwo() {

Row inputRow = new Row(3);
inputRows.add(inputRow);
IllegalArgumentException exception = assertThrows(IllegalArgumentException.class,
IllegalArgumentException exception = Assert.assertThrows(IllegalArgumentException.class,
() -> mapProtoHandler.transformForKafka(builder, inputRows.toArray()));
assertEquals("Row: +I[null, null, null] of size: 3 cannot be converted to map", exception.getMessage());
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,18 +1,21 @@
package io.odpf.dagger.core.protohandler;
package io.odpf.dagger.common.protohandler;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.types.Row;

import com.google.protobuf.Descriptors;
import com.google.protobuf.DynamicMessage;
import com.google.protobuf.InvalidProtocolBufferException;
import io.odpf.dagger.consumer.TestBookingLogMessage;
import io.odpf.dagger.consumer.TestPaymentOptionMetadata;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.types.Row;
import org.junit.Test;

import java.util.HashMap;

import static org.junit.Assert.*;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

public class MessageProtoHandlerTest {

Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
package io.odpf.dagger.core.protohandler;
package io.odpf.dagger.common.protohandler;

import org.apache.flink.api.common.typeinfo.Types;

import com.google.protobuf.Descriptors;
import com.google.protobuf.DynamicMessage;
import io.odpf.dagger.common.exceptions.serde.InvalidDataTypeException;
import io.odpf.dagger.consumer.TestBookingLogMessage;
import io.odpf.dagger.core.exception.InvalidDataTypeException;
import org.apache.flink.api.common.typeinfo.Types;
import org.junit.Assert;
import org.junit.Test;

import static org.junit.Assert.*;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

public class PrimitiveProtoHandlerTest {

Expand Down Expand Up @@ -80,7 +83,7 @@ public void shouldThrowInvalidDataTypeExceptionInCaseOfTypeMismatchForPostProces
Descriptors.FieldDescriptor floatFieldDescriptor = TestBookingLogMessage.getDescriptor().findFieldByName("total_customer_discount");
PrimitiveProtoHandler primitiveProtoHandler = new PrimitiveProtoHandler(floatFieldDescriptor);

InvalidDataTypeException exception = assertThrows(InvalidDataTypeException.class,
InvalidDataTypeException exception = Assert.assertThrows(InvalidDataTypeException.class,
() -> primitiveProtoHandler.transformFromPostProcessor("stringValue"));
assertEquals("type mismatch of field: total_customer_discount, expecting FLOAT type, actual type class java.lang.String", exception.getMessage());
}
Expand Down
Loading

0 comments on commit 1b3c688

Please sign in to comment.