-
Notifications
You must be signed in to change notification settings - Fork 120
#386: Implemented passing of CryptoKeyReader
.
#387
base: master
Are you sure you want to change the base?
Changes from all commits
42f0d4f
05e3b78
50b9e9b
a0b27ca
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -19,19 +19,25 @@ | |
import org.apache.flink.streaming.util.serialization.PulsarSerializationSchema; | ||
|
||
import lombok.extern.slf4j.Slf4j; | ||
import org.apache.pulsar.client.api.CryptoKeyReader; | ||
import org.apache.pulsar.client.api.MessageId; | ||
import org.apache.pulsar.client.api.MessageRouter; | ||
import org.apache.pulsar.client.api.TypedMessageBuilder; | ||
import org.apache.pulsar.client.api.transaction.TxnID; | ||
import org.apache.pulsar.client.impl.conf.ClientConfigurationData; | ||
|
||
import java.util.ArrayList; | ||
import java.util.Arrays; | ||
import java.util.HashSet; | ||
import java.util.List; | ||
import java.util.Optional; | ||
import java.util.Properties; | ||
import java.util.Set; | ||
import java.util.concurrent.CompletableFuture; | ||
|
||
import static org.apache.flink.util.InstantiationUtil.isSerializable; | ||
import static org.apache.flink.util.Preconditions.checkNotNull; | ||
import static org.apache.flink.util.Preconditions.checkState; | ||
|
||
/** | ||
* Write data to Flink. | ||
|
@@ -41,8 +47,111 @@ | |
@Slf4j | ||
public class FlinkPulsarSink<T> extends FlinkPulsarSinkBase<T> { | ||
|
||
public static class Builder<T> { | ||
private String adminUrl; | ||
private String defaultTopicName; | ||
private ClientConfigurationData clientConf; | ||
private Properties properties; | ||
private PulsarSerializationSchema<T> serializationSchema; | ||
private MessageRouter messageRouter = null; | ||
private PulsarSinkSemantic semantic = PulsarSinkSemantic.AT_LEAST_ONCE; | ||
private String serviceUrl; | ||
private CryptoKeyReader cryptoKeyReader; | ||
private final Set<String> encryptionKeys = new HashSet<>(); | ||
|
||
public Builder<T> withAdminUrl(final String adminUrl) { | ||
this.adminUrl = adminUrl; | ||
return this; | ||
} | ||
|
||
public Builder<T> withDefaultTopicName(final String defaultTopicName) { | ||
this.defaultTopicName = defaultTopicName; | ||
return this; | ||
} | ||
|
||
public Builder<T> withClientConf(final ClientConfigurationData clientConf) { | ||
this.clientConf = clientConf; | ||
return this; | ||
} | ||
|
||
public Builder<T> withProperties(final Properties properties) { | ||
this.properties = properties; | ||
return this; | ||
} | ||
|
||
public Builder<T> withPulsarSerializationSchema(final PulsarSerializationSchema<T> serializationSchema) { | ||
this.serializationSchema = serializationSchema; | ||
return this; | ||
} | ||
|
||
public Builder<T> withMessageRouter(final MessageRouter messageRouter) { | ||
this.messageRouter = messageRouter; | ||
return this; | ||
} | ||
|
||
public Builder<T> withSemantic(final PulsarSinkSemantic semantic) { | ||
this.semantic = semantic; | ||
return this; | ||
} | ||
|
||
public Builder<T> withServiceUrl(final String serviceUrl) { | ||
this.serviceUrl = serviceUrl; | ||
return this; | ||
} | ||
|
||
public Builder<T> withCryptoKeyReader(CryptoKeyReader cryptoKeyReader) { | ||
this.cryptoKeyReader = cryptoKeyReader; | ||
return this; | ||
} | ||
|
||
public Builder<T> withEncryptionKeys(String... encryptionKeys) { | ||
this.encryptionKeys.addAll(Arrays.asList(encryptionKeys)); | ||
return this; | ||
} | ||
|
||
private Optional<String> getDefaultTopicName() { | ||
return Optional.ofNullable(defaultTopicName); | ||
} | ||
|
||
public FlinkPulsarSink<T> build(){ | ||
if (adminUrl == null) { | ||
throw new IllegalStateException("Admin URL must be set."); | ||
} | ||
if (serializationSchema == null) { | ||
throw new IllegalStateException("Serialization schema must be set."); | ||
} | ||
if (semantic == null) { | ||
throw new IllegalStateException("Semantic must be set."); | ||
} | ||
if (properties == null) { | ||
throw new IllegalStateException("Properties must be set."); | ||
} | ||
if (serviceUrl != null && clientConf != null) { | ||
throw new IllegalStateException("Set either client conf or service URL but not both."); | ||
} | ||
if (serviceUrl != null){ | ||
clientConf = PulsarClientUtils.newClientConf(checkNotNull(serviceUrl), properties); | ||
} | ||
if (clientConf == null){ | ||
throw new IllegalStateException("Client conf must be set."); | ||
} | ||
if ((cryptoKeyReader == null) != (encryptionKeys.isEmpty())){ | ||
throw new IllegalStateException("Set crypto key reader and encryption keys in conjunction."); | ||
} | ||
checkState(isSerializable(cryptoKeyReader)); | ||
checkState(isSerializable(encryptionKeys)); | ||
return new FlinkPulsarSink<>(this); | ||
} | ||
|
||
} | ||
|
||
private final PulsarSerializationSchema<T> serializationSchema; | ||
|
||
private FlinkPulsarSink(final Builder<T> builder) { | ||
super(builder.adminUrl, builder.getDefaultTopicName(), builder.clientConf, builder.properties, builder.serializationSchema, builder.messageRouter, builder.semantic, builder.cryptoKeyReader, builder.encryptionKeys); | ||
this.serializationSchema = builder.serializationSchema; | ||
} | ||
|
||
public FlinkPulsarSink( | ||
String adminUrl, | ||
Optional<String> defaultTopicName, | ||
|
@@ -51,9 +160,14 @@ public FlinkPulsarSink( | |
PulsarSerializationSchema serializationSchema, | ||
MessageRouter messageRouter, | ||
PulsarSinkSemantic semantic) { | ||
|
||
super(adminUrl, defaultTopicName, clientConf, properties, serializationSchema, messageRouter, semantic); | ||
this.serializationSchema = serializationSchema; | ||
this(new Builder<T>() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Refactor old ctor doen't look like a good choice. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The goal was basically to delegate as much as possible to a single c'tor that contains the interesting logic, instead of repeating variants of the logic inside the c'tor all over the place. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Back to the old c'tor. |
||
.withAdminUrl(adminUrl) | ||
.withDefaultTopicName(defaultTopicName.orElse(null)) | ||
.withClientConf(clientConf) | ||
.withProperties(properties) | ||
.withPulsarSerializationSchema(serializationSchema) | ||
.withMessageRouter(messageRouter) | ||
.withSemantic(semantic)); | ||
} | ||
|
||
public FlinkPulsarSink( | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -63,6 +63,7 @@ | |
import org.apache.flink.shaded.guava18.com.google.common.collect.Sets; | ||
|
||
import lombok.extern.slf4j.Slf4j; | ||
import org.apache.pulsar.client.api.CryptoKeyReader; | ||
import org.apache.pulsar.client.api.MessageId; | ||
import org.apache.pulsar.client.api.PulsarClientException; | ||
import org.apache.pulsar.client.impl.conf.ClientConfigurationData; | ||
|
@@ -87,8 +88,11 @@ | |
import static org.apache.flink.streaming.connectors.pulsar.internal.metrics.PulsarSourceMetrics.COMMITS_FAILED_METRICS_COUNTER; | ||
import static org.apache.flink.streaming.connectors.pulsar.internal.metrics.PulsarSourceMetrics.COMMITS_SUCCEEDED_METRICS_COUNTER; | ||
import static org.apache.flink.streaming.connectors.pulsar.internal.metrics.PulsarSourceMetrics.PULSAR_SOURCE_METRICS_GROUP; | ||
import static org.apache.flink.util.InstantiationUtil.isSerializable; | ||
import static org.apache.flink.util.Preconditions.checkArgument; | ||
import static org.apache.flink.util.Preconditions.checkNotNull; | ||
import static org.apache.flink.util.Preconditions.checkState; | ||
|
||
|
||
/** | ||
* Pulsar data source. | ||
|
@@ -102,6 +106,77 @@ public class FlinkPulsarSource<T> | |
CheckpointListener, | ||
CheckpointedFunction { | ||
|
||
public static class Builder<T> { | ||
private String adminUrl; | ||
private Properties properties; | ||
private String serviceUrl; | ||
ClientConfigurationData clientConf; | ||
PulsarDeserializationSchema<T> deserializer; | ||
private CryptoKeyReader cryptoKeyReader; | ||
|
||
public Builder<T> withAdminUrl(final String adminUrl) { | ||
this.adminUrl = adminUrl; | ||
return this; | ||
} | ||
|
||
public Builder<T> withProperties(final Properties properties) { | ||
this.properties = properties; | ||
return this; | ||
} | ||
|
||
public Builder<T> withServiceUrl(final String serviceUrl) { | ||
this.serviceUrl = serviceUrl; | ||
return this; | ||
} | ||
|
||
public Builder<T> withCryptoKeyReader(final CryptoKeyReader cryptoKeyReader) { | ||
this.cryptoKeyReader = cryptoKeyReader; | ||
return this; | ||
} | ||
|
||
public Builder<T> withClientConfigurationData(final ClientConfigurationData clientConf){ | ||
this.clientConf = clientConf; | ||
return this; | ||
} | ||
|
||
public Builder<T> withPulsarDeserializionSchema(final PulsarDeserializationSchema<T> deserializer){ | ||
if (this.deserializer != null){ | ||
throw new IllegalStateException("Deserializer was already set."); | ||
} | ||
this.deserializer = deserializer; | ||
return this; | ||
} | ||
|
||
public Builder<T> withDeserializionSchema(final DeserializationSchema<T> deserializer){ | ||
if (this.deserializer != null){ | ||
throw new IllegalStateException("Deserializer was already set."); | ||
} | ||
this.deserializer = PulsarDeserializationSchema.valueOnly(deserializer); | ||
return this; | ||
} | ||
|
||
public FlinkPulsarSource<T> build(){ | ||
if (adminUrl == null){ | ||
throw new IllegalStateException("Admin URL must be set."); | ||
} | ||
if (properties == null){ | ||
throw new IllegalStateException("Properties must be set."); | ||
} | ||
if ((serviceUrl != null && clientConf != null)){ | ||
throw new IllegalStateException("Please specify either service URL plus properties or client conf but not both."); | ||
} | ||
if (serviceUrl != null){ | ||
clientConf = PulsarClientUtils.newClientConf(serviceUrl, properties); | ||
} | ||
if (clientConf == null){ | ||
throw new IllegalStateException("Client conf mustn't be null. Either provide a client conf or a service URL plus properties."); | ||
} | ||
checkState(isSerializable(cryptoKeyReader)); | ||
return new FlinkPulsarSource<>(this); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We still need to check the serialization for There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No, I think it doesn't require the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In the source, no encryption keys are required. Added serialization check. |
||
} | ||
|
||
} | ||
|
||
/** The maximum number of pending non-committed checkpoints to track, to avoid memory leaks. */ | ||
public static final int MAX_NUM_PENDING_CHECKPOINTS = 100; | ||
|
||
|
@@ -229,15 +304,14 @@ public class FlinkPulsarSource<T> | |
|
||
private transient int numParallelTasks; | ||
|
||
public FlinkPulsarSource( | ||
String adminUrl, | ||
ClientConfigurationData clientConf, | ||
PulsarDeserializationSchema<T> deserializer, | ||
Properties properties) { | ||
this.adminUrl = checkNotNull(adminUrl); | ||
this.clientConfigurationData = checkNotNull(clientConf); | ||
this.deserializer = deserializer; | ||
this.properties = properties; | ||
private final CryptoKeyReader cryptoKeyReader; | ||
|
||
private FlinkPulsarSource(final Builder<T> builder) { | ||
this.adminUrl = checkNotNull(builder.adminUrl); | ||
this.clientConfigurationData = checkNotNull(builder.clientConf); | ||
this.deserializer = builder.deserializer; | ||
this.properties = builder.properties; | ||
this.cryptoKeyReader = builder.cryptoKeyReader; | ||
this.caseInsensitiveParams = | ||
SourceSinkUtils.validateStreamSourceOptions(Maps.fromProperties(properties)); | ||
this.readerConf = | ||
|
@@ -259,6 +333,18 @@ public FlinkPulsarSource( | |
this.oldStateVersion = SourceSinkUtils.getOldStateVersion(caseInsensitiveParams, oldStateVersion); | ||
} | ||
|
||
public FlinkPulsarSource( | ||
String adminUrl, | ||
ClientConfigurationData clientConf, | ||
PulsarDeserializationSchema<T> deserializer, | ||
Properties properties) { | ||
this(new Builder<T>() | ||
.withAdminUrl(adminUrl) | ||
.withClientConfigurationData(clientConf) | ||
.withPulsarDeserializionSchema(deserializer) | ||
.withProperties(properties)); | ||
} | ||
|
||
public FlinkPulsarSource( | ||
String serviceUrl, | ||
String adminUrl, | ||
|
@@ -614,7 +700,8 @@ protected PulsarFetcher<T> createFetcher( | |
deserializer, | ||
metadataReader, | ||
streamingRuntime.getMetricGroup().addGroup(PULSAR_SOURCE_METRICS_GROUP), | ||
useMetrics | ||
useMetrics, | ||
cryptoKeyReader | ||
); | ||
} | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
cryptoKeyReader
requires extra serialization check. UsePreconditions.checkState(InstantiationUtil.isSerializable(cryptoKeyReader))
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK, will do.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added the check.