This project provides a basic Spring friendly API for developing Apache Pulsar applications.
Most of the ideas in this project are borrowed from the Spring for Apache Kafka project, thus a familiarity with the Spring support available in Spring for Apache Kafka would help a lot.
-
IMPORTANT: This is a WIP project. Many of the support available at the moment are prototypes and experimental.
We recommend using the library spring-pulsar
in association with Spring Boot and therefore should also use spring-pulsar-boot-autoconfigure
.
If you simply use the module spring-pulsar-boot-autoconfigure
, then that will transitively include spring-pulsar
.
When using spring-pulsar-boot-autoconfigure
, you get the PulsarClient
auto-configured.
This is done through a factory bean called PulsarClientFactoryBean
, which takes a configuration object PulsarClientConfiguration
.
By default, the application tries to connect a local Pulsar instance available at pulsar://localhost:6650
.
The properties listed below are available to be set on the Pulsar client.
The same defaults expected by the Pulsar client are honored when no configuration provided for these properties.
spring.pulsar.client.serviceUrl
spring.pulsar.client.authPluginClassName
spring.pulsar.client.authParams
spring.pulsar.client.operationTimeoutMs
spring.pulsar.client.statsIntervalSeconds
spring.pulsar.client.numIoThreads
spring.pulsar.client.useTcpNoDelayuseTls
spring.pulsar.client.tlsAllowInsecureConnection
spring.pulsar.client.tlsHostnameVerificationEnable
spring.pulsar.client.concurrentLookupRequest
spring.pulsar.client.maxLookupRequest
spring.pulsar.client.maxNumberOfRejectedRequestPerConnection
spring.pulsar.client.keepAliveIntervalSeconds
spring.pulsar.client.connectionTimeoutMs
spring.pulsar.client.requestTimeoutMs
spring.pulsar.client.initialBackoffIntervalNanos
spring.pulsar.client.maxBackoffIntervalNanos;
On the Pulsar producer side, Sprig Boot auto-configuration will provide a PulsarTemplate
which is backed by a PulsarProducerFactory
.
We will see more details of these components later in this document, but in this section, let us look at the configuration properties available on the producer.
These properties must be prefixed with spring.pulsar.producer
.
The same defaults expected by the Pulsar producer are honored when no configuration provided for these properties.
spring.pulsar.producer.topicName
spring.pulsar.producer.producerName
spring.pulsar.producer.sendTimeoutMs
spring.pulsar.producer.blockIfQueueFull
spring.pulsar.producer.maxPendingMessages
spring.pulsar.producer.maxPendingMessagesAcrossPartitions
spring.pulsar.producer.messageRoutingMode
spring.pulsar.producer.hashingScheme
spring.pulsar.producer.cryptoFailureAction
spring.pulsar.producer.batchingMaxPublishDelayMicros
spring.pulsar.producer.batchingMaxMessages
spring.pulsar.producer.batchingEnabled
spring.pulsar.producer.chunkingEnabled
spring.pulsar.producer.compressionType
spring.pulsar.producer.initialSubscriptionName
spring.pulsar.producer.accessMode
When it comes to Pulsar consumer, we recommend the end user applications to make use of the PulsarListener
annotation.
In order to use PulsarListener
, you need to use the EnablePulsar
annotation.
When using the Spring Boot support, it automatically enables this annotation and configures all the components necessary for PulsarListener
such as the message listener infrastructure which is responsible for creating the Pulsar consumer.
PulsarMessageListenerContainer
uses a PulsarConsumerFactory
in order to create and manage the Pulsar consumer.
This consumer factory is auto-configured through Spring Boot.
Following are the consumer properties that you can configure through the Boot support.
The same defaults expected by the Pulsar consumer are honored when no configuration provided for these properties.
spring.pulsar.consumer.topicNames
spring.pulsar.consumer.topicsPattern
spring.pulsar.consumer.subscriptionName
spring.pulsar.consumer.subscriptionType
spring.pulsar.consumer.receiverQueueSize
spring.pulsar.consumer.acknowledgementsGroupTimeMicros
spring.pulsar.consumer.negativeAckRedeliveryDelayMicros
spring.pulsar.consumer.maxTotalReceiverQueueSizeAcrossPartitions
spring.pulsar.consumer.consumerName
spring.pulsar.consumer.ackTimeoutMillis
spring.pulsar.consumer.tickDurationMillis
spring.pulsar.consumer.priorityLevel
spring.pulsar.consumer.cryptoFailureAction
spring.pulsar.consumer.properties
spring.pulsar.consumer.readCompacted
spring.pulsar.consumer.subscriptionInitialPosition
spring.pulsar.consumer.patternAutoDiscoveryPeriod
spring.pulsar.consumer.regexSubscriptionMode
spring.pulsar.consumer.autoUpdatePartitions
spring.pulsar.consumer.replicateSubscriptionState
spring.pulsar.consumer.autoAckOldestChunkedMessageOnQueueFull
spring.pulsar.consumer.maxPendingChunkedMessageexpireTimeOfIncompleteChunkedMessageMillis
spring.pulsar.consumer.maxPendingChunkedMessageexpireTimeOfIncompleteChunkedMessageMillis
In this section, we will show a number of various applications to demonstrate how you can use the Spring for Apache Pulsar library using Spring Boot.
Here are the maven coordinates of the artifacts needed for using this library.
<dependency>
<groupId>org.springframework.pulsar</groupId>
<artifactId>spring-pulsar</artifactId>
<version>0.1.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.springframework.pulsar</groupId>
<artifactId>spring-pulsar-boot-autoconfiguration</artifactId>
<version>0.1.0-SNAPSHOT</version>
</dependency>
If you simply include the spring-pulsar-boot-autoconfiguration
module, then that will transitively include the spring-pulsar
module.
In order to publish to a Pulsar topic, the easiest way is to use the PulsarTemplate
API.
Similarly, the easiest way to consume from a Pulsar topic is to use the PulsarListener
annotation.
The following is a complete Spring Boot application that publishes to a Pulsar topic called hello-pulsar
and then consumes from it.
@SpringBootApplication
public class PulsarBootHelloWorld {
public static void main(String[] args) {
SpringApplication.run(PulsarBootHelloWorld.class, args);
}
@Bean
public ApplicationRunner runner(PulsarTemplate<String> pulsarTemplate) {
pulsarTemplate.setDefaultTopicName("hello-pulsar");
return args -> {
for (int i = 0; i < 10; i ++) {
pulsarTemplate.send("This is message " + (i + 1));
}
};
}
@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar")
public void listen(String message) {
System.out.println("Message Received: " + message);
}
}
Although this is a very trivial application, it conveys several important concepts nonetheless. Let’s take a look at them.
PulsarTemplate
is autoconfigured by Spring Boot, and we are injecting that in the application.
There is a producer factory behind the scenes and properties can be provided to it using spring.pulsar.producer…
(See above for more details).
When injecting, note that, we can provide a specific type - String
in this case.
If we have a need for a different PulsarTemplate
with a different type in the same application, then we can add another injection with that type.
PulsarTemplate
currently requires the application to set a default topic.
Once the topic is set, then we can call the various send
methods on it.
In this example, we are calling the very basic send
method that calls the synchronous send from the Pulsar producer internally that returns the MessageId
.
We are ignoring the return value in this quick sample application.
PulsarTemplate
also provides API’s for sending asynchronously among other send methods.
We will look at the details of it, later on in the reference documentation.
We use the PulsarListener
annotation to consume from the topic.
We are providing the subscription and topic names as annotation arguments.
These are optional and the application may prefer to set them using the spring.pulsar.consumer..
properties provided above.
However, if properties are provided as annotation arguments, they get precedence.
There are four consumer level properties that you can set on the PulsarListener
annotation - they are topics
, topicPattern
, subscriptionName
and subscriptionType
.
When providing on the annotation, they get preference regardless of what is set through consumer properties.
By default, PulsarListener
uses an exclusive subscription type and this can be changed by using the subscriptionType
property on the annotation or through the consumer property.
PulsarListner
internally creates a message listener container.
The message listener container is responsible for creating the Pulsar Consumer using a consumer factory.
When the consumer receives data, the container calls a message listener.
This message listener is an adapter around the user provided method - the listen
method in this case.
The message listener container delegates to the adapter with the received data and the adapter then invokes the user method.
In order to complete the discussion about this sample application, we also need to chime in on how data conversion is done.
In the user method, we receive the data as String
, but we don’t specify any schema types.
Internally, the framework by default relies on Pulsar’s schema mechanism to convert the data to the required type.
The framework detects that you are expecting the String
type and then infer the schema type based on that information.
Then it provides that schema to the consumer.
For all the primitive types in Java, the framework does this inference.
For any complex types, such as JSON, AVRO etc. the framework cannot do this inference and the user needs to provide the schema type on the annotation using the schemaType
property.
In the example above, and in the ones that follows below, we provide both producer and consumer as part of the same application. However, in real apps, you might find it as convenient to separate them especially if you are following a microservice style architecture.
In the sample below, we are publishing to a topic called hello-pulsar-partitioned
.
It is a topic that is partitioned and for this sample we assume that the topic is already created with three partitions.
@SpringBootApplication
public class PulsarBootPartitioned {
public static void main(String[] args) {
SpringApplication.run(PulsarBootPartitioned.class, "--spring.pulsar.producer.messageRoutingMode=CustomPartition");
}
@Bean
public ApplicationRunner runner(PulsarTemplate<String> pulsarTemplate) {
pulsarTemplate.setDefaultTopicName("hello-pulsar-partitioned");
return args -> {
for (int i = 0; i < 10; i++) {
pulsarTemplate.sendAsync("hello john doe 0 ", new FooRouter());
pulsarTemplate.sendAsync("hello alice doe 1", new BarRouter());
pulsarTemplate.sendAsync("hello buzz doe 2", new BuzzRouter());
}
};
}
@PulsarListener(subscriptionName = "hello-pulsar-partitioned-subscription", topics = "hello-pulsar-partitioned")
public void listen(String message) {
System.out.println("Message Received: " + message);
}
static class FooRouter implements MessageRouter {
@Override
public int choosePartition(Message<?> msg, TopicMetadata metadata) {
return 0;
}
}
static class BarRouter implements MessageRouter {
@Override
public int choosePartition(Message<?> msg, TopicMetadata metadata) {
return 1;
}
}
static class BuzzRouter implements MessageRouter {
@Override
public int choosePartition(Message<?> msg, TopicMetadata metadata) {
return 2;
}
}
}
A few things require explanation in the application above.
We are publishing to a partitioned topic and we would like to publish some data segment to a specific partition.
If you leave it to Pulsar’s default, it follows a round-robin mode of partition assignments, and we would like to override that.
In order to do that, we are providing a message router object with the send method.
Look at the three message routers implemented.
FooRouter
always sends data to partition 0
, BarRouter
to partition 1
and BuzzRouter
to partition 2
.
Also note that, we are now using the sendAsync
method of PulsarTemplate
that returns a CompletableFuture
.
When running the application, we also need to set the messageRoutingMode
on the producer to CustomPartition
(spring.pulsar.producer.messageRoutingMode
).
On the consumer side, we are using a PulsarListener
with the exclusive subscription type.
This means that data from all the partitions will end up in the same consumer and there is no ordering guarantee.
What can we do if we want each partition to be consumed by a single distinct consumer?
We can switch to the failover
subscription mode and add three separate consumers.
Here is an example.
@PulsarListener(subscriptionName = "hello-pulsar-partitioned-subscription", topics = "hello-pulsar-partitioned", subscriptionType = "failover")
public void listen1(String foo) {
System.out.println("Message Received 1: " + foo);
}
@PulsarListener(subscriptionName = "hello-pulsar-partitioned-subscription", topics = "hello-pulsar-partitioned", subscriptionType = "failover")
public void listen2(String foo) {
System.out.println("Message Received 2: " + foo);
}
@PulsarListener(subscriptionName = "hello-pulsar-partitioned-subscription", topics = "hello-pulsar-partitioned", subscriptionType = "failover")
public void liste3n(String foo) {
System.out.println("Message Received 3: " + foo);
}
When following this approach, you can see that a single partition always gets consumed by a dedicated consumer.
In the similar vein, if you want to use Pulsar’s shared consumer type, you can use the subscription type shared
.
Keep in mind though, that when using the shared
mode, you lose any ordering guarantees as a single consumer may receive messages from all the partitions before another consumer gets a chance.
Here is an example.
@PulsarListener(subscriptionName = "hello-pulsar-shared-subscription", topics = "hello-pulsar-partitioned", subscriptionType = "shared")
public void listen1(String foo) {
System.out.println("Message Received 1: " + foo);
}
@PulsarListener(subscriptionName = "hello-pulsar-shared-subscription", topics = "hello-pulsar-partitioned", subscriptionType = "shared")
public void listen2(String foo) {
System.out.println("Message Received 2: " + foo);
}
In the examples above, we used PulsarListener
in record mode, i.e. we receive a single message on each method invocation.
Sometimes, for various reasons, we may want to receive Pulsar messages in batch mode. Here is a PulsarListener
that is enabled with batch consumption.
@PulsarListener(subscriptionName = "batch-subscription", topics = "hello-pulsar", batch = "true")
public void listen(List<String> records) {
System.out.println("Message Received from batch: " + records);
}
In order to enable batch consumption, you need to set the batch
property to true
on PulsarListener
.
Then you can receive the records as a List
type.
Based on the actual type that the List
holds, the framework tries to infer the schema to use.
If the List
contains a complex type, then the schemaType
still needs to be provided on PulsarListener
.
In your PulsarListener
method, you can receive the record directly as a Pulsar Message instead of the actual payload type.
Here is an example.
@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar")
public void listen(org.apache.pulsar.client.api.Message<String> message) {
System.out.println("Data Received: " + message.getValue());
}
When consuming messages in batch mode using PulsarListener
, instead of receiving them as a `List, you can receive them as Pulsar Messages type.
Here is an example.
@PulsarListener(subscriptionName = "batch-subscription", topics = "hello-pulsar", batch = "true")
public void listen(org.apache.pulsar.client.api.Messages<String> messages) {
// Iterate on the messages
// Each iteration gives access to a org.apache.pulsar.client.api.Message object
}
Sometimes, it is necessary to gain direct access to the Pulsar Consumer object. Here is how you may do so.
@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar")
public void listen(String message, org.apache.pulsar.client.api.Consumer<String> consumer) {
System.out.println("Message Received: " + message);
ConsumerStats stats = consumer.getStats();
...
}
When accessing the Consumer
object this way, make sure NOT to invoke any operations that would change the Consumer’s cursor position by invoking any receive methods.
All such operations must be done by the container.
As indicated above, for normal Java types (the primitive ones), Spring Pulsar framework can infer the proper Schema to use on the PulsarListener
.
However, for more complex types such as JSON or AVRO, you need to specify the schema type on the annotation.
Here is how you provide that.
@PulsarListener(subscriptionName = "json-subscription", topics = "hello-pulsar-json", schemaType = SchemaType.JSON)
public void listen(Foo foo) {
System.out.println("Message received: " + foo);
}
On the producer side also, for the Java primitive types, the framework can infer the Schema, but for any other types, you need set that on the PulsarTemmplate
.