This repository has been archived by the owner on Feb 12, 2022. It is now read-only.

Documentation re-arrangement #65

Merged
merged 16 commits into from
Nov 24, 2017
82 changes: 63 additions & 19 deletions README.md
@@ -109,10 +109,10 @@ Using the default straight-out-of-the-box configuration, this spout has the foll
When your topology is deployed with a `DynamicSpout` and it starts up, the `DynamicSpout` will first start the `SpoutMonitor`. The `SpoutMonitor` watches for `VirtualSpout` instances that are added to it; this is typically handled by a `SpoutHandler` instance that is configured on the `DynamicSpout`. Each `VirtualSpout` will create a `Consumer` that leverages a starting `ConsumerState` to begin its work.

## Configuration
All of these options can be found inside of [SidelineSpoutConfig](src/main/java/com/salesforce/storm/spout/dynamic/config/SidelineSpoutConfig.java).

[//]: <> (CONFIGURATION_BEGIN_DELIMITER)
All of these options can be found inside of [SpoutConfig](src/main/java/com/salesforce/storm/spout/dynamic/config/SpoutConfig.java).

<!-- DYNAMIC_SPOUT_CONFIGURATION_BEGIN_DELIMITER -->
### Dynamic Spout Configuration Options
Config Key | Type | Required | Description | Default Value |
---------- | ---- | -------- | ----------- | ------------- |
spout.consumer.class | String | | Defines which Consumer implementation to use. Should be a full classpath to a class that implements the Consumer interface. | com.salesforce.storm.spout.dynamic.kafka.Consumer
@@ -121,7 +121,7 @@ spout.coordinator.max_concurrent_virtual_spouts | Integer | | The size of the t
spout.coordinator.max_spout_shutdown_time_ms | Long | | How long we'll wait for all VirtualSpouts to cleanly shut down before we stop them with force, in milliseconds. | 10000
spout.coordinator.monitor_thread_interval_ms | Long | | How often our monitor thread will run and watch over its managed virtual spout instances, in milliseconds. | 2000
spout.coordinator.tuple_buffer.class | String | | Defines which MessageBuffer implementation to use. Should be a full classpath to a class that implements the MessageBuffer interface. | com.salesforce.storm.spout.dynamic.buffer.RoundRobinBuffer
spout.coordinator.tuple_buffer.max_size | Integer | | Defines maximum size of the tuple buffer. After the buffer reaches this size the internal kafka consumers will be blocked from consuming. | 2000
spout.coordinator.tuple_buffer.max_size | Integer | | Defines maximum size of the tuple buffer. After the buffer reaches this size the internal VirtualSpouts will be blocked from generating additional tuples until they have been emitted into the topology. | 2000
spout.coordinator.virtual_spout_id_prefix | String | | Defines a VirtualSpoutId prefix to use for all VirtualSpouts created by the spout. This must be unique to your spout instance, and must not change between deploys. |
spout.metrics.class | String | | Defines which MetricsRecorder implementation to use. Should be a full classpath to a class that implements the MetricsRecorder interface. | com.salesforce.storm.spout.dynamic.metrics.LogRecorder
spout.metrics.enable_task_id_prefix | Boolean | | Defines if MetricsRecorder instance should include the taskId in the metric key. |
@@ -134,14 +134,15 @@ spout.retry_manager.initial_delay_ms | Long | | Defines how long to wait before
spout.retry_manager.retry_delay_max_ms | Long | | Defines an upper bound on the delay time between retries of a failed tuple. |
spout.retry_manager.retry_limit | Integer | | Defines how many times a failed message will be replayed before just being acked. A negative value means tuples will be retried forever. A value of 0 means tuples will never be retried. A positive value means tuples will be retried up to this limit, then dropped. | 25
spout.spout_handler_class | String | | Defines which SpoutHandler implementation to use. Should be a fully qualified class path that implements the SpoutHandler interface. | com.salesforce.storm.spout.dynamic.handler.NoopSpoutHandler
spout.virtual_spout_factory_class | String | | Defines which DelegateSpoutFactory implementation to use. Should be a fully qualified class path that implements the DelegateSpoutFactory interface. | class com.salesforce.storm.spout.dynamic.VirtualSpoutFactory
spout.virtual_spout_handler_class | String | | Defines which VirtualSpoutHandler implementation to use. Should be a fully qualified class path that implements the VirtualSpoutHandler interface. | com.salesforce.storm.spout.dynamic.handler.NoopVirtualSpoutHandler
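
The `spout.retry_manager.retry_limit` row above describes three cases: negative, zero, and positive. As a minimal sketch of that rule, the decision could look like the following. The class and method names are hypothetical and not part of the library, and comparing the retry count with `<` rather than `<=` is an assumption.

```java
/**
 * Hypothetical helper illustrating the documented semantics of
 * spout.retry_manager.retry_limit. Not part of the library.
 */
final class RetryLimitSketch {

    /** Default value listed in the configuration table. */
    static final int DEFAULT_RETRY_LIMIT = 25;

    /**
     * @param retryLimit configured value of spout.retry_manager.retry_limit
     * @param retryCount how many times the tuple has already been retried
     * @return true if the tuple should be replayed again
     */
    static boolean shouldRetry(final int retryLimit, final int retryCount) {
        if (retryLimit < 0) {
            return true; // negative: retry forever
        }
        if (retryLimit == 0) {
            return false; // zero: never retry
        }
        // positive: retry up to the limit (assumed exclusive comparison)
        return retryCount < retryLimit;
    }

    public static void main(final String[] args) {
        System.out.println(shouldRetry(-1, 1_000));
        System.out.println(shouldRetry(0, 0));
        System.out.println(shouldRetry(DEFAULT_RETRY_LIMIT, 24));
        System.out.println(shouldRetry(DEFAULT_RETRY_LIMIT, 25));
    }
}
```
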

### Persistence
### Persistence Configuration Options
Config Key | Type | Required | Description | Default Value |
---------- | ---- | -------- | ----------- | ------------- |
spout.persistence_adapter.class | String | Required | Defines which PersistenceAdapter implementation to use. Should be a full classpath to a class that implements the PersistenceAdapter interface. |

### Zookeeper Persistence
### Zookeeper Persistence Configuration Options
Config Key | Type | Required | Description | Default Value |
---------- | ---- | -------- | ----------- | ------------- |
spout.persistence.zookeeper.connection_timeout | Integer | | Zookeeper connection timeout. | 6000
@@ -151,32 +152,35 @@ spout.persistence.zookeeper.root | String | | Defines the root path to persist
spout.persistence.zookeeper.servers | List | | Holds a list of Zookeeper server Hostnames + Ports in the following format: ["zkhost1:2181", "zkhost2:2181", ...] |
spout.persistence.zookeeper.session_timeout | Integer | | Zookeeper session timeout. | 6000

<!-- DYNAMIC_SPOUT_CONFIGURATION_END_DELIMITER -->


### Kafka
<!-- KAFKA_CONSUMER_CONFIGURATION_BEGIN_DELIMITER -->
### Kafka Consumer Configuration Options
Config Key | Type | Required | Description | Default Value |
---------- | ---- | -------- | ----------- | ------------- |
spout.coordinator.virtual_spout_id_prefix | String | | Defines a consumerId prefix to use for all consumers created by the spout. This must be unique to your spout instance, and must not change between deploys. |
spout.kafka.brokers | List | | Holds a list of Kafka Broker hostnames + ports in the following format: ["broker1:9092", "broker2:9092", ...] |
spout.kafka.deserializer.class | String | | Defines which Deserializer (Schema?) implementation to use. Should be a full classpath to a class that implements the Deserializer interface. |
spout.kafka.topic | String | | Defines which Kafka topic we will consume messages from. |

<!-- KAFKA_CONSUMER_CONFIGURATION_END_DELIMITER -->


<!-- SIDELINE_CONFIGURATION_BEGIN_DELIMITER -->
### Sideline Configuration Options
Config Key | Type | Required | Description | Default Value |
---------- | ---- | -------- | ----------- | ------------- |
sideline.persistence.zookeeper.connection_timeout | Integer | | Zookeeper connection timeout. |
sideline.persistence.zookeeper.retry_attempts | Integer | | Zookeeper retry attempts. |
sideline.persistence.zookeeper.retry_interval | Integer | | Zookeeper retry interval. |
sideline.persistence.zookeeper.connection_timeout | Integer | | Zookeeper connection timeout. | 6000
sideline.persistence.zookeeper.retry_attempts | Integer | | Zookeeper retry attempts. | 10
sideline.persistence.zookeeper.retry_interval | Integer | | Zookeeper retry interval. | 10
sideline.persistence.zookeeper.root | String | | Defines the root path to persist state under. Example: "/consumer-state" |
sideline.persistence.zookeeper.servers | List | | Holds a list of Zookeeper server Hostnames + Ports in the following format: ["zkhost1:2181", "zkhost2:2181", ...] |
sideline.persistence.zookeeper.session_timeout | Integer | | Zookeeper session timeout. |
sideline.persistence.zookeeper.session_timeout | Integer | | Zookeeper session timeout. | 6000
sideline.persistence_adapter.class | String | Required | Defines which PersistenceAdapter implementation to use. Should be a full classpath to a class that implements the PersistenceAdapter interface. |
sideline.refresh_interval_seconds | Integer | | Interval (in seconds) to check running sidelines and refresh them if necessary. |
sideline.refresh_interval_seconds | Integer | | Interval (in seconds) to check running sidelines and refresh them if necessary. | 600
sideline.trigger_class | String | | Defines one or more sideline trigger(s) (if any) to use. Should be a fully qualified class path that implements the SidelineTrigger interface. |

<!-- SIDELINE_CONFIGURATION_END_DELIMITER -->
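
To show how the keys in the tables above fit together, here is a minimal sketch of a configuration map. It uses the literal key strings from the tables rather than the `SpoutConfig` constants so that it compiles without the library on the classpath. The two `com.example` class names, the prefix, and the topic are placeholders, not real implementations or values from this project.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical example class; only the config keys come from the tables. */
final class SpoutConfigSketch {

    static Map<String, Object> buildConfig() {
        final Map<String, Object> config = new HashMap<>();

        // Must be unique to the spout instance and stable between deploys.
        config.put("spout.coordinator.virtual_spout_id_prefix", "my-topology-spout");

        // Kafka consumer options.
        config.put("spout.kafka.brokers", Arrays.asList("broker1:9092", "broker2:9092"));
        config.put("spout.kafka.topic", "my-topic");
        // Placeholder: substitute a class that implements the Deserializer interface.
        config.put("spout.kafka.deserializer.class", "com.example.MyDeserializer");

        // Required, no default. Placeholder: substitute a PersistenceAdapter implementation.
        config.put("spout.persistence_adapter.class", "com.example.MyPersistenceAdapter");
        config.put("spout.persistence.zookeeper.servers", Arrays.asList("zkhost1:2181", "zkhost2:2181"));
        config.put("spout.persistence.zookeeper.root", "/consumer-state");

        return config;
    }

    public static void main(final String[] args) {
        buildConfig().forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

Options left out of the map, such as `spout.retry_manager.retry_limit`, would fall back to the defaults listed in the tables.
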

[//]: <> (CONFIGURATION_END_DELIMITER)

## Components
[DynamicSpout](src/main/java/com/salesforce/storm/spout/dynamic/DynamicSpout.java) - Implements Storm's spout interface. Everything starts and stops here.
@@ -318,10 +322,50 @@ Timer | Calculates how long on average, in milliseconds, an event takes. These

Below is a list of metrics that are collected with the metric type and description.

Class | Key | Type | Description
------|-----|------|------------
SidelineSpout | start-sideline | Counter | How many `Start Sideline` requests have been received.
SidelineSpout | stop-sideline | Counter | How many `Stop Sideline` requests have been received.
<!-- DYNAMIC_SPOUT_METRICS_BEGIN_DELIMITER -->
### Dynamic Spout Metrics
Key | Type | Unit | Description |
--- | ---- | ---- | ----------- |
SpoutCoordinator.bufferSize | GAUGE | Number | Size of internal MessageBuffer. |
SpoutCoordinator.completed | GAUGE | Number | The number of completed VirtualSpout instances. |
SpoutCoordinator.errored | GAUGE | Number | The number of errored VirtualSpout instances. |
SpoutCoordinator.poolSize | GAUGE | Number | The max number of VirtualSpout instances that will be run concurrently. |
SpoutCoordinator.queued | GAUGE | Number | The number of queued VirtualSpout instances. |
SpoutCoordinator.running | GAUGE | Number | The number of running VirtualSpout instances. |
VirtualSpout.{virtualSpoutIdentifier}.ack | COUNTER | Number | Tuple ack count per VirtualSpout instance. |
VirtualSpout.{virtualSpoutIdentifier}.emit | COUNTER | Number | Tuple emit count per VirtualSpout instance. |
VirtualSpout.{virtualSpoutIdentifier}.exceededRetryLimit | COUNTER | Number | Messages that have exceeded the maximum configured retry count per VirtualSpout instance. |
VirtualSpout.{virtualSpoutIdentifier}.fail | COUNTER | Number | Tuple fail count per VirtualSpout instance. |
VirtualSpout.{virtualSpoutIdentifier}.filtered | COUNTER | Number | Filtered messages per VirtualSpout instance. |
VirtualSpout.{virtualSpoutIdentifier}.numberFiltersApplied | GAUGE | Number | How many Filters are being applied against the VirtualSpout instance. |
VirtualSpout.{virtualSpoutIdentifier}.partition.{partition}.currentOffset | GAUGE | Number | The offset currently being processed for the given partition. |
VirtualSpout.{virtualSpoutIdentifier}.partition.{partition}.endingOffset | GAUGE | Number | The ending offset for the given partition. |
VirtualSpout.{virtualSpoutIdentifier}.partition.{partition}.percentComplete | GAUGE | Percent 0.0 to 1.0 | Percentage of messages processed out of the total for the given partition. |
VirtualSpout.{virtualSpoutIdentifier}.partition.{partition}.startingOffset | GAUGE | Number | The starting offset position for the given partition. |
VirtualSpout.{virtualSpoutIdentifier}.partition.{partition}.totalMessages | GAUGE | Number | Total number of messages to be processed by the VirtualSpout for the given partition. |
VirtualSpout.{virtualSpoutIdentifier}.partition.{partition}.totalProcessed | GAUGE | Number | Number of messages processed by the VirtualSpout instance for the given partition. |
VirtualSpout.{virtualSpoutIdentifier}.partition.{partition}.totalUnprocessed | GAUGE | Number | Number of messages remaining to be processed by the VirtualSpout instance for the given partition. |

<!-- DYNAMIC_SPOUT_METRICS_END_DELIMITER -->

<!-- KAFKA_CONSUMER_METRICS_BEGIN_DELIMITER -->
### Kafka Metrics
Key | Type | Unit | Description |
--- | ---- | ---- | ----------- |
KafkaConsumer.topic.{topic}.partition.{partition}.currentOffset | GAUGE | Number | Offset consumer has processed. |
KafkaConsumer.topic.{topic}.partition.{partition}.endOffset | GAUGE | Number | Offset for TAIL position in the partition. |
KafkaConsumer.topic.{topic}.partition.{partition}.lag | GAUGE | Number | Difference between endOffset and currentOffset metrics. |

<!-- KAFKA_CONSUMER_METRICS_END_DELIMITER -->
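
The `lag` metric above is defined as the difference between `endOffset` and `currentOffset`. A small sketch of that arithmetic and of the key pattern from the table follows; the class and method names are hypothetical, and the offsets are made-up sample values.

```java
/** Hypothetical helper; only the key pattern and the lag definition come from the table. */
final class KafkaLagSketch {

    /** Builds a key following KafkaConsumer.topic.{topic}.partition.{partition}.{name}. */
    static String metricKey(final String topic, final int partition, final String name) {
        return String.format("KafkaConsumer.topic.%s.partition.%d.%s", topic, partition, name);
    }

    /** Lag is the distance between the partition's tail and the consumer's position. */
    static long lag(final long endOffset, final long currentOffset) {
        return endOffset - currentOffset;
    }

    public static void main(final String[] args) {
        final long endOffset = 1_500L;
        final long currentOffset = 1_420L;
        System.out.println(metricKey("my-topic", 0, "lag") + " = " + lag(endOffset, currentOffset));
    }
}
```
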

<!-- SIDELINE_METRICS_BEGIN_DELIMITER -->
### Sideline Metrics
Key | Type | Unit | Description |
--- | ---- | ---- | ----------- |
SidelineSpoutHandler.start | COUNTER | Number | Total number of started sidelines. |
SidelineSpoutHandler.stop | COUNTER | Number | Total number of stopped sidelines. |

<!-- SIDELINE_METRICS_END_DELIMITER -->

# Sidelining

47 changes: 46 additions & 1 deletion pom.xml
@@ -45,7 +45,7 @@
<licenses>
<license>
<name>BSD-3</name>
<url>https://github.com/salesforce/storm-dynamic-spout/blob/master/LICENSE.txt</url>
<url>https://raw.githubusercontent.com/salesforce/storm-dynamic-spout/master/LICENSE.txt</url>
</license>
</licenses>

@@ -416,6 +416,51 @@
<autoReleaseAfterClose>true</autoReleaseAfterClose>
</configuration>
</plugin>

<!-- Generate README documentation -->
<plugin>
    <groupId>org.codehaus.mojo</groupId>
    <artifactId>exec-maven-plugin</artifactId>
    <version>1.2.1</version>
    <executions>
        <!-- Generate DynamicSpout docs -->
        <execution>
            <id>generate-dynamicspout-docs</id>
            <phase>site</phase>
            <goals>
                <goal>java</goal>
            </goals>
            <configuration>
                <mainClass>com.salesforce.storm.spout.dynamic.config.DocTask</mainClass>
            </configuration>
        </execution>

        <!-- Generate KafkaConsumer docs -->
        <execution>
            <id>generate-kafkaconsumer-docs</id>
            <phase>site</phase>
            <goals>
                <goal>java</goal>
            </goals>
            <configuration>
                <mainClass>com.salesforce.storm.spout.dynamic.kafka.DocTask</mainClass>
            </configuration>
        </execution>

        <!-- Generate Sideline Docs -->
        <execution>
            <id>generate-sideline-docs</id>
            <phase>site</phase>
            <goals>
                <goal>java</goal>
            </goals>
            <configuration>
                <mainClass>com.salesforce.storm.spout.sideline.config.DocTask</mainClass>
            </configuration>
        </execution>
    </executions>

</plugin>
</plugins>

<pluginManagement>
18 changes: 18 additions & 0 deletions script/generateDocs.sh
@@ -0,0 +1,18 @@
#!/bin/bash

## Generally speaking, you should instead run `mvn clean site` to generate the documentation.

## Fail if any command fails
set -e

## Build package
mvn package -DskipTests=true

## Build DynamicSpout docs
java -cp target/*jar-with-dependencies.jar com.salesforce.storm.spout.dynamic.config.DocTask

## Build KafkaConsumer docs
java -cp target/*jar-with-dependencies.jar com.salesforce.storm.spout.dynamic.kafka.DocTask

## Build Sideline docs
java -cp target/*jar-with-dependencies.jar com.salesforce.storm.spout.sideline.config.DocTask
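
The `DocTask` sources are not shown in this diff, so how they rewrite the README is not confirmed here. Given the `..._BEGIN_DELIMITER` and `..._END_DELIMITER` comments added to README.md, one plausible approach is to replace whatever sits between a marker pair with freshly generated text. The following is a sketch under that assumption; the class and method names are hypothetical.

```java
/** Hypothetical sketch of marker-based splicing; not the project's DocTask. */
final class DelimiterSpliceSketch {

    /**
     * Replaces the text between the BEGIN and END markers for the given tag,
     * leaving the markers themselves in place.
     */
    static String splice(final String document, final String tag, final String generated) {
        final String begin = "<!-- " + tag + "_BEGIN_DELIMITER -->";
        final String end = "<!-- " + tag + "_END_DELIMITER -->";

        final int beginIdx = document.indexOf(begin);
        if (beginIdx < 0) {
            throw new IllegalArgumentException("Missing begin delimiter for " + tag);
        }
        // Search for the end marker only after the begin marker.
        final int endIdx = document.indexOf(end, beginIdx + begin.length());
        if (endIdx < 0) {
            throw new IllegalArgumentException("Missing end delimiter for " + tag);
        }

        return document.substring(0, beginIdx + begin.length())
            + "\n" + generated + "\n"
            + document.substring(endIdx);
    }

    public static void main(final String[] args) {
        final String readme = "intro\n"
            + "<!-- SIDELINE_METRICS_BEGIN_DELIMITER -->\n"
            + "stale table\n"
            + "<!-- SIDELINE_METRICS_END_DELIMITER -->\n"
            + "outro";
        System.out.println(splice(readme, "SIDELINE_METRICS", "### Sideline Metrics"));
    }
}
```

Keeping the markers in the output means the generator can be re-run on its own result, which matches how the README sections are bracketed.
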
@@ -0,0 +1,62 @@
/**
* Copyright (c) 2017, Salesforce.com, Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without modification, are permitted provided that the
* following conditions are met:
*
* * Redistributions of source code must retain the above copyright notice, this list of conditions and the following
* disclaimer.
*
* * Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following
* disclaimer in the documentation and/or other materials provided with the distribution.
*
* * Neither the name of Salesforce.com nor the names of its contributors may be used to endorse or promote products
* derived from this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
* INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
* DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
* SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
* WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
* USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/

package com.salesforce.storm.spout.documentation;

import java.util.HashMap;
import java.util.Map;

/**
* Define a Class instance to generate documentation from.
*/
public class ClassSpec {
Contributor

This seems unnecessary to me. Why not just force the classes that we want to generate docs for to use an interface that includes a setDefaults/getDefaults method?

Contributor Author

I was just trying to change as little as possible for a first pass. I think we need to think about how/what things like SpoutConfig actually look like

Contributor Author

^-- as part of a larger work item review those and come up with a plan

Contributor

Yeah I dig ya, I just feel like adding an interface that's like DefaultableConfig or something would be cleaner on the implementation.

Contributor Author

I agree. I think we need to come up with concrete config classes instead of maps, and add that as part of it.

Contributor Author

Going to create a follow up issue to create concrete config classes for DynamicSpout, SidelineSpout (if needed), and KafkaConsumer.

Contributor Author

I looked at adding an interface but currently we use static methods to setDefaults(). So before we could apply an interface over it, we'd first need to refactor SpoutConfig / SidelineConfig into being instantiated into an instance.

I think this is better pushed off to the specific issue/work around configs, see #80

    private final Class clazz;
    private final Map<String, Object> defaults;

    /**
     * Constructor.
     * @param clazz Class to generate documentation for.
     * @param defaults Default values for each option.
     */
    public ClassSpec(final Class clazz, final Map<String, Object> defaults) {
        this.clazz = clazz;
        this.defaults = defaults;
    }

    /**
     * Constructor for a class with no default values.
     * @param clazz Class to generate documentation for.
     */
    public ClassSpec(final Class clazz) {
        this(clazz, new HashMap<>());
    }

    /**
     * @return Configured class instance.
     */
    public Class getClazz() {
        return clazz;
    }

    /**
     * @return Default values for each option.
     */
    public Map<String, Object> getDefaults() {
        return defaults;
    }
}
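
For comparison with `ClassSpec`, the interface floated in the review thread might look roughly like this. Only the name `DefaultableConfig` comes from the comment; the method signature, `ExampleConfig`, and the lookup helper are assumptions for illustration. As the thread notes, the existing config classes set defaults through static methods, so they would need to become instantiable before an interface like this could apply.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical interface; name taken from the review comment, method assumed. */
interface DefaultableConfig {
    Map<String, Object> getDefaults();
}

/** Hypothetical instance-based config class used only for this sketch. */
final class ExampleConfig implements DefaultableConfig {
    @Override
    public Map<String, Object> getDefaults() {
        final Map<String, Object> defaults = new HashMap<>();
        defaults.put("spout.retry_manager.retry_limit", 25);
        return defaults;
    }
}

final class DefaultableConfigSketch {

    /** Returns the text a doc generator could print in the Default Value column. */
    static String defaultFor(final DefaultableConfig config, final String key) {
        final Object value = config.getDefaults().get(key);
        return value == null ? "" : String.valueOf(value);
    }

    public static void main(final String[] args) {
        final DefaultableConfig config = new ExampleConfig();
        System.out.println(defaultFor(config, "spout.retry_manager.retry_limit"));
        System.out.println(defaultFor(config, "spout.kafka.topic").isEmpty());
    }
}
```
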
@@ -23,7 +23,7 @@
* USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/

package com.salesforce.storm.spout.dynamic.config.annotation;
package com.salesforce.storm.spout.documentation;

import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
@@ -37,16 +37,18 @@
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.FIELD)
public @interface Documentation {
public @interface ConfigDocumentation {

/**
* Enum of the categories for the configuration setting.
*/
enum Category {
NONE(""),
KAFKA("Kafka"),
DYNAMIC_SPOUT("Dynamic Spout"),
KAFKA("Kafka Consumer"),
PERSISTENCE("Persistence"),
PERSISTENCE_ZOOKEEPER("Zookeeper Persistence");
PERSISTENCE_ZOOKEEPER("Zookeeper Persistence"),
SIDELINE("Sideline");

private final String value;
