Skip to content
This repository was archived by the owner on Dec 14, 2022. It is now read-only.

#386: Implemented passing of CryptoKeyReader. #387

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,25 @@
import org.apache.flink.streaming.util.serialization.PulsarSerializationSchema;

import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.CryptoKeyReader;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageRouter;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

import static org.apache.flink.util.InstantiationUtil.isSerializable;
import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.util.Preconditions.checkState;

/**
* Write data to Flink.
Expand All @@ -41,8 +47,111 @@
@Slf4j
public class FlinkPulsarSink<T> extends FlinkPulsarSinkBase<T> {

public static class Builder<T> {
private String adminUrl;
private String defaultTopicName;
private ClientConfigurationData clientConf;
private Properties properties;
private PulsarSerializationSchema<T> serializationSchema;
private MessageRouter messageRouter = null;
private PulsarSinkSemantic semantic = PulsarSinkSemantic.AT_LEAST_ONCE;
private String serviceUrl;
private CryptoKeyReader cryptoKeyReader;
private final Set<String> encryptionKeys = new HashSet<>();

public Builder<T> withAdminUrl(final String adminUrl) {
this.adminUrl = adminUrl;
return this;
}

public Builder<T> withDefaultTopicName(final String defaultTopicName) {
this.defaultTopicName = defaultTopicName;
return this;
}

public Builder<T> withClientConf(final ClientConfigurationData clientConf) {
this.clientConf = clientConf;
return this;
}

public Builder<T> withProperties(final Properties properties) {
this.properties = properties;
return this;
}

public Builder<T> withPulsarSerializationSchema(final PulsarSerializationSchema<T> serializationSchema) {
this.serializationSchema = serializationSchema;
return this;
}

public Builder<T> withMessageRouter(final MessageRouter messageRouter) {
this.messageRouter = messageRouter;
return this;
}

public Builder<T> withSemantic(final PulsarSinkSemantic semantic) {
this.semantic = semantic;
return this;
}

public Builder<T> withServiceUrl(final String serviceUrl) {
this.serviceUrl = serviceUrl;
return this;
}

public Builder<T> withCryptoKeyReader(CryptoKeyReader cryptoKeyReader) {
this.cryptoKeyReader = cryptoKeyReader;
return this;
}

public Builder<T> withEncryptionKeys(String... encryptionKeys) {
this.encryptionKeys.addAll(Arrays.asList(encryptionKeys));
return this;
}

private Optional<String> getDefaultTopicName() {
return Optional.ofNullable(defaultTopicName);
}

public FlinkPulsarSink<T> build(){
if (adminUrl == null) {
throw new IllegalStateException("Admin URL must be set.");
}
if (serializationSchema == null) {
throw new IllegalStateException("Serialization schema must be set.");
}
if (semantic == null) {
throw new IllegalStateException("Semantic must be set.");
}
if (properties == null) {
throw new IllegalStateException("Properties must be set.");
}
if (serviceUrl != null && clientConf != null) {
throw new IllegalStateException("Set either client conf or service URL but not both.");
}
if (serviceUrl != null){
clientConf = PulsarClientUtils.newClientConf(checkNotNull(serviceUrl), properties);
}
if (clientConf == null){
throw new IllegalStateException("Client conf must be set.");
}
if ((cryptoKeyReader == null) != (encryptionKeys.isEmpty())){
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cryptoKeyReader requires extra serialization check. Use Preconditions.checkState(InstantiationUtil.isSerializable(cryptoKeyReader))

Copy link
Author

@objecttrouve objecttrouve Aug 28, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, will do.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added the check.

throw new IllegalStateException("Set crypto key reader and encryption keys in conjunction.");
}
checkState(isSerializable(cryptoKeyReader));
checkState(isSerializable(encryptionKeys));
return new FlinkPulsarSink<>(this);
}

}

private final PulsarSerializationSchema<T> serializationSchema;

private FlinkPulsarSink(final Builder<T> builder) {
super(builder.adminUrl, builder.getDefaultTopicName(), builder.clientConf, builder.properties, builder.serializationSchema, builder.messageRouter, builder.semantic, builder.cryptoKeyReader, builder.encryptionKeys);
this.serializationSchema = builder.serializationSchema;
}

public FlinkPulsarSink(
String adminUrl,
Optional<String> defaultTopicName,
Expand All @@ -51,9 +160,14 @@ public FlinkPulsarSink(
PulsarSerializationSchema serializationSchema,
MessageRouter messageRouter,
PulsarSinkSemantic semantic) {

super(adminUrl, defaultTopicName, clientConf, properties, serializationSchema, messageRouter, semantic);
this.serializationSchema = serializationSchema;
this(new Builder<T>()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Refactor old ctor doen't look like a good choice.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The goal was basically to delegate as much as possible to a single c'tor that contains the interesting logic, instead of repeating variants of the logic inside the c'tor all over the place.
But to be honest, I don't mind very much. So I'll just revert to the old c'tor.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Back to the old c'tor.

.withAdminUrl(adminUrl)
.withDefaultTopicName(defaultTopicName.orElse(null))
.withClientConf(clientConf)
.withProperties(properties)
.withPulsarSerializationSchema(serializationSchema)
.withMessageRouter(messageRouter)
.withSemantic(semantic));
}

public FlinkPulsarSink(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@

import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.CryptoKeyReader;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageRouter;
import org.apache.pulsar.client.api.MessageRoutingMode;
Expand All @@ -62,10 +63,12 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
Expand Down Expand Up @@ -143,14 +146,18 @@ abstract class FlinkPulsarSinkBase<T> extends TwoPhaseCommitSinkFunction<T, Flin

protected transient Map<String, Producer<T>> topic2Producer;

private final CryptoKeyReader cryptoKeyReader;

private final Set<String> encryptionKeys;

public FlinkPulsarSinkBase(
String adminUrl,
Optional<String> defaultTopicName,
ClientConfigurationData clientConf,
Properties properties,
PulsarSerializationSchema<T> serializationSchema,
MessageRouter messageRouter) {
this(adminUrl, defaultTopicName, clientConf, properties, serializationSchema, messageRouter, PulsarSinkSemantic.AT_LEAST_ONCE);
this(adminUrl, defaultTopicName, clientConf, properties, serializationSchema, messageRouter, PulsarSinkSemantic.AT_LEAST_ONCE, null, new HashSet<>());
}

public FlinkPulsarSinkBase(
Expand All @@ -160,7 +167,9 @@ public FlinkPulsarSinkBase(
Properties properties,
PulsarSerializationSchema<T> serializationSchema,
MessageRouter messageRouter,
PulsarSinkSemantic semantic) {
PulsarSinkSemantic semantic,
final CryptoKeyReader cryptoKeyReader,
final Set<String> encryptionKeys) {
super(new TransactionStateSerializer(), VoidSerializer.INSTANCE);

this.adminUrl = checkNotNull(adminUrl);
Expand Down Expand Up @@ -214,21 +223,9 @@ public FlinkPulsarSinkBase(
if (this.clientConfigurationData.getServiceUrl() == null) {
throw new IllegalArgumentException("ServiceUrl must be supplied in the client configuration");
}
}

public FlinkPulsarSinkBase(
String serviceUrl,
String adminUrl,
Optional<String> defaultTopicName,
Properties properties,
PulsarSerializationSchema serializationSchema,
MessageRouter messageRouter) {
this(adminUrl,
defaultTopicName,
PulsarClientUtils.newClientConf(checkNotNull(serviceUrl), properties),
properties,
serializationSchema,
messageRouter);
this.cryptoKeyReader = cryptoKeyReader;
this.encryptionKeys = encryptionKeys;
}

@Override
Expand Down Expand Up @@ -340,6 +337,12 @@ protected Producer<T> createProducer(
// maximizing the throughput
.batchingMaxBytes(5 * 1024 * 1024)
.loadConf(producerConf);
if (cryptoKeyReader != null){
builder.cryptoKeyReader(cryptoKeyReader);
for (final String encryptionKey : this.encryptionKeys) {
builder.addEncryptionKey(encryptionKey);
}
}
if (messageRouter == null) {
return builder.create();
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
import org.apache.flink.shaded.guava18.com.google.common.collect.Sets;

import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.CryptoKeyReader;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
Expand All @@ -87,8 +88,11 @@
import static org.apache.flink.streaming.connectors.pulsar.internal.metrics.PulsarSourceMetrics.COMMITS_FAILED_METRICS_COUNTER;
import static org.apache.flink.streaming.connectors.pulsar.internal.metrics.PulsarSourceMetrics.COMMITS_SUCCEEDED_METRICS_COUNTER;
import static org.apache.flink.streaming.connectors.pulsar.internal.metrics.PulsarSourceMetrics.PULSAR_SOURCE_METRICS_GROUP;
import static org.apache.flink.util.InstantiationUtil.isSerializable;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.util.Preconditions.checkState;


/**
* Pulsar data source.
Expand All @@ -102,6 +106,77 @@ public class FlinkPulsarSource<T>
CheckpointListener,
CheckpointedFunction {

public static class Builder<T> {
private String adminUrl;
private Properties properties;
private String serviceUrl;
ClientConfigurationData clientConf;
PulsarDeserializationSchema<T> deserializer;
private CryptoKeyReader cryptoKeyReader;

public Builder<T> withAdminUrl(final String adminUrl) {
this.adminUrl = adminUrl;
return this;
}

public Builder<T> withProperties(final Properties properties) {
this.properties = properties;
return this;
}

public Builder<T> withServiceUrl(final String serviceUrl) {
this.serviceUrl = serviceUrl;
return this;
}

public Builder<T> withCryptoKeyReader(final CryptoKeyReader cryptoKeyReader) {
this.cryptoKeyReader = cryptoKeyReader;
return this;
}

public Builder<T> withClientConfigurationData(final ClientConfigurationData clientConf){
this.clientConf = clientConf;
return this;
}

public Builder<T> withPulsarDeserializionSchema(final PulsarDeserializationSchema<T> deserializer){
if (this.deserializer != null){
throw new IllegalStateException("Deserializer was already set.");
}
this.deserializer = deserializer;
return this;
}

public Builder<T> withDeserializionSchema(final DeserializationSchema<T> deserializer){
if (this.deserializer != null){
throw new IllegalStateException("Deserializer was already set.");
}
this.deserializer = PulsarDeserializationSchema.valueOnly(deserializer);
return this;
}

public FlinkPulsarSource<T> build(){
if (adminUrl == null){
throw new IllegalStateException("Admin URL must be set.");
}
if (properties == null){
throw new IllegalStateException("Properties must be set.");
}
if ((serviceUrl != null && clientConf != null)){
throw new IllegalStateException("Please specify either service URL plus properties or client conf but not both.");
}
if (serviceUrl != null){
clientConf = PulsarClientUtils.newClientConf(serviceUrl, properties);
}
if (clientConf == null){
throw new IllegalStateException("Client conf mustn't be null. Either provide a client conf or a service URL plus properties.");
}
checkState(isSerializable(cryptoKeyReader));
return new FlinkPulsarSource<>(this);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This cryptoKeyReader doesn't require encryptionKeys here. Right?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We still need to check the serialization for encryptionKeys here.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, I think it doesn't require the encryptionKeys. But I'll double check.
OK, will check serialization.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the source, no encryption keys are required. Added serialization check.

}

}

/** The maximum number of pending non-committed checkpoints to track, to avoid memory leaks. */
public static final int MAX_NUM_PENDING_CHECKPOINTS = 100;

Expand Down Expand Up @@ -229,15 +304,14 @@ public class FlinkPulsarSource<T>

private transient int numParallelTasks;

public FlinkPulsarSource(
String adminUrl,
ClientConfigurationData clientConf,
PulsarDeserializationSchema<T> deserializer,
Properties properties) {
this.adminUrl = checkNotNull(adminUrl);
this.clientConfigurationData = checkNotNull(clientConf);
this.deserializer = deserializer;
this.properties = properties;
private final CryptoKeyReader cryptoKeyReader;

private FlinkPulsarSource(final Builder<T> builder) {
this.adminUrl = checkNotNull(builder.adminUrl);
this.clientConfigurationData = checkNotNull(builder.clientConf);
this.deserializer = builder.deserializer;
this.properties = builder.properties;
this.cryptoKeyReader = builder.cryptoKeyReader;
this.caseInsensitiveParams =
SourceSinkUtils.validateStreamSourceOptions(Maps.fromProperties(properties));
this.readerConf =
Expand All @@ -259,6 +333,18 @@ public FlinkPulsarSource(
this.oldStateVersion = SourceSinkUtils.getOldStateVersion(caseInsensitiveParams, oldStateVersion);
}

public FlinkPulsarSource(
String adminUrl,
ClientConfigurationData clientConf,
PulsarDeserializationSchema<T> deserializer,
Properties properties) {
this(new Builder<T>()
.withAdminUrl(adminUrl)
.withClientConfigurationData(clientConf)
.withPulsarDeserializionSchema(deserializer)
.withProperties(properties));
}

public FlinkPulsarSource(
String serviceUrl,
String adminUrl,
Expand Down Expand Up @@ -614,7 +700,8 @@ protected PulsarFetcher<T> createFetcher(
deserializer,
metadataReader,
streamingRuntime.getMetricGroup().addGroup(PULSAR_SOURCE_METRICS_GROUP),
useMetrics
useMetrics,
cryptoKeyReader
);
}

Expand Down
Loading