Skip to content

Commit

Permalink
feat: Bigquery sink using depot (#154) (#185)
Browse files Browse the repository at this point in the history
* feat: Bigquery sink using depot (#154)

* chore: fix checkstyle

* feat: Checkpointing on bq sink (#187)

* feat: prepare for commit

* fix: clear the messages for pushing to bq

* docs: add documentation for BQ sink in Dagger

- [#188]

* chore: version bump of depot

Co-authored-by: Sumit Aich <[email protected]>
  • Loading branch information
lavkesh and sumitaich1998 authored Sep 19, 2022
1 parent 6133f7f commit ba209e2
Show file tree
Hide file tree
Showing 35 changed files with 960 additions and 214 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ public class Constants {
public static final boolean SCHEMA_REGISTRY_STENCIL_ENABLE_DEFAULT = false;
public static final String SCHEMA_REGISTRY_STENCIL_URLS_KEY = "SCHEMA_REGISTRY_STENCIL_URLS";
public static final String SCHEMA_REGISTRY_STENCIL_URLS_DEFAULT = "";
public static final String SCHEMA_REGISTRY_STENCIL_TIMEOUT_MS_KEY = "SCHEMA_REGISTRY_STENCIL_TIMEOUT_MS";
public static final Integer SCHEMA_REGISTRY_STENCIL_TIMEOUT_MS_DEFAULT = 60000;
public static final String SCHEMA_REGISTRY_STENCIL_FETCH_TIMEOUT_MS = "SCHEMA_REGISTRY_STENCIL_FETCH_TIMEOUT_MS";
public static final Integer SCHEMA_REGISTRY_STENCIL_FETCH_TIMEOUT_MS_DEFAULT = 60000;
public static final String SCHEMA_REGISTRY_STENCIL_FETCH_HEADERS_KEY = "SCHEMA_REGISTRY_STENCIL_FETCH_HEADERS";
public static final String SCHEMA_REGISTRY_STENCIL_FETCH_HEADERS_DEFAULT = "";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,10 @@ public StencilClientOrchestrator(Configuration configuration) {
}

StencilConfig createStencilConfig() {
Integer timeoutMS = configuration.getInteger(SCHEMA_REGISTRY_STENCIL_TIMEOUT_MS_KEY, SCHEMA_REGISTRY_STENCIL_TIMEOUT_MS_DEFAULT);
List<Header> headers = this.getHeaders(configuration);
return StencilConfig.builder().fetchTimeoutMs(timeoutMS).fetchHeaders(headers).build();
return StencilConfig.builder()
.fetchHeaders(getHeaders(configuration))
.fetchTimeoutMs(configuration.getInteger(SCHEMA_REGISTRY_STENCIL_FETCH_TIMEOUT_MS, SCHEMA_REGISTRY_STENCIL_FETCH_TIMEOUT_MS_DEFAULT))
.build();
}

private List<Header> getHeaders(Configuration config) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,17 +1,16 @@
package io.odpf.dagger.common.serde.proto.deserialization;

import io.odpf.dagger.common.serde.DaggerDeserializer;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.flink.types.Row;

import com.google.protobuf.Descriptors;
import com.google.protobuf.DynamicMessage;
import com.google.protobuf.InvalidProtocolBufferException;
import io.odpf.dagger.common.core.StencilClientOrchestrator;
import io.odpf.dagger.common.exceptions.DescriptorNotFoundException;
import io.odpf.dagger.common.exceptions.serde.DaggerDeserializationException;
import io.odpf.dagger.common.serde.DaggerDeserializer;
import io.odpf.dagger.common.serde.typehandler.RowFactory;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.flink.types.Row;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -25,11 +24,11 @@
*/
public class ProtoDeserializer implements KafkaDeserializationSchema<Row>, DaggerDeserializer<Row> {

private static final Logger LOGGER = LoggerFactory.getLogger(ProtoDeserializer.class);
private final String protoClassName;
private final int timestampFieldIndex;
private final StencilClientOrchestrator stencilClientOrchestrator;
private final TypeInformation<Row> typeInformation;
private static final Logger LOGGER = LoggerFactory.getLogger(ProtoDeserializer.class);

/**
* Instantiates a new Proto deserializer.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package io.odpf.dagger.common.serde.proto.serialization;

import org.apache.flink.api.common.serialization.SerializationSchema.InitializationContext;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.types.Row;

import io.odpf.dagger.common.exceptions.serde.DaggerSerializationException;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Objects;

public class KafkaProtoSerializer implements KafkaRecordSerializationSchema<Row> {
private final String outputTopic;
private final ProtoSerializer protoSerializer;
private static final Logger LOGGER = LoggerFactory.getLogger("KafkaSink");

public KafkaProtoSerializer(ProtoSerializer protoSerializer) {
this(protoSerializer, "");
}

public KafkaProtoSerializer(ProtoSerializer protoSerializer, String outputTopic) {
this.protoSerializer = protoSerializer;
this.outputTopic = outputTopic;
}

@Override
public void open(InitializationContext context, KafkaSinkContext sinkContext) throws Exception {
KafkaRecordSerializationSchema.super.open(context, sinkContext);
}

@Override
public ProducerRecord<byte[], byte[]> serialize(Row row, KafkaSinkContext context, Long timestamp) {
if (Objects.isNull(outputTopic) || outputTopic.equals("")) {
throw new DaggerSerializationException("outputTopic is required");
}
LOGGER.info("row to kafka: " + row);
byte[] key = protoSerializer.serializeKey(row);
byte[] message = protoSerializer.serializeValue(row);
return new ProducerRecord<>(outputTopic, key, message);
}
}
Original file line number Diff line number Diff line change
@@ -1,78 +1,38 @@
package io.odpf.dagger.common.serde.proto.serialization;

import org.apache.flink.api.common.serialization.SerializationSchema.InitializationContext;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.types.Row;

import com.google.protobuf.Descriptors;
import com.google.protobuf.DynamicMessage;
import io.odpf.dagger.common.core.StencilClientOrchestrator;
import io.odpf.dagger.common.exceptions.DescriptorNotFoundException;
import io.odpf.dagger.common.exceptions.serde.DaggerSerializationException;
import io.odpf.dagger.common.exceptions.serde.InvalidColumnMappingException;
import org.apache.flink.types.Row;
import io.odpf.dagger.common.serde.typehandler.TypeHandler;
import io.odpf.dagger.common.serde.typehandler.TypeHandlerFactory;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Serializable;
import java.util.Arrays;
import java.util.Objects;

public class ProtoSerializer implements KafkaRecordSerializationSchema<Row> {
private String[] columnNames;
private StencilClientOrchestrator stencilClientOrchestrator;
private String keyProtoClassName;
private String messageProtoClassName;
private String outputTopic;
private static final Logger LOGGER = LoggerFactory.getLogger("KafkaSink");
public class ProtoSerializer implements Serializable {

private final String keyProtoClassName;
private final String[] columnNames;
private final StencilClientOrchestrator stencilClientOrchestrator;
private final String messageProtoClassName;

/**
* Instantiates a new Proto serializer with specified output topic name.
*
* @param keyProtoClassName the key proto class name
* @param messageProtoClassName the message proto class name
* @param columnNames the column names
* @param stencilClientOrchestrator the stencil client orchestrator
*/
public ProtoSerializer(String keyProtoClassName, String messageProtoClassName, String[] columnNames, StencilClientOrchestrator stencilClientOrchestrator) {
if (Objects.isNull(messageProtoClassName)) {
throw new DaggerSerializationException("messageProtoClassName is required");
}
this.keyProtoClassName = keyProtoClassName;
this.messageProtoClassName = messageProtoClassName;
this.columnNames = columnNames;
this.stencilClientOrchestrator = stencilClientOrchestrator;
this.messageProtoClassName = messageProtoClassName;
checkValidity();
}

/**
* Instantiates a new Proto serializer with specified output topic name.
*
* @param keyProtoClassName the key proto class name
* @param messageProtoClassName the message proto class name
* @param columnNames the column names
* @param stencilClientOrchestrator the stencil client orchestrator
* @param outputTopic the output topic
*/
public ProtoSerializer(String keyProtoClassName, String messageProtoClassName, String[] columnNames, StencilClientOrchestrator stencilClientOrchestrator, String outputTopic) {
this(keyProtoClassName, messageProtoClassName, columnNames, stencilClientOrchestrator);
this.outputTopic = outputTopic;
}

@Override
public void open(InitializationContext context, KafkaSinkContext sinkContext) throws Exception {
KafkaRecordSerializationSchema.super.open(context, sinkContext);
}

@Override
public ProducerRecord<byte[], byte[]> serialize(Row row, KafkaSinkContext context, Long timestamp) {
if (Objects.isNull(outputTopic) || outputTopic.equals("")) {
throw new DaggerSerializationException("outputTopic is required");
private void checkValidity() {
if (Objects.isNull(messageProtoClassName) || messageProtoClassName.isEmpty()) {
throw new DaggerSerializationException("messageProtoClassName is required");
}
LOGGER.info("row to kafka: " + row);
byte[] key = serializeKey(row);
byte[] message = serializeValue(row);
return new ProducerRecord<>(outputTopic, key, message);
}

/**
Expand All @@ -82,16 +42,10 @@ public ProducerRecord<byte[], byte[]> serialize(Row row, KafkaSinkContext contex
* @return the byte [ ]
*/
public byte[] serializeKey(Row row) {
return (Objects.isNull(keyProtoClassName) || keyProtoClassName.equals("")) ? null
return (Objects.isNull(keyProtoClassName) || keyProtoClassName.isEmpty()) ? null
: parse(row, getDescriptor(keyProtoClassName)).toByteArray();
}

/**
* Serialize value message.
*
* @param row the row
* @return the byte [ ]
*/
public byte[] serializeValue(Row row) {
return parse(row, getDescriptor(messageProtoClassName)).toByteArray();
}
Expand All @@ -117,6 +71,14 @@ private DynamicMessage parse(Row element, Descriptors.Descriptor descriptor) {
return builder.build();
}

private Descriptors.Descriptor getDescriptor(String className) {
Descriptors.Descriptor dsc = stencilClientOrchestrator.getStencilClient().get(className);
if (dsc == null) {
throw new DescriptorNotFoundException();
}
return dsc;
}

private DynamicMessage.Builder populateNestedBuilder(Descriptors.Descriptor parentDescriptor, String[] nestedColumnNames, DynamicMessage.Builder parentBuilder, Object data) {
String childColumnName = nestedColumnNames[0];
Descriptors.FieldDescriptor childFieldDescriptor = parentDescriptor.findFieldByName(childColumnName);
Expand Down Expand Up @@ -153,12 +115,4 @@ private DynamicMessage.Builder populateBuilder(DynamicMessage.Builder builder, D

return builder;
}

private Descriptors.Descriptor getDescriptor(String className) {
Descriptors.Descriptor dsc = stencilClientOrchestrator.getStencilClient().get(className);
if (dsc == null) {
throw new DescriptorNotFoundException();
}
return dsc;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ private Configuration getConfig(Map<String, String> mapConfig) {
public void shouldReturnClassLoadStencilClientIfStencilDisabled() throws NoSuchFieldException, IllegalAccessException {
when(configuration.getBoolean(SCHEMA_REGISTRY_STENCIL_ENABLE_KEY, SCHEMA_REGISTRY_STENCIL_ENABLE_DEFAULT)).thenReturn(SCHEMA_REGISTRY_STENCIL_ENABLE_DEFAULT);
when(configuration.getString(SCHEMA_REGISTRY_STENCIL_URLS_KEY, SCHEMA_REGISTRY_STENCIL_URLS_DEFAULT)).thenReturn(SCHEMA_REGISTRY_STENCIL_URLS_DEFAULT);
when(configuration.getInteger(SCHEMA_REGISTRY_STENCIL_TIMEOUT_MS_KEY, SCHEMA_REGISTRY_STENCIL_TIMEOUT_MS_DEFAULT)).thenReturn(SCHEMA_REGISTRY_STENCIL_TIMEOUT_MS_DEFAULT);
when(configuration.getInteger(SCHEMA_REGISTRY_STENCIL_FETCH_TIMEOUT_MS, SCHEMA_REGISTRY_STENCIL_FETCH_TIMEOUT_MS_DEFAULT)).thenReturn(SCHEMA_REGISTRY_STENCIL_FETCH_TIMEOUT_MS_DEFAULT);
StencilClientOrchestrator stencilClientOrchestrator = new StencilClientOrchestrator(configuration);
stencilClient = stencilClientOrchestrator.getStencilClient();

Expand All @@ -53,7 +53,7 @@ public void shouldReturnMultiURLStencilClient() throws NoSuchFieldException, Ill
when(configuration.getString(SCHEMA_REGISTRY_STENCIL_URLS_KEY, SCHEMA_REGISTRY_STENCIL_URLS_DEFAULT)).thenReturn("http://localhost/latest,"
+ "http://localhost/events/latest,"
+ "http://localhost/entities/release");
when(configuration.getInteger(SCHEMA_REGISTRY_STENCIL_TIMEOUT_MS_KEY, SCHEMA_REGISTRY_STENCIL_TIMEOUT_MS_DEFAULT)).thenReturn(SCHEMA_REGISTRY_STENCIL_TIMEOUT_MS_DEFAULT);
when(configuration.getInteger(SCHEMA_REGISTRY_STENCIL_FETCH_TIMEOUT_MS, SCHEMA_REGISTRY_STENCIL_FETCH_TIMEOUT_MS_DEFAULT)).thenReturn(SCHEMA_REGISTRY_STENCIL_FETCH_TIMEOUT_MS_DEFAULT);
StencilClientOrchestrator stencilClientOrchestrator = new StencilClientOrchestrator(configuration);
stencilClient = stencilClientOrchestrator.getStencilClient();

Expand All @@ -66,7 +66,7 @@ public void shouldReturnMultiURLStencilClient() throws NoSuchFieldException, Ill
@Test
public void shouldEnrichStencilClient() throws NoSuchFieldException, IllegalAccessException {
when(configuration.getBoolean(SCHEMA_REGISTRY_STENCIL_ENABLE_KEY, SCHEMA_REGISTRY_STENCIL_ENABLE_DEFAULT)).thenReturn(true);
when(configuration.getInteger(SCHEMA_REGISTRY_STENCIL_TIMEOUT_MS_KEY, SCHEMA_REGISTRY_STENCIL_TIMEOUT_MS_DEFAULT)).thenReturn(SCHEMA_REGISTRY_STENCIL_TIMEOUT_MS_DEFAULT);
when(configuration.getInteger(SCHEMA_REGISTRY_STENCIL_FETCH_TIMEOUT_MS, SCHEMA_REGISTRY_STENCIL_FETCH_TIMEOUT_MS_DEFAULT)).thenReturn(SCHEMA_REGISTRY_STENCIL_FETCH_TIMEOUT_MS_DEFAULT);
when(configuration.getString(SCHEMA_REGISTRY_STENCIL_URLS_KEY, SCHEMA_REGISTRY_STENCIL_URLS_DEFAULT)).thenReturn("http://localhost/latest,");
StencilClientOrchestrator stencilClientOrchestrator = new StencilClientOrchestrator(configuration);
StencilClient oldStencilClient = stencilClientOrchestrator.getStencilClient();
Expand All @@ -93,7 +93,7 @@ public void shouldEnrichStencilClient() throws NoSuchFieldException, IllegalAcce
public void shouldNotEnrichIfNoNewAdditionalURLsAdded() throws NoSuchFieldException, IllegalAccessException {
when(configuration.getBoolean(SCHEMA_REGISTRY_STENCIL_ENABLE_KEY, SCHEMA_REGISTRY_STENCIL_ENABLE_DEFAULT)).thenReturn(true);
when(configuration.getString(SCHEMA_REGISTRY_STENCIL_URLS_KEY, SCHEMA_REGISTRY_STENCIL_URLS_DEFAULT)).thenReturn("http://localhost/latest,");
when(configuration.getInteger(SCHEMA_REGISTRY_STENCIL_TIMEOUT_MS_KEY, SCHEMA_REGISTRY_STENCIL_TIMEOUT_MS_DEFAULT)).thenReturn(SCHEMA_REGISTRY_STENCIL_TIMEOUT_MS_DEFAULT);
when(configuration.getInteger(SCHEMA_REGISTRY_STENCIL_FETCH_TIMEOUT_MS, SCHEMA_REGISTRY_STENCIL_FETCH_TIMEOUT_MS_DEFAULT)).thenReturn(SCHEMA_REGISTRY_STENCIL_FETCH_TIMEOUT_MS_DEFAULT);
StencilClientOrchestrator stencilClientOrchestrator = new StencilClientOrchestrator(configuration);
StencilClient oldStencilClient = stencilClientOrchestrator.getStencilClient();

Expand All @@ -118,7 +118,7 @@ public void shouldNotEnrichIfNoNewAdditionalURLsAdded() throws NoSuchFieldExcept
public void shouldReturnClassLoadStencilClientWhenStencilDisabledAndEnrichmentStencilUrlsIsNotNull() throws NoSuchFieldException, IllegalAccessException {
when(configuration.getBoolean(SCHEMA_REGISTRY_STENCIL_ENABLE_KEY, SCHEMA_REGISTRY_STENCIL_ENABLE_DEFAULT)).thenReturn(SCHEMA_REGISTRY_STENCIL_ENABLE_DEFAULT);
when(configuration.getString(SCHEMA_REGISTRY_STENCIL_URLS_KEY, SCHEMA_REGISTRY_STENCIL_URLS_DEFAULT)).thenReturn(SCHEMA_REGISTRY_STENCIL_URLS_DEFAULT);
when(configuration.getInteger(SCHEMA_REGISTRY_STENCIL_TIMEOUT_MS_KEY, SCHEMA_REGISTRY_STENCIL_TIMEOUT_MS_DEFAULT)).thenReturn(SCHEMA_REGISTRY_STENCIL_TIMEOUT_MS_DEFAULT);
when(configuration.getInteger(SCHEMA_REGISTRY_STENCIL_FETCH_TIMEOUT_MS, SCHEMA_REGISTRY_STENCIL_FETCH_TIMEOUT_MS_DEFAULT)).thenReturn(SCHEMA_REGISTRY_STENCIL_FETCH_TIMEOUT_MS_DEFAULT);
StencilClientOrchestrator stencilClientOrchestrator = new StencilClientOrchestrator(configuration);

List<String> enrichmentStencilURLs = Collections
Expand All @@ -138,13 +138,13 @@ public void shouldReturnDefaultTimeoutIfTimeoutMsConfigNotSet() {
Configuration config = getConfig(configMap);
StencilClientOrchestrator stencilClientOrchestrator = new StencilClientOrchestrator(config);
StencilConfig stencilConfig = stencilClientOrchestrator.createStencilConfig();
assertEquals(SCHEMA_REGISTRY_STENCIL_TIMEOUT_MS_DEFAULT, stencilConfig.getFetchTimeoutMs());
assertEquals(SCHEMA_REGISTRY_STENCIL_FETCH_TIMEOUT_MS_DEFAULT, stencilConfig.getFetchTimeoutMs());
}

@Test
public void shouldReturnConfiguredTimeoutIfTimeoutMsConfigIsSet() {
Map<String, String> configMap = new HashMap<String, String>() {{
put(SCHEMA_REGISTRY_STENCIL_TIMEOUT_MS_KEY, "8000");
put(SCHEMA_REGISTRY_STENCIL_FETCH_TIMEOUT_MS, "8000");
}};
Configuration config = getConfig(configMap);
StencilClientOrchestrator stencilClientOrchestrator = new StencilClientOrchestrator(config);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package io.odpf.dagger.common.serde.proto.serialization;

import io.odpf.dagger.common.exceptions.serde.DaggerSerializationException;
import org.apache.flink.types.Row;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThrows;

public class KafkaProtoSerializerTest {

@Test
public void shouldSerializeIntoKafkaRecord() {
ProtoSerializer serializer = Mockito.mock(ProtoSerializer.class);
String outputTopic = "test";
Row element = new Row(1);
element.setField(0, "testing");
byte[] keyBytes = "key".getBytes();
byte[] valueBytes = "value".getBytes();
Mockito.when(serializer.serializeKey(element)).thenReturn(keyBytes);
Mockito.when(serializer.serializeValue(element)).thenReturn(valueBytes);
KafkaProtoSerializer kafkaProtoSerializer = new KafkaProtoSerializer(serializer, outputTopic);
ProducerRecord<byte[], byte[]> record = kafkaProtoSerializer.serialize(element, null, null);
ProducerRecord<byte[], byte[]> expectedRecord = new ProducerRecord<>("test", keyBytes, valueBytes);
Assert.assertEquals(expectedRecord, record);
}

@Test
public void shouldThrowExceptionWhenOutputTopicIsNullForSerializeMethod() {
ProtoSerializer serializer = Mockito.mock(ProtoSerializer.class);
KafkaProtoSerializer kafkaProtoSerializer = new KafkaProtoSerializer(serializer, null);
Row element = new Row(1);
element.setField(0, "1234");
DaggerSerializationException exception = assertThrows(DaggerSerializationException.class,
() -> kafkaProtoSerializer.serialize(element, null, System.currentTimeMillis() / 1000));
assertEquals("outputTopic is required", exception.getMessage());
}

@Test
public void shouldThrowExceptionWhenOutputTopicIsEmptyForSerializeMethod() {
ProtoSerializer serializer = Mockito.mock(ProtoSerializer.class);
KafkaProtoSerializer kafkaProtoSerializer = new KafkaProtoSerializer(serializer, "");
Row element = new Row(1);
element.setField(0, "1234");

DaggerSerializationException exception = assertThrows(DaggerSerializationException.class,
() -> kafkaProtoSerializer.serialize(element, null, System.currentTimeMillis() / 1000));
assertEquals("outputTopic is required", exception.getMessage());
}
}
Loading

0 comments on commit ba209e2

Please sign in to comment.