This project is under active development.
- Getting Started
- Dynamic Spout Framework
- Sidelining
- Contributing
- Other Notes
To use this library add the following to your pom.xml:
```xml
<dependency>
    <groupId>com.salesforce.storm</groupId>
    <artifactId>dynamic-spout</artifactId>
    <version>0.10-SNAPSHOT</version>
</dependency>
```
Note that snapshot releases may have API changes; it's best to check for the latest release version and use that.
In addition to the library, we publish source and javadoc jars to Maven Central.
This project builds against recent Apache Storm and Apache Kafka releases, but the library should work with any Storm release 1.0 or higher and any Kafka release 0.10 or higher. If you run into compatibility problems, please let us know by filing an issue on GitHub.
This library contains a reusable set of components that can be used to build a system of spouts masked behind a single spout for Apache Storm. The library came out of the development of Apache Kafka Sidelining. During its development it became clear that many of the components could serve as building blocks for a more general-purpose framework, and not long after separating out those components we put them to use in other stream processing applications.
The `DynamicSpout` is a container of many `DelegateSpout` instances, each of which handles processing messages from its defined `Consumer` and passes them into Apache Storm as a single stream. You can think of a `DelegateSpout` as a typical Storm spout, with the `DynamicSpout` acting as an orchestration layer above it. The `DynamicSpout` conforms to Apache Storm's `IRichSpout` interface, so from Storm's standpoint there is only one spout, and that's the magic of the framework.
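From a topology's perspective, wiring in a `DynamicSpout` looks like wiring in any other spout. Below is a minimal sketch (not the project's bundled example); it assumes `DynamicSpout` exposes a constructor that accepts the spout configuration map, and the handler class named in the config is a hypothetical placeholder.

```java
import com.salesforce.storm.spout.dynamic.DynamicSpout;
import org.apache.storm.topology.TopologyBuilder;

import java.util.HashMap;
import java.util.Map;

public class ExampleTopologyBuilder {
    public static TopologyBuilder build() {
        final Map<String, Object> spoutConfig = new HashMap<>();
        // Keys documented in the configuration section of this README.
        spoutConfig.put("spout.coordinator.virtual_spout_id_prefix", "example");
        spoutConfig.put("spout.spout_handler_class", "com.example.MySpoutHandler"); // hypothetical handler

        final TopologyBuilder builder = new TopologyBuilder();
        // Storm sees a single spout; the framework manages the virtual spouts behind it.
        builder.setSpout("dynamic-spout", new DynamicSpout(spoutConfig), 1);
        // Attach bolts to "dynamic-spout" as you would with any other spout.
        return builder;
    }
}
```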
The `DynamicSpout` framework has the following dependencies (a sketch of declaring them in your pom follows the list):

- Apache Storm: Provides all of the Storm interfaces this project is implemented against. This dependency is marked as provided, so you will need to specify the version of Storm that you want to use somewhere on your project's classpath.
- Apache Zookeeper: The framework is agnostic about where metadata is stored, and you can easily add different types of storage by implementing a `PersistenceAdapter`. The framework ships with a Zookeeper implementation because Storm itself requires Zookeeper to track metadata, and we figured it's the easiest to provide out of the box. That said, Zookeeper is listed as an optional dependency, so you will need to include it on your classpath as well.
- Apache Curator: As noted above, the framework ships with a Zookeeper `PersistenceAdapter`, and we use Curator to make all of those interactions easy and painless. Curator is also an optional dependency, so if you're using Zookeeper you will need to specify the correct version of Curator to go with your Zookeeper dependency.
- Apache Kafka: The framework is not tied to Kafka, but we've found that many of our implementations of the framework are. This is an optional dependency that you will want to include if, for example, you're using the Kafka-based `Consumer`.
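For reference, here is a sketch of how these optional dependencies might be declared in your pom. The artifact choices and version properties are assumptions; match them to your cluster's Storm, Curator/Zookeeper and Kafka versions (Zookeeper itself typically arrives transitively through Curator).

```xml
<!-- Illustrative only: the version properties are placeholders for your environment. -->
<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-core</artifactId>
    <version>${storm.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-framework</artifactId>
    <version>${curator.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>${kafka.version}</version>
</dependency>
```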
When a Storm topology is deployed with a `DynamicSpout`, the `DynamicSpout` will first start the `SpoutCoordinator`. The `SpoutCoordinator` watches for `DelegateSpout` instances that are added to it. A `DelegateSpout` is just an interface that resembles Storm's `IRichSpout`, but it's not meant to be run on its own, only in the context of the framework. There is a default implementation of the `DelegateSpout` called the `VirtualSpout` that handles the mechanics of working with a `Consumer`, like the one included for talking to Kafka.
You may be wondering how a `DelegateSpout` instance gets added to the `SpoutCoordinator`; the answer is usually by a `SpoutHandler` instance that is configured on the `DynamicSpout`. A `SpoutHandler` is a set of hooks into the `DynamicSpout` lifecycle. It allows you to do work when a `DynamicSpout` instance opens or closes (the topology's start/stop lifecycle). For example, a `SpoutHandler` may check a node in Zookeeper for data and create `VirtualSpout` instances based upon that data, which is similar to how the sidelining implementation works (more on that later).
There is also a `VirtualSpoutHandler` that is tied to the `VirtualSpout` implementation of the `DelegateSpout`. Similarly, it provides hooks into the lifecycle of an individual `VirtualSpout` instance inside of the `DynamicSpout`.
Beyond the basic building blocks there are a number of places where you can change the behavior of the `DynamicSpout` to meet your needs. These include changing the spout's retry behavior, adding filters for messages and customizing where the spout stores metadata, among other things. Out of the box the framework tries to provide reasonable defaults to get a project going, but as your project matures you may want to customize various aspects of the framework.
Below are some of the interfaces, and their default implementations, that you can use to customize the behavior of the `DynamicSpout`. Most projects won't need to touch the vast majority of these; however, there are a few, such as the `Deserializer` and `SpoutHandler`, for which you will likely need to create project-specific implementations.
DelegateSpout: This is the core building block of the `DynamicSpout` framework; implementations of this interface are what handle passing data down into a Storm topology. The default implementation is the `VirtualSpout`, which uses a `Consumer` to consume messages and is adequate for most implementations of the framework.
Consumer: A `Consumer` is used by a `VirtualSpout` instance to consume from a data source. The Kafka-based `Consumer` is an example implementation that polls for messages from a Kafka topic.
Deserializer: When using a `Consumer` it will need to know how to transform a message into a usable format. The framework ships with a `Utf8StringDeserializer` that merely ensures the bytes from a message are accessible as a `String`. If you're using structured messages like Protocol Buffers, Apache Avro or even JSON, you'll want a different deserializer. This is very similar to Storm's `Scheme`. If your `Consumer` is consuming anything other than plain-text messages, you'll probably be customizing this implementation.
FilterChainStep: A `VirtualSpout` supports applying filters to messages that come from the `Consumer`. This allows you to filter out messages at the spout level, before they ever get to your topology. Each filter is a step in a chain, so you can chain as many different filters as you want. This is a particularly powerful tool inside of the `SpoutHandler` and `VirtualSpoutHandler`, where you can dynamically filter messages based upon how you're using the framework. Filters are completely optional and not required to use this framework.
PersistenceAdapter: Storm spouts need a place to store metadata, like the position a `Consumer` is at on a data source. Storm clusters ship with a Zookeeper instance, so the framework comes with a `ZookeeperPersistenceAdapter` out of the box for storing this data. A `Consumer`, such as the Kafka one provided with the framework, would use this layer to track its position on a topic's partition as it consumes data.
RetryManager: When a tuple fails in Storm the spout can attempt to retry it. Different projects will have different retry needs, so the default implementation, `ExponentialBackoffRetryManager`, mirrors the one in Storm itself by retrying a tuple with an exponential backoff. There are other implementations provided with the framework, and if none of them support your needs you can implement your own. This is a more advanced customization of the framework that most projects will not need.
MessageBuffer: Because implementations of the framework have many `DelegateSpout` instances emitting tuples downstream, the `DynamicSpout` has a buffer that brings them all together before passing them down to the topology. Out of the box the buffer is the `RoundRobinBuffer` implementation, which loops through each virtual spout when it emits messages from the spout. Other implementations included in the framework are a first in, first out (FIFO) buffer as well as throttled and ratio-based buffers that can emit tuples from different `DelegateSpout` instances at variable rates. This is a more advanced customization that most projects will not need.
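Swapping buffers is just a configuration change. A small sketch using the keys documented in the configuration section below (the `RoundRobinBuffer` class path shown is the documented default; any replacement must implement `MessageBuffer`):

```java
// Select the MessageBuffer implementation and bound its size.
spoutConfig.put("spout.coordinator.tuple_buffer.class",
    "com.salesforce.storm.spout.dynamic.buffer.RoundRobinBuffer");
spoutConfig.put("spout.coordinator.tuple_buffer.max_size", 2000);
```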
MetricsRecorder: There are a ton of metrics exposed by the `DynamicSpout`, and you can customize how they are handled. By default the spout simply publishes metrics to logs using the `LogRecorder`. If you are using an older version of Storm (pre 1.2) you can use `StormRecorder`, or if you're using a more recent version of Storm check out `DropwizardRecorder`. All of the metrics captured by the `DynamicSpout` are listed in a section below, along with a section about how to create your own metrics in your customizations of the framework.
SpoutHandler: This is essentially a set of hooks that tie into the lifecycle events of a `DynamicSpout`, such as its opening and closing. Implementations of this interface can orchestrate the creation of `DelegateSpout` instances, and thus this is the primary integration point for people using the framework. While nearly every component provided with the framework has a default implementation, this one does not, because we expect this to be the point in the framework where your business logic for managing many spouts is built.
VirtualSpoutHandler: This is an optional set of hooks that tie into the lifecycle events of a `VirtualSpout`. Note that an implementation of `DelegateSpout` does not necessarily support these; they are particular to the `VirtualSpout` or an implementation that explicitly supports them. Similar to the `SpoutHandler`, implementations of the `VirtualSpoutHandler` allow for tying into the lifecycle events of a `VirtualSpout`, such as its opening and closing.
All of these options can be found inside of `SpoutConfig` and `KafkaConsumerConfig`. They are defined using the `ConfigDocumentation` annotation on keys in their respective configuration classes; the `DocGenerator` then compiles them into the tables below. A short example of assembling these keys into a configuration map follows the tables.
Config Key | Type | Required | Description | Default Value |
---|---|---|---|---|
spout.consumer.class | String | | Defines which Consumer implementation to use. Should be a full classpath to a class that implements the Consumer interface. | com.salesforce.storm.spout.dynamic.kafka.Consumer |
spout.coordinator.consumer_state_flush_interval_ms | Long | | How often we'll make sure each VirtualSpout persists its state, in milliseconds. | 30000 |
spout.coordinator.max_concurrent_virtual_spouts | Integer | | The size of the thread pool for running virtual spouts. | 10 |
spout.coordinator.max_spout_shutdown_time_ms | Long | | How long we'll wait for all VirtualSpouts to cleanly shut down before we stop them with force, in milliseconds. | 10000 |
spout.coordinator.monitor_thread_interval_ms | Long | | How often our monitor thread will run and watch over its managed virtual spout instances, in milliseconds. | 2000 |
spout.coordinator.tuple_buffer.class | String | | Defines which MessageBuffer implementation to use. Should be a full classpath to a class that implements the MessageBuffer interface. | com.salesforce.storm.spout.dynamic.buffer.RoundRobinBuffer |
spout.coordinator.tuple_buffer.max_size | Integer | | Defines the maximum size of the tuple buffer. After the buffer reaches this size the internal VirtualSpouts will be blocked from generating additional tuples until they have been emitted into the topology. | 2000 |
spout.coordinator.virtual_spout_id_prefix | String | | Defines a VirtualSpoutId prefix to use for all VirtualSpouts created by the spout. This must be unique to your spout instance, and must not change between deploys. | |
spout.metrics.class | String | | Defines which MetricsRecorder implementation to use. Should be a full classpath to a class that implements the MetricsRecorder interface. | com.salesforce.storm.spout.dynamic.metrics.LogRecorder |
spout.metrics.enable_task_id_prefix | Boolean | | Defines whether the MetricsRecorder instance should include the taskId in the metric key. | |
spout.metrics.time_bucket | Integer | | Defines the time bucket to group metrics together under. | |
spout.output_fields | List | | Defines the output fields that the spout will emit as a list of field names. | |
spout.output_stream_id | String | | Defines the name of the output stream tuples will be emitted out of. | default |
spout.permanently_failed_output_stream_id | String | | Defines the name of the output stream that permanently failed tuples will be emitted out of. | failed |
spout.retry_manager.class | String | Required | Defines which RetryManager implementation to use. Should be a full classpath to a class that implements the RetryManager interface. | com.salesforce.storm.spout.dynamic.retry.ExponentialBackoffRetryManager |
spout.retry_manager.delay_multiplier | Double | | Defines how quickly the delay increases after each failed tuple. Example: A value of 2.0 means the delay between retries doubles, e.g. 4, 8, 16 seconds, etc. | |
spout.retry_manager.initial_delay_ms | Long | | Defines how long to wait before retry attempts are made on failed tuples, in milliseconds. Each retry attempt will wait for (number_of_times_message_has_failed * min_retry_time_ms). Example: If a tuple fails 5 times, and the min retry time is set to 1000, it will wait at least (5 * 1000) milliseconds before the next retry attempt. | 1000 |
spout.retry_manager.retry_delay_max_ms | Long | | Defines an upper bound on the max delay time between retries of a failed tuple. | |
spout.retry_manager.retry_limit | Integer | | Defines how many times a failed message will be replayed before just being acked. A negative value means tuples will be retried forever. A value of 0 means tuples will never be retried. A positive value means tuples will be retried up to this limit, then dropped. | 25 |
spout.spout_handler_class | String | | Defines which SpoutHandler implementation to use. Should be a fully qualified class path that implements the SpoutHandler interface. | com.salesforce.storm.spout.dynamic.handler.NoopSpoutHandler |
spout.virtual_spout_factory_class | String | | Defines which DelegateSpoutFactory implementation to use. Should be a fully qualified class path that implements the DelegateSpoutFactory interface. | com.salesforce.storm.spout.dynamic.VirtualSpoutFactory |
spout.virtual_spout_handler_class | String | | Defines which VirtualSpoutHandler implementation to use. Should be a fully qualified class path that implements the VirtualSpoutHandler interface. | com.salesforce.storm.spout.dynamic.handler.NoopVirtualSpoutHandler |
Config Key | Type | Required | Description | Default Value |
---|---|---|---|---|
spout.persistence_adapter.class | String | Required | Defines which PersistenceAdapter implementation to use. Should be a full classpath to a class that implements the PersistenceAdapter interface. | |
Config Key | Type | Required | Description | Default Value |
---|---|---|---|---|
spout.persistence.zookeeper.connection_timeout | Integer | | Zookeeper connection timeout. | 6000 |
spout.persistence.zookeeper.retry_attempts | Integer | | Zookeeper retry attempts. | 10 |
spout.persistence.zookeeper.retry_interval | Integer | | Zookeeper retry interval. | 10 |
spout.persistence.zookeeper.root | String | | Defines the root path to persist state under. Example: "/consumer-state" | |
spout.persistence.zookeeper.servers | List | | Holds a list of Zookeeper server hostnames + ports in the following format: ["zkhost1:2181", "zkhost2:2181", ...] | |
spout.persistence.zookeeper.session_timeout | Integer | | Zookeeper session timeout. | 6000 |
Config Key | Type | Required | Description | Default Value |
---|---|---|---|---|
spout.coordinator.virtual_spout_id_prefix | String | | Defines a consumerId prefix to use for all consumers created by the spout. This must be unique to your spout instance, and must not change between deploys. | |
spout.kafka.brokers | List | | Holds a list of Kafka broker hostnames + ports in the following format: ["broker1:9092", "broker2:9092", ...] | |
spout.kafka.deserializer.class | String | | Defines which Deserializer (Schema?) implementation to use. Should be a full classpath to a class that implements the Deserializer interface. | |
spout.kafka.topic | String | | Defines which Kafka topic we will consume messages from. | |
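Putting the keys together, a spout configuration is just a map. Below is a sketch of a Kafka-backed configuration using keys from the tables above; the handler class is a hypothetical placeholder, and the fully qualified class paths for the Zookeeper persistence adapter and the UTF-8 deserializer are assumptions, so check the javadocs for the exact packages.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

public final class ExampleSpoutConfig {
    public static Map<String, Object> build() {
        final Map<String, Object> spoutConfig = new HashMap<>();

        // Core DynamicSpout settings.
        spoutConfig.put("spout.coordinator.virtual_spout_id_prefix", "my-spout");
        spoutConfig.put("spout.output_fields", Arrays.asList("key", "value"));
        spoutConfig.put("spout.spout_handler_class", "com.example.MySpoutHandler"); // hypothetical handler

        // Persistence (Zookeeper) settings -- adapter class path is an assumption.
        spoutConfig.put("spout.persistence_adapter.class",
            "com.salesforce.storm.spout.dynamic.persistence.ZookeeperPersistenceAdapter");
        spoutConfig.put("spout.persistence.zookeeper.servers", Arrays.asList("zkhost1:2181", "zkhost2:2181"));
        spoutConfig.put("spout.persistence.zookeeper.root", "/consumer-state");

        // Kafka Consumer settings -- deserializer class path is an assumption.
        spoutConfig.put("spout.consumer.class", "com.salesforce.storm.spout.dynamic.kafka.Consumer");
        spoutConfig.put("spout.kafka.brokers", Arrays.asList("broker1:9092", "broker2:9092"));
        spoutConfig.put("spout.kafka.topic", "my-topic");
        spoutConfig.put("spout.kafka.deserializer.class",
            "com.salesforce.storm.spout.dynamic.kafka.deserializer.Utf8StringDeserializer");

        return spoutConfig;
    }
}
```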
Handlers are essentially hooks attached to either the `DynamicSpout` or the `VirtualSpout` that provide a way to interact with their lifecycle stages without having to extend a base class. They are the primary way you will integrate the framework into your project.
The `SpoutHandler` is an interface which allows you to tie into the `DynamicSpout` lifecycle. Without a class implementing this interface the `DynamicSpout` does not offer much value on its own, as the `DynamicSpout` by itself does not know how to create `VirtualSpout` instances. Your `SpoutHandler` implementation is responsible for creating `VirtualSpout` instances and passing them back to the `DynamicSpout`'s coordinator.

There are several methods on the `SpoutHandler` you can implement. No-op defaults are provided, so you do not have to implement any of them, but there are four in particular we're going to detail because they are critical for most `SpoutHandler` implementations.
- `void open(Map<String, Object> spoutConfig)` - This method is functionally like the `SpoutHandler`'s constructor; it is called to set up the `SpoutHandler`. Just as `open()` exists on the `ISpout` interface in Storm to set up your spout, this method is intended for setting up your `SpoutHandler`.
- `void close()` - This method is similar to an `ISpout`'s `close()` method; it gets called when the `SpoutHandler` is torn down, and you can use it to shut down any classes that you have used in the `SpoutHandler` as well as clean up any object references you have.
- `void onSpoutOpen(DynamicSpout spout, Map topologyConfig, TopologyContext topologyContext)` - This method is called after the `DynamicSpout` is opened, and with it you get the `DynamicSpout` instance to interact with. It's here that you can do things like call `DynamicSpout.addVirtualSpout(VirtualSpout virtualSpout)` to add a new `VirtualSpout` instance into the `DynamicSpout`.
- `void onSpoutClose(DynamicSpout spout)` - This method is called after the `DynamicSpout` is closed, and with it you get the `DynamicSpout` instance to interact with; you can use it to perform shutdown tasks when the `DynamicSpout` itself is closing.
- `void onSpoutActivate(DynamicSpout spout)` - This method is called when the `DynamicSpout` has been activated.
- `void onSpoutDeactivate(DynamicSpout spout)` - This method is called when the `DynamicSpout` has been deactivated.
It's important to note that `SpoutHandler` instance methods should be blocking, since they are part of the startup and shutdown flow. Only perform asynchronous tasks if you are certain that other spout methods can be called without depending on your asynchronous tasks to complete.
Here is a sample `SpoutHandler` that can be used in conjunction with the Kafka `Consumer` to read a Kafka topic:
```java
package com.salesforce.storm.spout.dynamic.example;

import com.salesforce.storm.spout.dynamic.DynamicSpout;
import com.salesforce.storm.spout.dynamic.DefaultVirtualSpoutIdentifier;
import com.salesforce.storm.spout.dynamic.VirtualSpout;
import com.salesforce.storm.spout.dynamic.consumer.ConsumerPeerContext;
import com.salesforce.storm.spout.dynamic.handler.SpoutHandler;
import org.apache.storm.task.TopologyContext;

import java.util.Map;

public class SimpleKafkaSpoutHandler implements SpoutHandler {

    @Override
    public void onSpoutOpen(DynamicSpout spout, Map topologyConfig, TopologyContext topologyContext) {
        // Create our main VirtualSpout that will consume from Kafka
        // (note: you must have the Kafka Consumer configured).
        spout.addVirtualSpout(
            new VirtualSpout(
                // Unique identifier for this spout.
                new DefaultVirtualSpoutIdentifier("kafkaSpout"),
                spout.getSpoutConfig(),
                // Tells the Consumer of a VirtualSpout how many spout instances there are and which one this is.
                new ConsumerPeerContext(1, 0),
                spout.getFactoryManager(),
                null, // Optional starting ConsumerState
                null  // Optional ending ConsumerState
            )
        );
    }
}
```
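The handler above only takes effect once the `DynamicSpout` is told to use it, via the `spout.spout_handler_class` key from the configuration table, added to the same configuration map you hand to the spout:

```java
// Point the DynamicSpout at the handler above (key from the configuration section).
spoutConfig.put("spout.spout_handler_class",
    "com.salesforce.storm.spout.dynamic.example.SimpleKafkaSpoutHandler");
```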
The `VirtualSpoutHandler` is an interface which allows you to tie into the `VirtualSpout` lifecycle. An implementation is not required to use the dynamic spout framework, but it can be helpful when your implementation requires you to tap into the lifecycle of each individual spout being managed by the `DynamicSpout`.
There are several methods on the `VirtualSpoutHandler` you can implement. No-op defaults are provided, so you do not have to implement any of them, but there are five in particular we're going to detail because they are critical for most `VirtualSpoutHandler` implementations; a minimal example follows the list.
- `void open(Map<String, Object> spoutConfig)` - This method is functionally like the `VirtualSpoutHandler`'s constructor; it is called to set up the `VirtualSpoutHandler`. Just as `open()` exists on the `ISpout` interface in Storm to set up your spout, this method is intended for setting up your `VirtualSpoutHandler`.
- `void close()` - This method is similar to an `ISpout`'s `close()` method; it gets called when the `VirtualSpoutHandler` is torn down, and you can use it to shut down any classes that you have used in the `VirtualSpoutHandler` as well as clean up any object references you have.
- `void onVirtualSpoutOpen(DelegateSpout virtualSpout)` - This method is called after the `VirtualSpout` is opened, and with it you get the `VirtualSpout` instance to interact with.
- `void onVirtualSpoutClose(DelegateSpout virtualSpout)` - This method is called after the `VirtualSpout` is closed, and with it you get the `VirtualSpout` instance to interact with.
- `void onVirtualSpoutCompletion(DelegateSpout virtualSpout)` - This method is called before `onVirtualSpoutClose()`, only when the `VirtualSpout` instance is about to close and has completed its work, meaning that the `Consumer` has reached the provided ending offset.
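As a minimal sketch, a handler that simply logs these lifecycle events might look like the following. It assumes `DelegateSpout` and the `handler` package live where the configuration defaults above suggest, and relies on the no-op defaults described above for the methods it doesn't override; register it via the `spout.virtual_spout_handler_class` key.

```java
package com.salesforce.storm.spout.dynamic.example;

import com.salesforce.storm.spout.dynamic.DelegateSpout;
import com.salesforce.storm.spout.dynamic.handler.VirtualSpoutHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Minimal sketch: logs VirtualSpout lifecycle events.
 * Everything not overridden falls back to the interface's no-op defaults.
 */
public class LoggingVirtualSpoutHandler implements VirtualSpoutHandler {
    private static final Logger logger = LoggerFactory.getLogger(LoggingVirtualSpoutHandler.class);

    @Override
    public void onVirtualSpoutOpen(DelegateSpout virtualSpout) {
        logger.info("VirtualSpout opened: {}", virtualSpout);
    }

    @Override
    public void onVirtualSpoutClose(DelegateSpout virtualSpout) {
        logger.info("VirtualSpout closed: {}", virtualSpout);
    }

    @Override
    public void onVirtualSpoutCompletion(DelegateSpout virtualSpout) {
        logger.info("VirtualSpout completed its work: {}", virtualSpout);
    }
}
```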
Below is a list of metrics that are collected with the metric type and description.
Key | Type | Unit | Description |
---|---|---|---|
SpoutCoordinator.bufferSize | GAUGE | Number | Size of internal MessageBuffer. |
SpoutCoordinator.completed | GAUGE | Number | The number of completed VirtualSpout instances. |
SpoutCoordinator.errored | GAUGE | Number | The number of errored VirtualSpout instances. |
SpoutCoordinator.poolSize | GAUGE | Number | The max number of VirtualSpout instances that will be run concurrently. |
SpoutCoordinator.queued | GAUGE | Number | The number of queued VirtualSpout instances. |
SpoutCoordinator.running | GAUGE | Number | The number of running VirtualSpout instances. |
VirtualSpout.{virtualSpoutIdentifier}.ack | COUNTER | Number | Tuple ack count per VirtualSpout instance. |
VirtualSpout.{virtualSpoutIdentifier}.emit | COUNTER | Number | Tuple emit count per VirtualSpout instance. |
VirtualSpout.{virtualSpoutIdentifier}.exceededRetryLimit | COUNTER | Number | Messages that have exceeded the maximum configured retry count per VirtualSpout instance. |
VirtualSpout.{virtualSpoutIdentifier}.fail | COUNTER | Number | Tuple fail count per VirtualSpout instance. |
VirtualSpout.{virtualSpoutIdentifier}.filtered | COUNTER | Number | Filtered messages per VirtualSpout instance. |
VirtualSpout.{virtualSpoutIdentifier}.numberFiltersApplied | GAUGE | Number | How many Filters are being applied against the VirtualSpout instance. |
VirtualSpout.{virtualSpoutIdentifier}.partition.{partition}.currentOffset | GAUGE | Number | The offset currently being processed for the given partition. |
VirtualSpout.{virtualSpoutIdentifier}.partition.{partition}.endingOffset | GAUGE | Number | The ending offset for the given partition. |
VirtualSpout.{virtualSpoutIdentifier}.partition.{partition}.percentComplete | GAUGE | Percent 0.0 to 1.0 | Percentage of messages processed out of the total for the given partition. |
VirtualSpout.{virtualSpoutIdentifier}.partition.{partition}.startingOffset | GAUGE | Number | The starting offset position for the given partition. |
VirtualSpout.{virtualSpoutIdentifier}.partition.{partition}.totalMessages | GAUGE | Number | Total number of messages to be processed by the VirtualSpout for the given partition. |
VirtualSpout.{virtualSpoutIdentifier}.partition.{partition}.totalProcessed | GAUGE | Number | Number of messages processed by the VirtualSpout instance for the given partition. |
VirtualSpout.{virtualSpoutIdentifier}.partition.{partition}.totalUnprocessed | GAUGE | Number | Number of messages remaining to be processed by the VirtualSpout instance for the given partition. |
Key | Type | Unit | Description |
---|---|---|---|
KafkaConsumer.topic.{topic}.partition.{partition}.currentOffset | GAUGE | Number | Offset consumer has processed. |
KafkaConsumer.topic.{topic}.partition.{partition}.endOffset | GAUGE | Number | Offset for TAIL position in the partition. |
KafkaConsumer.topic.{topic}.partition.{partition}.lag | GAUGE | Number | Difference between endOffset and currentOffset metrics. |
Metrics provide several ways of tracking numerical data. These are very similar to the sorts of stats you would track with Statsd. The supported metric (or stat) types are:
Type | Format | Description |
---|---|---|
Averages | AVERAGES.<className>.<metricName> | Calculates average of all values submitted over a set time period. |
Counter | COUNTERS.<className>.<metricName> | Keeps a running count that gets reset back to zero on deployment. |
Gauge | GAUGES.<className>.<metricName> | Reports the last value given for the metric. |
Timer | TIMERS.<className>.<metricName> | Calculates how long, on average, an event takes in milliseconds. These metrics also publish a related counter. |
These can be handled through methods on an implementation of `MetricsRecorder`.
To track a specific metric you'll need to create a definition of it using an implementation of `MetricDefinition`. A `MetricDefinition` is pretty open-ended; it just provides a consistent way of generating a String (called a key) to identify the metric. The framework itself uses `ClassMetric` for its metrics, however a `CustomMetric` is also provided. A `ClassMetric` is specifically tied to a Java class, and that class's name will be used to generate the key for the metric. This is a handy way of cataloging metrics, but you're free to do whatever is best for your project. All of the framework's core metrics are properties on `SpoutMetrics`, which is a handy, but not required, way of organizing your metrics.
Here's an example metric from `SpoutMetrics` in the framework:
```java
public static final MetricDefinition SPOUT_COORDINATOR_BUFFER_SIZE = new ClassMetric(SpoutCoordinator.class, "bufferSize");
```
This metric is then easily used by the `MetricsRecorder` like so:
```java
getMetricsRecorder().assignValue(SpoutMetrics.SPOUT_COORDINATOR_BUFFER_SIZE, getVirtualSpoutMessageBus().messageSize());
```
You can add new metrics to any custom implementation in the framework as needed. Most interfaces have an `open()` method that will receive a `MetricsRecorder` as a parameter. If they don't, then this is a great opportunity for a contribution!
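For example, a custom component might define and record its own gauge. The exact `CustomMetric` constructor signature and the metrics package layout are assumptions here; `assignValue()` is the same call shown above.

```java
import com.salesforce.storm.spout.dynamic.metrics.CustomMetric;
import com.salesforce.storm.spout.dynamic.metrics.MetricDefinition;
import com.salesforce.storm.spout.dynamic.metrics.MetricsRecorder;

public class MyComponent {
    // Assumed: CustomMetric takes the metric key as its constructor argument.
    private static final MetricDefinition QUEUE_DEPTH = new CustomMetric("MyComponent.queueDepth");

    private final MetricsRecorder metricsRecorder;

    public MyComponent(final MetricsRecorder metricsRecorder) {
        this.metricsRecorder = metricsRecorder;
    }

    public void reportQueueDepth(final int depth) {
        // assignValue() is the same call the framework uses for its own gauges.
        metricsRecorder.assignValue(QUEUE_DEPTH, depth);
    }
}
```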
Lastly, you'll note that all of our metric keys are annotated with `MetricDocumentation`. This is purely a convention of the framework that helps update the table of metrics below. If you're interested in how this is done, or want to do something similar, check out the `DocGenerator`, which compiles them together.
The purpose of the sidelining project is to provide a way to skip processing for a subset of messages on a Kafka topic while allowing them to be processed at a later time. The canonical use case for this project is a multi-tenant Kafka topic, where you want to pause processing for a single tenant for some period of time without pausing other tenants. This project is built upon the `DynamicSpout` framework as a set of `SpoutHandler` and `VirtualSpoutHandler` implementations that apply custom `FilterChainStep` instances when something needs to be sidelined and then remove them when a sideline should resume or resolve.
The project is wrapped in a spout implementation called `SidelineSpout`, which extends the `DynamicSpout` and configures the handlers in an opinionated way. It is intended to work much like a typical `KafkaSpout`.
When consuming a multi-tenant commit log you may want to postpone processing for one or more tenants. Imagine that a subset of your tenants' database infrastructure requires downtime for maintenance. Using the `KafkaSpout` implementation that comes with Storm, you really only have two options to deal with this situation:
1. Stop your entire topology, for all tenants, while the maintenance is performed for the small subset of tenants.
2. Filter the problematic tenants out at the beginning of your topology to avoid processing further downstream, then once maintenance is complete stop filtering the problematic tenants. If you're ambitious, you can keep track of where you started and stopped filtering and then spin up a separate `KafkaSpout` instance to process only the problematic tenant between the offsets that marked applying and removing your filter.
If you can afford downtime, then option 1 is an easy way to deal with the problem. If not, then option 2 might be something you have to look at. The whole situation becomes even more complicated if you have more than one problematic tenant at a time, and even more so if they need to start and/or stop filtering at different times. That's a lot of complexity! Sidelining aims to handle option 2 for you, but through automation and easy control semantics.

Imagine wrapping up all of that complexity behind a spout so that the rest of your Storm topology doesn't have to think about that problem space: that's sidelining!
All of the optional dependencies of the `DynamicSpout` framework are required here, namely Apache Zookeeper, Apache Curator and Apache Kafka. Check out the `DynamicSpout` dependencies section for more information. These all need to be specified in your project's dependencies.
A sideline is just a filter applied at a specific point in time, with a promise to come back to it at a later point. A given sideline has a lifecycle: it is started, resumed and finally resolved. Sidelines are always derived from a special `VirtualSpout` called the fire hose. The fire hose is your main Kafka topic, and if everything is going well and you don't need to sideline anything you're consuming all of the messages from it in real time. A sideline starts as a filter on the fire hose and evolves from there.
An implementation of sidelining has what's called a `SidelineTrigger`, which gets called when the `SidelineSpoutHandler` first opens. The `SidelineTrigger` is responsible for setting up whatever is necessary to notify the `SidelineController` to start, resume or resolve a sideline. The sideline spout does not have an opinion on what you should use to notify that a sideline lifecycle event should take place; however, a usable recipe using Zookeeper watches is provided if you would like to use it (hint: the authors use it in production today).
When you sideline, you are telling the spout to skip messages (for now) based upon some sort of criteria. This is done by defining a `FilterChainStep`, a construct from the `DynamicSpout`. A `FilterChainStep` has a single method called `filter()`, which receives a `Message` and returns true or false based upon whether or not the supplied message should be skipped. Going back to our example use case, you might create a simple `FilterChainStep` that takes a tenant's identifier in its constructor and then checks a property on a `Message` to see if that message belongs to the provided tenant.
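A sketch of that tenant filter might look like the following. It assumes the tenant identifier is the first value in each `Message`; the `NumberFilter` example further down shows the same pattern with an integer.

```java
package com.salesforce.storm.spout.sideline.example;

import com.salesforce.storm.spout.dynamic.Message;
import com.salesforce.storm.spout.dynamic.filter.FilterChainStep;

/**
 * Sketch of the tenant filter described above. Assumes the tenant identifier
 * is the first value in each Message; adjust the index for your schema.
 */
public class TenantFilter implements FilterChainStep {

    private final String tenantId;

    public TenantFilter(final String tenantId) {
        this.tenantId = tenantId;
    }

    @Override
    public boolean filter(final Message message) {
        // Returning true tells the framework to skip (sideline) this message.
        final String messageTenantId = (String) message.getValues().get(0);
        return tenantId.equals(messageTenantId);
    }
}
```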
When a `SidelineTrigger` receives a signal to start a sideline, it comes in the form of a `SidelineRequest`, which is really just a wrapper for a `FilterChainStep`, the `DynamicSpout` primitive described above. You will need to define a `FilterChainStep` implementation for your use case. Your `SidelineTrigger` then sends your `FilterChainStep` to the `SidelineController`, which will promptly apply the filter to the fire hose and mark the starting offset on the Kafka topic for where the filter was applied. Once a sideline has been started you should no longer be receiving messages that match your filter.
When a `SidelineTrigger` receives a signal to resume a sideline, it must receive an identifier for a started sideline. That means after you start a sideline you should hang onto the id it provides somewhere for later reference (again, if you take a look at the provided recipes you'll see how we do this in Zookeeper). Resuming a sideline does not change the fire hose, so messages will continue to be filtered there. Resuming does, however, spin up a new `VirtualSpout` that will consume from the same topic as the fire hose, but it's going to apply the inverse of your filter and it's going to start from the point the sideline was started (in the past). This means you will have the same Kafka topic being processed in parallel while a sideline is resuming; it's just that one `VirtualSpout` is filtering out specific messages (the fire hose) and the other `VirtualSpout` is only processing those filtered messages (the sideline). This is a good use case for the `ThrottledMessageBuffer` or `RatioMessageBuffer`, because they allow your sideline to emit messages back into the topology at a different rate.
When you're ready to be done sidelining you can resolve it, which means that an endpoint is marked on the sideline. Remember that up until now all we had was a starting point for the sideline; we had not defined a point on the Kafka topic to stop sidelining. Resolving a sideline does that, which means that the `VirtualSpout` for the sideline will eventually finish the work it needs to do. Once it's finished, the `VirtualSpout` will be closed and the instance cleaned up. At the same time that the end offset is recorded, the filter that was on the fire hose `VirtualSpout` will be removed, meaning that the fire hose will now be processing all of the messages on the Kafka topic.
The `DynamicSpout` has several moving pieces, all of which will properly resume in the state they were in when the topology was halted. The fire hose `VirtualSpout` will resume consuming from its last acked offset, and any active sideline requests will also be resumed when the topology starts back up. However you choose to implement sidelining, it is important to have some sort of durable storage for requests. The recipes include a Zookeeper-based trigger that allows sideline requests to come in while a topology is down without losing them.
Config Key | Type | Required | Description | Default Value |
---|---|---|---|---|
sideline.filter_chain_step_class | String | | Defines a filter chain step that is used during sidelining. Should be a fully qualified class path that implements the FilterChainStep interface. | |
sideline.persistence.zookeeper.connection_timeout | Integer | | Zookeeper connection timeout. | 6000 |
sideline.persistence.zookeeper.retry_attempts | Integer | | Zookeeper retry attempts. | 10 |
sideline.persistence.zookeeper.retry_interval | Integer | | Zookeeper retry interval. | 10 |
sideline.persistence.zookeeper.root | String | | Defines the root path to persist state under. Example: "/consumer-state" | |
sideline.persistence.zookeeper.servers | List | | Holds a list of Zookeeper server hostnames + ports in the following format: ["zkhost1:2181", "zkhost2:2181", ...] | |
sideline.persistence.zookeeper.session_timeout | Integer | | Zookeeper session timeout. | 6000 |
sideline.persistence_adapter.class | String | Required | Defines which PersistenceAdapter implementation to use. Should be a full classpath to a class that implements the PersistenceAdapter interface. | |
sideline.refresh_interval_seconds | Integer | | Interval (in seconds) to check running sidelines and refresh them if necessary. | 600 |
sideline.trigger_class | String | | Defines a sideline trigger (if any) to use. Should be a fully qualified class path that implements the SidelineTrigger interface. | |
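Wiring these keys into the spout configuration works exactly like the `DynamicSpout` keys. A sketch follows, where the trigger and filter classes are your own implementations and the Zookeeper persistence adapter class path is an assumption to be checked against the javadocs:

```java
// Sideline-specific configuration (keys from the table above).
spoutConfig.put("sideline.trigger_class", "com.example.MySidelineTrigger"); // hypothetical trigger
spoutConfig.put("sideline.filter_chain_step_class", "com.example.TenantFilter"); // your FilterChainStep
spoutConfig.put("sideline.persistence_adapter.class",
    "com.salesforce.storm.spout.sideline.persistence.ZookeeperPersistenceAdapter"); // assumed class path
spoutConfig.put("sideline.persistence.zookeeper.servers", Arrays.asList("zkhost1:2181", "zkhost2:2181"));
spoutConfig.put("sideline.persistence.zookeeper.root", "/sideline-state");
```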
Below is a list of metrics that are collected with the metric type and description.
Key | Type | Unit | Description |
---|---|---|---|
SidelineSpoutHandler.resolve | COUNTER | Number | Total number of resolved sidelines. |
SidelineSpoutHandler.resume | COUNTER | Number | Total number of resumed sidelines. |
SidelineSpoutHandler.start | COUNTER | Number | Total number of started sidelines. |
Each project leveraging the `SidelineSpout` will likely have a unique set of triggers representing its specific use case. The following is a stripped-down example intended to convey the concept of what a `SidelineTrigger` does. It's not a practical implementation, and it's highly recommended that you refer to the recipes package, which includes a fully functional and production-ready `SidelineTrigger`.
First we need a filter, so we'll use the `NumberFilter`, which expects messages whose first value is an integer and filters them when that value matches. Keep in mind your filter will be serialized to JSON and stored in Zookeeper (unless you change your `PersistenceAdapter`), so take great care to ensure that what you do serializes correctly.
```java
package com.salesforce.storm.spout.sideline.example;

import com.salesforce.storm.spout.dynamic.filter.FilterChainStep;
import com.salesforce.storm.spout.dynamic.Message;

public class NumberFilter implements FilterChainStep {

    private final int number;

    public NumberFilter(final int number) {
        this.number = number;
    }

    @Override
    public boolean filter(Message message) {
        final Integer messageNumber = (Integer) message.getValues().get(0);
        // Returning true filters (skips) the message, so skip messages whose first value matches our number.
        return messageNumber.equals(number);
    }
}
```
The `PollingSidelineTrigger` below runs every 30 seconds and simply swaps out number filters, slowly incrementing over time. It uses the `NumberFilter` by including it in a `SidelineRequest`. `PollingSidelineTrigger` implements `SidelineTrigger`.
```java
package com.salesforce.storm.spout.sideline.example;

import com.salesforce.storm.spout.sideline.trigger.SidelineTrigger;
import com.salesforce.storm.spout.sideline.trigger.SidelineRequest;
import com.salesforce.storm.spout.sideline.handler.SidelineController;

import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class PollingSidelineTrigger implements SidelineTrigger {

    private boolean isOpen = false;

    private transient ScheduledExecutorService executor;

    private transient SidelineController sidelineController;

    @Override
    public void setSidelineController(final SidelineController sidelineController) {
        this.sidelineController = sidelineController;
    }

    @Override
    public void open(final Map config) {
        if (isOpen) {
            return;
        }
        isOpen = true;

        executor = Executors.newScheduledThreadPool(1);

        final Poll poll = new Poll(sidelineController);
        executor.scheduleAtFixedRate(poll, 0, 30, TimeUnit.SECONDS);
    }

    @Override
    public void close() {
        if (!executor.isShutdown()) {
            executor.shutdown();
        }
    }

    static class Poll implements Runnable {

        private final SidelineController sidelineController;
        private Integer number = 0;

        Poll(final SidelineController sidelineController) {
            this.sidelineController = sidelineController;
        }

        @Override
        public void run() {
            // Start a sideline request for the next number.
            final SidelineRequest startRequest = new SidelineRequest(
                new NumberFilter(number++)
            );
            sidelineController.start(startRequest);

            if (number < 2) {
                return;
            }

            // Resume a sideline request for the last number.
            final SidelineRequest resumeRequest = new SidelineRequest(
                new NumberFilter(number - 1)
            );
            sidelineController.resume(resumeRequest);

            if (number < 3) {
                return;
            }

            // Resolve a sideline request for two numbers back.
            final SidelineRequest resolveRequest = new SidelineRequest(
                new NumberFilter(number - 2)
            );
            sidelineController.resolve(resolveRequest);
        }
    }
}
```
The `SidelineSpout` offers a lot of places to customize its behavior, but we've also tried to provide sensible defaults that work out of the box. The `SidelineTrigger` and the `FilterChainStep` necessary to sideline, though, are often very specific to your business's use case. Included in this project is a recipes package that contains a `ZookeeperWatchTrigger`, which is designed to use a `TriggerEvent` to signal starting, stopping and resuming a sideline. The trigger watches for these events on a node in Zookeeper using the fantastic Curator `PathChildrenCacheListener`. This implementation sees changes to the node and then calls our listener, allowing for the handling of sidelines. We chose this as a recipe to bundle because we were already doing this in our production uses of sidelining and had great success with it. As mentioned elsewhere, the beauty of Zookeeper is that every Storm cluster has it, so it makes it easy to provide a reasonable default implementation to use.
If you want to give the recipe a try there is a minimal amount of configuration necessary:
```yaml
sideline.trigger_class: com.salesforce.storm.spout.sideline.recipes.trigger.zookeeper.ZookeeperWatchTrigger

## Most likely you will want to use your own custom filter here
sideline.filter_chain_step_class: com.salesforce.storm.spout.sideline.recipes.trigger.KeyFilter

## Zookeeper Watch Trigger configuration
sideline.zookeeper_watch_trigger.servers:
  - 127.0.0.1:2181
sideline.zookeeper_watch_trigger.root: /sideline-trigger
```
The recipe also includes a class called the `TriggerEventHelper`, which has easy-to-use methods like `startTriggerEvent()` that create the underlying events that make this trigger work.
You may be wondering why we use `TriggerEvent` instances when we already have `SidelineRequest` instances, and that's a good question! The `TriggerEvent` provides additional operational data that, quite honestly, we don't care about in the actual sidelining process, like who performed the sideline action, what time it was done and what the reason for it was. For many use cases this is probably not necessary, but we found it helpful.
The other reason we chose to use yet another node in Zookeeper, a sort of staging ground for the sideline request, is that we wanted to allow sidelines to occur while a topology was down. Good operational procedures should prevent humans from doing this, but we also recognize that sidelining might be handled by an automated process that may not have the awareness to tell whether a topology is running or not.
All of this is to say, your use case might be far simpler than our own, so buyer beware! But this implementation is used in production today with great results.
Lastly, the `TriggerEvent` is not tightly coupled to the `ZookeeperWatchTrigger`, so we envision providing future recipes using different data stores and notification systems. Keep an eye out, and please feel free to contribute!
Found a bug? Think you've got an awesome feature you want to add? We welcome contributions!
- Search for an existing issue. If none exists, create a new issue so that other contributors can keep track of what you are trying to add/fix and offer suggestions (or let you know if there is already an effort in progress). Be sure to clearly state the problem you are trying to solve and an explanation of why you want to use the strategy you're proposing to solve it.
- Fork this repository on GitHub and create a branch for your feature.
- Clone your fork and branch to your local machine.
- Commit changes to your branch.
- Push your work up to GitHub.
- Submit a pull request so that we can review your changes.
Make sure that you rebase your branch off of master before opening a new pull request. We might also ask you to rebase it if master changes after you open your pull request.
We love contributions, but it's important that your pull request adhere to some of the standards we maintain in this repository.
- All tests must be passing!
- All code changes require tests!
- All code changes must be consistent with our checkstyle rules.
- New configuration options should have proper annotations and README updates generated.
- Great inline comments.
Stephen Powis and Stan Lemon began this project in early February 2017 for Salesforce. By February 22nd we had split our project into its own git repository and rolled it out internally for several teams. In September 2017 we open sourced the project, and in November 2017 we split off some of our testing code for Kafka into its own project, kafka-junit.
The idea of a dynamic spout framework was never something we set out to build. Sidelining existed before too, but in a much more complicated and resource-heavy fashion. We originally set out to redesign our sidelining implementation using a completely different strategy. In the course of doing so we realized there were a ton of reusable components in the sidelining project that might benefit from being decoupled, and so we started the process of surgically splitting the two. Out of this effort the dynamic spout framework was born, and it was almost serendipitous because we quickly had our first use case for managing many `VirtualSpout` instances behind a single interface. That effort involved a bunch of different HTTP connections (one `VirtualSpout` for each), and while it looked nothing like sidelining it was a perfect fit for the dynamic spout framework.
Extensive test coverage is available for both the `DynamicSpout` and `SidelineSpout` projects. There are both unit and functional tests. Today the functional tests are not clearly divided between the two projects (which is why they haven't been split up yet); this is simply an artifact of the way the project evolved.
You can run the tests by doing:
```
mvn clean test
```
We use checkstyle aggressively on source and tests, with a custom config located under the 'script' folder that can be imported into your IDE of choice.
You can run checkstyle using Maven:
```
mvn checkstyle:checkstyle
```
We have tried to keep our javadocs current and useful; you can generate API docs using Maven:
```
mvn javadoc:javadoc
```
The configuration sections in this document are generated using the `DocGenerator`, which automatically creates the appropriate tables in this file using the `ConfigDocumentation` and `MetricDocumentation` annotations. Do not update those tables manually, as they will get overwritten; instead use the `DocGenerator`, which can easily be executed via each `DocTask` class.
You can run these from the command line using Maven:
```
mvn exec:java -Dexec.mainClass="com.salesforce.storm.spout.dynamic.config.DocTask"
mvn exec:java -Dexec.mainClass="com.salesforce.storm.spout.dynamic.kafka.DocTask"
mvn exec:java -Dexec.mainClass="com.salesforce.storm.spout.sideline.config.DocTask"
```