Skip to content

Commit 42f0d4f

Browse files
committed
streamnative#386: Implemented passing of CryptoKeyReader.
As requested [here](streamnative#386), it's now possible to pass a `CryptoKeyReader` (and encryption keys) to `FlinkPulsarSource` and `FlinkPulsarSink`. Used builder pattern for easy extensibility without breaking or excessively overloading public c'tors. Added integration test. (Maybe it can be moved to one of the other tests to avoid overhead.)
1 parent 0b70d91 commit 42f0d4f

File tree

6 files changed

+816
-77
lines changed

6 files changed

+816
-77
lines changed

pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarSink.java

Lines changed: 119 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,16 +19,20 @@
1919
import org.apache.flink.streaming.util.serialization.PulsarSerializationSchema;
2020

2121
import lombok.extern.slf4j.Slf4j;
22+
import org.apache.pulsar.client.api.CryptoKeyReader;
2223
import org.apache.pulsar.client.api.MessageId;
2324
import org.apache.pulsar.client.api.MessageRouter;
2425
import org.apache.pulsar.client.api.TypedMessageBuilder;
2526
import org.apache.pulsar.client.api.transaction.TxnID;
2627
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
2728

2829
import java.util.ArrayList;
30+
import java.util.Arrays;
31+
import java.util.HashSet;
2932
import java.util.List;
3033
import java.util.Optional;
3134
import java.util.Properties;
35+
import java.util.Set;
3236
import java.util.concurrent.CompletableFuture;
3337

3438
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -41,8 +45,115 @@
4145
@Slf4j
4246
public class FlinkPulsarSink<T> extends FlinkPulsarSinkBase<T> {
4347

48+
public static class Builder<T> {
49+
private String adminUrl;
50+
private String defaultTopicName;
51+
private ClientConfigurationData clientConf;
52+
private Properties properties;
53+
private PulsarSerializationSchema<T> serializationSchema;
54+
private MessageRouter messageRouter = null;
55+
private PulsarSinkSemantic semantic = PulsarSinkSemantic.AT_LEAST_ONCE;
56+
private String serviceUrl;
57+
private CryptoKeyReader cryptoKeyReader;
58+
private final Set<String> encryptionKeys = new HashSet<>();
59+
60+
public Builder<T> withAdminUrl(final String adminUrl) {
61+
this.adminUrl = adminUrl;
62+
return this;
63+
}
64+
65+
public Builder<T> withDefaultTopicName(final String defaultTopicName) {
66+
this.defaultTopicName = defaultTopicName;
67+
return this;
68+
}
69+
70+
public Builder<T> withClientConf(final ClientConfigurationData clientConf) {
71+
this.clientConf = clientConf;
72+
return this;
73+
}
74+
75+
public Builder<T> withProperties(final Properties properties) {
76+
this.properties = properties;
77+
return this;
78+
}
79+
80+
public Builder<T> withPulsarSerializationSchema(final PulsarSerializationSchema<T> serializationSchema) {
81+
this.serializationSchema = serializationSchema;
82+
return this;
83+
}
84+
85+
public Builder<T> withMessageRouter(final MessageRouter messageRouter) {
86+
this.messageRouter = messageRouter;
87+
return this;
88+
}
89+
90+
public Builder<T> withSemantic(final PulsarSinkSemantic semantic) {
91+
this.semantic = semantic;
92+
return this;
93+
}
94+
95+
public Builder<T> withServiceUrl(final String serviceUrl) {
96+
this.serviceUrl = serviceUrl;
97+
return this;
98+
}
99+
100+
public Builder<T> withCryptoKeyReader(CryptoKeyReader cryptoKeyReader) {
101+
this.cryptoKeyReader = cryptoKeyReader;
102+
return this;
103+
}
104+
105+
public Builder<T> withEncryptionKeys(String... encryptionKeys) {
106+
this.encryptionKeys.addAll(Arrays.asList(encryptionKeys));
107+
return this;
108+
}
109+
110+
public FlinkPulsarSink<T> build(){
111+
if (adminUrl == null) {
112+
throw new IllegalStateException("Admin URL must be set.");
113+
}
114+
if (serializationSchema == null) {
115+
throw new IllegalStateException("Serialization schema must be set.");
116+
}
117+
if (semantic == null) {
118+
throw new IllegalStateException("Semantic must be set.");
119+
}
120+
if (properties == null) {
121+
throw new IllegalStateException("Properties must be set.");
122+
}
123+
if (serviceUrl != null && clientConf != null) {
124+
throw new IllegalStateException("Set either client conf or service URL but not both.");
125+
}
126+
if (serviceUrl != null){
127+
clientConf = PulsarClientUtils.newClientConf(checkNotNull(serviceUrl), properties);
128+
}
129+
if (clientConf == null){
130+
throw new IllegalStateException("Client conf must be set.");
131+
}
132+
if ((cryptoKeyReader == null) != (encryptionKeys.isEmpty())){
133+
throw new IllegalStateException("Set crypto key reader and encryption keys in conjunction.");
134+
}
135+
return new FlinkPulsarSink<>(this);
136+
}
137+
138+
}
139+
44140
private final PulsarSerializationSchema<T> serializationSchema;
45141

142+
public FlinkPulsarSink(final Builder<T> builder) {
143+
super(
144+
new FlinkPulsarSinkBase.Config<T>()
145+
.withAdminUrl(builder.adminUrl)
146+
.withDefaultTopicName(builder.defaultTopicName)
147+
.withClientConf(builder.clientConf)
148+
.withProperties(builder.properties)
149+
.withSerializationSchema(builder.serializationSchema)
150+
.withMessageRouter(builder.messageRouter)
151+
.withSemantic(builder.semantic)
152+
.withCryptoKeyReader(builder.cryptoKeyReader)
153+
.withEncryptionKeys(builder.encryptionKeys));
154+
this.serializationSchema = builder.serializationSchema;
155+
}
156+
46157
public FlinkPulsarSink(
47158
String adminUrl,
48159
Optional<String> defaultTopicName,
@@ -51,9 +162,14 @@ public FlinkPulsarSink(
51162
PulsarSerializationSchema serializationSchema,
52163
MessageRouter messageRouter,
53164
PulsarSinkSemantic semantic) {
54-
55-
super(adminUrl, defaultTopicName, clientConf, properties, serializationSchema, messageRouter, semantic);
56-
this.serializationSchema = serializationSchema;
165+
this(new Builder<T>()
166+
.withAdminUrl(adminUrl)
167+
.withDefaultTopicName(defaultTopicName.orElse(null))
168+
.withClientConf(clientConf)
169+
.withProperties(properties)
170+
.withPulsarSerializationSchema(serializationSchema)
171+
.withMessageRouter(messageRouter)
172+
.withSemantic(semantic));
57173
}
58174

59175
public FlinkPulsarSink(

pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarSinkBase.java

Lines changed: 101 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242

4343
import lombok.extern.slf4j.Slf4j;
4444
import org.apache.pulsar.client.admin.PulsarAdmin;
45+
import org.apache.pulsar.client.api.CryptoKeyReader;
4546
import org.apache.pulsar.client.api.MessageId;
4647
import org.apache.pulsar.client.api.MessageRouter;
4748
import org.apache.pulsar.client.api.MessageRoutingMode;
@@ -62,10 +63,12 @@
6263
import java.io.IOException;
6364
import java.util.ArrayList;
6465
import java.util.HashMap;
66+
import java.util.HashSet;
6567
import java.util.List;
6668
import java.util.Map;
6769
import java.util.Optional;
6870
import java.util.Properties;
71+
import java.util.Set;
6972
import java.util.concurrent.CompletableFuture;
7073
import java.util.concurrent.ConcurrentHashMap;
7174
import java.util.concurrent.ExecutionException;
@@ -86,6 +89,64 @@
8689
@Slf4j
8790
abstract class FlinkPulsarSinkBase<T> extends TwoPhaseCommitSinkFunction<T, FlinkPulsarSinkBase.PulsarTransactionState<T>, Void> implements CheckpointedFunction {
8891

92+
public static class Config<T> {
93+
private String adminUrl;
94+
private Optional<String> defaultTopicName;
95+
private ClientConfigurationData clientConf;
96+
private Properties properties;
97+
private PulsarSerializationSchema<T> serializationSchema;
98+
private MessageRouter messageRouter;
99+
private PulsarSinkSemantic semantic = PulsarSinkSemantic.AT_LEAST_ONCE;
100+
private CryptoKeyReader cryptoKeyReader;
101+
private Set<String> encryptionKeys = new HashSet<>();
102+
103+
public Config<T> withAdminUrl(final String adminUrl) {
104+
this.adminUrl = adminUrl;
105+
return this;
106+
}
107+
108+
public Config<T> withDefaultTopicName(final String defaultTopicName) {
109+
this.defaultTopicName = Optional.ofNullable(defaultTopicName);
110+
return this;
111+
}
112+
113+
public Config<T> withClientConf(ClientConfigurationData clientConf) {
114+
this.clientConf = clientConf;
115+
return this;
116+
}
117+
118+
public Config<T> withProperties(final Properties properties) {
119+
this.properties = properties;
120+
return this;
121+
}
122+
123+
public Config<T> withSerializationSchema(final PulsarSerializationSchema<T> serializationSchema) {
124+
this.serializationSchema = serializationSchema;
125+
return this;
126+
}
127+
128+
public Config<T> withMessageRouter(final MessageRouter messageRouter) {
129+
this.messageRouter = messageRouter;
130+
return this;
131+
}
132+
133+
public Config<T> withSemantic(final PulsarSinkSemantic semantic) {
134+
this.semantic = semantic;
135+
return this;
136+
}
137+
138+
public Config<T> withCryptoKeyReader(final CryptoKeyReader cryptoKeyReader) {
139+
this.cryptoKeyReader = cryptoKeyReader;
140+
return this;
141+
}
142+
143+
public Config<T> withEncryptionKeys(final Set<String> encryptionKeys) {
144+
this.encryptionKeys = encryptionKeys;
145+
return this;
146+
}
147+
148+
}
149+
89150
protected String adminUrl;
90151

91152
protected ClientConfigurationData clientConfigurationData;
@@ -143,6 +204,10 @@ abstract class FlinkPulsarSinkBase<T> extends TwoPhaseCommitSinkFunction<T, Flin
143204

144205
protected transient Map<String, Producer<T>> topic2Producer;
145206

207+
private final CryptoKeyReader cryptoKeyReader;
208+
209+
private final Set<String> encryptionKeys;
210+
146211
public FlinkPulsarSinkBase(
147212
String adminUrl,
148213
Optional<String> defaultTopicName,
@@ -153,34 +218,29 @@ public FlinkPulsarSinkBase(
153218
this(adminUrl, defaultTopicName, clientConf, properties, serializationSchema, messageRouter, PulsarSinkSemantic.AT_LEAST_ONCE);
154219
}
155220

156-
public FlinkPulsarSinkBase(
157-
String adminUrl,
158-
Optional<String> defaultTopicName,
159-
ClientConfigurationData clientConf,
160-
Properties properties,
161-
PulsarSerializationSchema<T> serializationSchema,
162-
MessageRouter messageRouter,
163-
PulsarSinkSemantic semantic) {
221+
public FlinkPulsarSinkBase(final Config<T> config) {
164222
super(new TransactionStateSerializer(), VoidSerializer.INSTANCE);
165223

166-
this.adminUrl = checkNotNull(adminUrl);
167-
this.semantic = semantic;
224+
this.adminUrl = checkNotNull(config.adminUrl);
225+
this.semantic = config.semantic;
226+
this.cryptoKeyReader = config.cryptoKeyReader;
227+
this.encryptionKeys = config.encryptionKeys;
168228

169-
if (defaultTopicName.isPresent()) {
229+
if (config.defaultTopicName.isPresent()) {
170230
this.forcedTopic = true;
171-
this.defaultTopic = defaultTopicName.get();
231+
this.defaultTopic = config.defaultTopicName.get();
172232
} else {
173233
this.forcedTopic = false;
174234
this.defaultTopic = null;
175235
}
176236

177-
this.serializationSchema = serializationSchema;
237+
this.serializationSchema = config.serializationSchema;
178238

179-
this.messageRouter = messageRouter;
239+
this.messageRouter = config.messageRouter;
180240

181-
this.clientConfigurationData = clientConf;
241+
this.clientConfigurationData = config.clientConf;
182242

183-
this.properties = checkNotNull(properties);
243+
this.properties = checkNotNull(config.properties);
184244

185245
this.caseInsensitiveParams =
186246
SourceSinkUtils.toCaceInsensitiveParams(Maps.fromProperties(properties));
@@ -216,6 +276,25 @@ public FlinkPulsarSinkBase(
216276
}
217277
}
218278

279+
public FlinkPulsarSinkBase(
280+
String adminUrl,
281+
Optional<String> defaultTopicName,
282+
ClientConfigurationData clientConf,
283+
Properties properties,
284+
PulsarSerializationSchema<T> serializationSchema,
285+
MessageRouter messageRouter,
286+
PulsarSinkSemantic semantic) {
287+
this(new Config<T>()
288+
.withAdminUrl(adminUrl)
289+
.withDefaultTopicName(defaultTopicName.orElse(null))
290+
.withClientConf(clientConf)
291+
.withProperties(properties)
292+
.withSerializationSchema(serializationSchema)
293+
.withMessageRouter(messageRouter)
294+
.withSemantic(semantic)
295+
);
296+
}
297+
219298
public FlinkPulsarSinkBase(
220299
String serviceUrl,
221300
String adminUrl,
@@ -340,6 +419,12 @@ protected Producer<T> createProducer(
340419
// maximizing the throughput
341420
.batchingMaxBytes(5 * 1024 * 1024)
342421
.loadConf(producerConf);
422+
if (cryptoKeyReader != null){
423+
builder.cryptoKeyReader(cryptoKeyReader);
424+
for (final String encryptionKey : this.encryptionKeys) {
425+
builder.addEncryptionKey(encryptionKey);
426+
}
427+
}
343428
if (messageRouter == null) {
344429
return builder.create();
345430
} else {

0 commit comments

Comments
 (0)