Skip to content

Commit 6170ebe

Browse files
authored
Add untilStarted & untilStopped to ReactiveMessagePipeline (#214)
1 parent fb144d9 commit 6170ebe

File tree

9 files changed

+259
-11
lines changed

9 files changed

+259
-11
lines changed

gradle/libs.versions.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ log4j-slf4j2-impl = { module = "org.apache.logging.log4j:log4j-slf4j2-impl", ver
5252
mockito-core = { module = "org.mockito:mockito-core", version.ref = "mockito" }
5353
pulsar-client-api = { module = "org.apache.pulsar:pulsar-client-api", version.ref = "pulsar" }
5454
pulsar-client-shaded = { module = "org.apache.pulsar:pulsar-client", version.ref = "pulsar" }
55+
pulsar-client-all = { module = "org.apache.pulsar:pulsar-client-all", version.ref = "pulsar" }
5556
rat-gradle = { module = "org.nosphere.apache:creadur-rat-gradle", version.ref = "rat-gradle" }
5657
reactor-core = { module = "io.projectreactor:reactor-core", version.ref = "reactor" }
5758
reactor-test = { module = "io.projectreactor:reactor-test", version.ref = "reactor" }

pulsar-client-reactive-adapter/build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ dependencies {
3535
testImplementation libs.bundles.log4j
3636
testImplementation libs.mockito.core
3737

38+
intTestImplementation libs.pulsar.client.all
3839
intTestImplementation project(':pulsar-client-reactive-producer-cache-caffeine')
3940
intTestImplementation project(path: ':pulsar-client-reactive-producer-cache-caffeine-shaded', configuration: 'shadow')
4041
intTestImplementation libs.junit.jupiter

pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessagePipelineE2ETests.java

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,11 @@
3636
import java.util.stream.Collectors;
3737
import java.util.stream.IntStream;
3838

39+
import org.apache.pulsar.client.admin.PulsarAdmin;
3940
import org.apache.pulsar.client.api.PulsarClient;
4041
import org.apache.pulsar.client.api.Schema;
42+
import org.apache.pulsar.common.policies.data.SubscriptionStats;
43+
import org.apache.pulsar.common.policies.data.TopicStats;
4144
import org.apache.pulsar.reactive.client.api.MessageSpec;
4245
import org.apache.pulsar.reactive.client.api.MessageSpecBuilder;
4346
import org.apache.pulsar.reactive.client.api.ReactiveMessagePipeline;
@@ -94,6 +97,38 @@ void shouldConsumeMessages() throws Exception {
9497
}
9598
}
9699

100+
@Test
101+
void shouldSupportWaitingForConsumingToStartAndStop() throws Exception {
102+
try (PulsarClient pulsarClient = SingletonPulsarContainer.createPulsarClient();
103+
PulsarAdmin pulsarAdmin = SingletonPulsarContainer.createPulsarAdmin()) {
104+
String topicName = "test" + UUID.randomUUID();
105+
ReactivePulsarClient reactivePulsarClient = AdaptedReactivePulsarClientFactory.create(pulsarClient);
106+
ReactiveMessagePipeline pipeline = reactivePulsarClient.messageConsumer(Schema.STRING)
107+
.subscriptionName("sub")
108+
.topic(topicName)
109+
.build()
110+
.messagePipeline()
111+
.messageHandler((message) -> Mono.empty())
112+
.build()
113+
.start();
114+
115+
// wait for consuming to start
116+
pipeline.untilStarted().block(Duration.ofSeconds(5));
117+
// there should be an existing subscription
118+
List<String> subscriptions = pulsarAdmin.topics().getSubscriptions(topicName);
119+
assertThat(subscriptions).as("subscription should be created").contains("sub");
120+
121+
// stop the pipeline
122+
pipeline.stop();
123+
// and wait for it to stop
124+
pipeline.untilStopped().block(Duration.ofSeconds(5));
125+
// there should be no consumers
126+
TopicStats topicStats = pulsarAdmin.topics().getStats(topicName);
127+
SubscriptionStats subStats = topicStats.getSubscriptions().get("sub");
128+
assertThat(subStats.getConsumers()).isEmpty();
129+
}
130+
}
131+
97132
@ParameterizedTest
98133
@EnumSource(MessageOrderScenario.class)
99134
void shouldRetainMessageOrder(MessageOrderScenario messageOrderScenario) throws Exception {

pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/SingletonPulsarContainer.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
package org.apache.pulsar.reactive.client.adapter;
2121

22+
import org.apache.pulsar.client.admin.PulsarAdmin;
2223
import org.apache.pulsar.client.api.PulsarClient;
2324
import org.apache.pulsar.client.api.PulsarClientException;
2425
import org.testcontainers.containers.PulsarContainer;
@@ -44,6 +45,12 @@ static PulsarClient createPulsarClient() throws PulsarClientException {
4445
.build();
4546
}
4647

48+
static PulsarAdmin createPulsarAdmin() throws PulsarClientException {
49+
return PulsarAdmin.builder()
50+
.serviceHttpUrl(SingletonPulsarContainer.PULSAR_CONTAINER.getHttpServiceUrl())
51+
.build();
52+
}
53+
4754
static DockerImageName getPulsarImage() {
4855
return DockerImageName.parse("apachepulsar/pulsar:4.0.4");
4956
}

pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ReactiveConsumerAdapter.java

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.apache.pulsar.client.api.Consumer;
2626
import org.apache.pulsar.client.api.ConsumerBuilder;
2727
import org.apache.pulsar.client.api.PulsarClient;
28+
import org.apache.pulsar.reactive.client.internal.api.InternalConsumerListener;
2829
import org.slf4j.Logger;
2930
import org.slf4j.LoggerFactory;
3031
import reactor.core.publisher.Flux;
@@ -45,12 +46,20 @@ class ReactiveConsumerAdapter<T> {
4546
}
4647

4748
private Mono<Consumer<T>> createConsumerMono() {
48-
return AdapterImplementationFactory.adaptPulsarFuture(
49-
() -> this.consumerBuilderFactory.apply(this.pulsarClientSupplier.get()).subscribeAsync());
49+
return Mono.deferContextual((contextView) -> AdapterImplementationFactory
50+
.adaptPulsarFuture(
51+
() -> this.consumerBuilderFactory.apply(this.pulsarClientSupplier.get()).subscribeAsync())
52+
.doOnSuccess((consumer) -> contextView.<InternalConsumerListener>getOrEmpty(InternalConsumerListener.class)
53+
.ifPresent((listener) -> listener.onConsumerCreated(consumer))));
5054
}
5155

5256
private Mono<Void> closeConsumer(Consumer<?> consumer) {
53-
return Mono.fromFuture(consumer::closeAsync).doOnSuccess((__) -> this.LOG.info("Consumer closed {}", consumer));
57+
return Mono.deferContextual((contextView) -> Mono.fromFuture(consumer::closeAsync).doFinally((signalType) -> {
58+
this.LOG.info("Consumer closed {}", consumer);
59+
contextView.<InternalConsumerListener>getOrEmpty(InternalConsumerListener.class)
60+
.ifPresent((listener) -> listener.onConsumerClosed(consumer));
61+
}));
62+
5463
}
5564

5665
<R> Mono<R> usingConsumer(Function<Consumer<T>, Mono<R>> usingConsumerAction) {

pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessagePipeline.java

Lines changed: 54 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,20 +19,26 @@
1919

2020
package org.apache.pulsar.reactive.client.api;
2121

22+
import reactor.core.publisher.Mono;
23+
2224
/**
2325
* Reactive message pipeline interface.
2426
*/
2527
public interface ReactiveMessagePipeline extends AutoCloseable {
2628

2729
/**
28-
* Starts the reactive pipeline.
29-
* @return the pipeline
30+
* Starts the reactive pipeline asynchronously.
31+
* @return the pipeline instance
32+
* @see #untilStarted() For returning a reactive publisher (Mono) that completes after
33+
* the pipeline has actually started.
3034
*/
3135
ReactiveMessagePipeline start();
3236

3337
/**
34-
* Stops the reactive pipeline.
35-
* @return the reactive pipeline
38+
* Stops the reactive pipeline asynchronously.
39+
* @return the pipeline instance
40+
* @see #untilStopped() For returning a reactive publisher (Mono) that completes after
41+
* the pipeline has actually stopped.
3642
*/
3743
ReactiveMessagePipeline stop();
3844

@@ -43,11 +49,54 @@ public interface ReactiveMessagePipeline extends AutoCloseable {
4349
boolean isRunning();
4450

4551
/**
46-
* Closes the reactive pipeline.
52+
* Closes the reactive pipeline asynchronously without waiting for shutdown
53+
* completion.
4754
* @throws Exception if an error occurs
4855
*/
4956
default void close() throws Exception {
5057
stop();
5158
}
5259

60+
/**
61+
* <p>
62+
* Returns a reactive publisher (Mono) that completes after the pipeline has
63+
* successfully subscribed to the input topic(s) and started consuming messages for
64+
* the first time after pipeline creation. This method is not intended to be used
65+
* after a pipeline restarts following failure. Use this method to wait for consumer
66+
* and Pulsar subscription creation. This helps avoid race conditions when sending
67+
* messages immediately after the pipeline starts.
68+
* </p>
69+
* <p>
70+
* The {@link #start()} method must be called before invoking this method.
71+
* </p>
72+
* <p>
73+
* To wait for the operation to complete synchronously, it is necessary to call
74+
* {@link Mono#block()} on the returned Mono.
75+
* </p>
76+
* @return a Mono that completes after the pipeline has created its underlying Pulsar
77+
* consumer
78+
*/
79+
default Mono<Void> untilStarted() {
80+
return Mono.empty();
81+
}
82+
83+
/**
84+
* <p>
85+
* Returns a reactive publisher (Mono) that completes after the pipeline has closed
86+
* the underlying Pulsar consumer and stopped consuming new messages.
87+
* </p>
88+
* <p>
89+
* The {@link #stop()} method must be called before invoking this method.
90+
* </p>
91+
* <p>
92+
* To wait for the operation to complete synchronously, it is necessary to call
93+
* {@link Mono#block()} on the returned Mono.
94+
* </p>
95+
* @return a Mono that completes when the pipeline has closed the underlying Pulsar
96+
* consumer
97+
*/
98+
default Mono<Void> untilStopped() {
99+
return Mono.empty();
100+
}
101+
53102
}

pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/internal/api/DefaultReactiveMessagePipeline.java

Lines changed: 61 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
import java.time.Duration;
2323
import java.util.Objects;
24+
import java.util.concurrent.CompletableFuture;
2425
import java.util.concurrent.atomic.AtomicReference;
2526
import java.util.function.BiConsumer;
2627
import java.util.function.Function;
@@ -67,6 +68,10 @@ class DefaultReactiveMessagePipeline<T> implements ReactiveMessagePipeline {
6768

6869
private final MessageGroupingFunction groupingFunction;
6970

71+
private final AtomicReference<InternalConsumerListenerImpl> consumerListener = new AtomicReference<>();
72+
73+
private final AtomicReference<CompletableFuture<Void>> pipelineStoppedFuture = new AtomicReference<>();
74+
7075
DefaultReactiveMessagePipeline(ReactiveMessageConsumer<T> messageConsumer,
7176
Function<Message<T>, Publisher<Void>> messageHandler, BiConsumer<Message<T>, Throwable> errorLogger,
7277
Retry pipelineRetrySpec, Duration handlingTimeout, Function<Mono<Void>, Publisher<Void>> transformer,
@@ -83,7 +88,14 @@ class DefaultReactiveMessagePipeline<T> implements ReactiveMessagePipeline {
8388
this.pipeline = messageConsumer.consumeMany(this::createMessageConsumer)
8489
.then()
8590
.transform(transformer)
86-
.transform(this::decoratePipeline);
91+
.transform(this::decoratePipeline)
92+
.doFinally((signalType) -> {
93+
CompletableFuture<Void> f = this.pipelineStoppedFuture.get();
94+
if (f != null) {
95+
f.complete(null);
96+
}
97+
})
98+
.doFirst(() -> this.pipelineStoppedFuture.set(new CompletableFuture<>()));
8799
}
88100

89101
private Mono<Void> decorateMessageHandler(Mono<Void> messageHandler) {
@@ -168,14 +180,26 @@ public ReactiveMessagePipeline start() {
168180
if (this.killSwitch.get() != null) {
169181
throw new IllegalStateException("Message handler is already running.");
170182
}
171-
Disposable disposable = this.pipeline.subscribe(null, this::logError, this::logUnexpectedCompletion);
183+
InternalConsumerListenerImpl consumerListener = new InternalConsumerListenerImpl();
184+
Disposable disposable = this.pipeline.contextWrite(Context.of(InternalConsumerListener.class, consumerListener))
185+
.subscribe(null, this::logError, this::logUnexpectedCompletion);
172186
if (!this.killSwitch.compareAndSet(null, disposable)) {
173187
disposable.dispose();
174188
throw new IllegalStateException("Message handler was already running.");
175189
}
190+
this.consumerListener.set(consumerListener);
176191
return this;
177192
}
178193

194+
@Override
195+
public Mono<Void> untilStarted() {
196+
if (!isRunning()) {
197+
throw new IllegalStateException("Pipeline isn't running. Call start first.");
198+
}
199+
InternalConsumerListenerImpl internalConsumerListener = this.consumerListener.get();
200+
return internalConsumerListener.waitForConsumerCreated();
201+
}
202+
179203
private void logError(Throwable throwable) {
180204
LOG.error("ReactiveMessageHandler was unexpectedly terminated.", throwable);
181205
}
@@ -195,9 +219,44 @@ public ReactiveMessagePipeline stop() {
195219
return this;
196220
}
197221

222+
@Override
223+
public Mono<Void> untilStopped() {
224+
if (isRunning()) {
225+
throw new IllegalStateException("Pipeline is running. Call stop first.");
226+
}
227+
CompletableFuture<Void> f = this.pipelineStoppedFuture.get();
228+
if (f != null) {
229+
return Mono.fromFuture(f, true);
230+
}
231+
else {
232+
return Mono.empty();
233+
}
234+
}
235+
198236
@Override
199237
public boolean isRunning() {
200238
return this.killSwitch.get() != null;
201239
}
202240

241+
private static final class InternalConsumerListenerImpl implements InternalConsumerListener {
242+
243+
private final CompletableFuture<Void> createdFuture;
244+
245+
private InternalConsumerListenerImpl() {
246+
this.createdFuture = new CompletableFuture<>();
247+
}
248+
249+
@Override
250+
public void onConsumerCreated(Object nativeConsumer) {
251+
if (!this.createdFuture.isDone()) {
252+
this.createdFuture.complete(null);
253+
}
254+
}
255+
256+
Mono<Void> waitForConsumerCreated() {
257+
return Mono.fromFuture(this.createdFuture, true);
258+
}
259+
260+
}
261+
203262
}
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* https://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.pulsar.reactive.client.internal.api;
21+
22+
/**
23+
* Internal interface to signal the creation and closing of a native consumer. This is not
24+
* to be intended to be used by applications.
25+
*/
26+
public interface InternalConsumerListener {
27+
28+
/**
29+
* Called when a new native consumer is created. This is called each time a new
30+
* consumer is created initially or as a result of a reactive pipeline retry.
31+
* @param nativeConsumer the native consumer instance
32+
*/
33+
default void onConsumerCreated(Object nativeConsumer) {
34+
// no-op
35+
}
36+
37+
/**
38+
* Called when a native consumer is closed. This is called each time a consumer is
39+
* closed as a result of a reactive pipeline retry or when the pipeline is closed.
40+
* @param nativeConsumer the native consumer instance
41+
*/
42+
default void onConsumerClosed(Object nativeConsumer) {
43+
// no-op
44+
}
45+
46+
}

0 commit comments

Comments
 (0)