Skip to content
This repository has been archived by the owner on Feb 12, 2022. It is now read-only.

Add support for resume state in sidelining #79

Merged
merged 5 commits into from
Jan 17, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -93,9 +93,9 @@ public interface DelegateSpout {
ConsumerState getEndingState();

/**
* Set the ending state of the {@link DelegateSpout}.for when it should be marked as complete.
* Set the ending state of the {@link DelegateSpout}.for when it should finish consuming.
*
* @param endingState ending consumer state for when the {@link DelegateSpout} should be marked as complete.
* @param endingState ending consumer state for when the {@link DelegateSpout} should finish consuming.
*/
void setEndingState(ConsumerState endingState);

Expand Down
14 changes: 14 additions & 0 deletions src/main/java/com/salesforce/storm/spout/dynamic/DynamicSpout.java
Original file line number Diff line number Diff line change
Expand Up @@ -444,6 +444,20 @@ public boolean hasVirtualSpout(final VirtualSpoutIdentifier spoutIdentifier) {
return getSpoutCoordinator().hasVirtualSpout(spoutIdentifier);
}

/**
* Get a {@link DelegateSpout} instance from the {@link SpoutCoordinator}.
*
* This is useful is you want to manipulate the filter chain or alter the ending state after a {@link DelegateSpout} has
* been added to the {@link SpoutCoordinator}.
*
* @param virtualSpoutIdentifier identifier for teh {@link DelegateSpout} instance to get from the {@link SpoutCoordinator}.
* @return {@link DelegateSpout} instance
*/
public DelegateSpout getVirtualSpout(final VirtualSpoutIdentifier virtualSpoutIdentifier) throws SpoutDoesNotExistException {
checkSpoutOpened();
return getSpoutCoordinator().getVirtualSpout(virtualSpoutIdentifier);
}

/**
* Get the total number of virtual spouts in the coordinator.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -486,9 +486,9 @@ public ConsumerState getEndingState() {
}

/**
* Set the ending state of the {@link DelegateSpout}.for when it should be marked as complete.
* Set the ending state of the {@link DelegateSpout} for when it should finish consuming.
*
* @param endingState ending consumer state for when the {@link DelegateSpout} should be marked as complete.
* @param endingState ending consumer state for when the {@link DelegateSpout} should finish consuming.
*/
@Override
public void setEndingState(final ConsumerState endingState) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,26 @@ public synchronized boolean hasVirtualSpout(final VirtualSpoutIdentifier virtual
return runningSpouts.containsKey(virtualSpoutIdentifier);
}

/**
* Get a {@link DelegateSpout} instance from the {@link SpoutCoordinator}.
*
* This is useful is you want to manipulate the filter chain or alter the ending state after a {@link DelegateSpout} has
* been added to the {@link SpoutCoordinator}.
*
* @param virtualSpoutIdentifier identifier for teh {@link DelegateSpout} instance to get from the {@link SpoutCoordinator}.
* @return {@link DelegateSpout} instance
*/
public synchronized DelegateSpout getVirtualSpout(final VirtualSpoutIdentifier virtualSpoutIdentifier) {
if (!hasVirtualSpout(virtualSpoutIdentifier)) {
throw new SpoutDoesNotExistException(
"VirtualSpout " + virtualSpoutIdentifier + " does not exist in the SpoutCoordinator.",
virtualSpoutIdentifier
);
}

return runningSpouts.get(virtualSpoutIdentifier).getSpoutRunner().getSpout();
}

/**
* Signals to a VirtualSpout to stop, ultimately removing it from the monitor.
* This call will block waiting for the VirtualSpout instance to shutdown.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
package com.salesforce.storm.spout.sideline.handler;

import com.salesforce.storm.spout.sideline.trigger.SidelineRequest;
import com.salesforce.storm.spout.sideline.trigger.SidelineRequestIdentifier;

/**
* A proxy to create a layer of indirection between the SpoutHandler and the Triggers. This allows us to refactor where
Expand All @@ -36,28 +35,61 @@
public interface SidelineController {

/**
* Does a sideline exist in the started state?
* Is a sideline in the start state?
*
* @param sidelineRequest sideline request.
* @return true it does, false it does not.
* @return true it is in this state, false if it is not.
*/
boolean isSidelineStarted(SidelineRequest sidelineRequest);
boolean isStarted(SidelineRequest sidelineRequest);

/**
* Start sidelining.
* @param request Sideline request, container an id and a filter chain step.
*
* Starting a sideline applies a {@link com.salesforce.storm.spout.dynamic.filter.FilterChainStep} to the firehose
* such that messages are filtered. When this happens the current offset of the firehose is captured and persisted,
* as it will be used later when the sideline goes to {@link #resume(SidelineRequest)}.
*
* @param sidelineRequest sideline request.
*/
void start(final SidelineRequest sidelineRequest);

/**
* Is a sideline in the resume state?
*
* @param sidelineRequest sideline request.
* @return true it is in this state, false if it is not.
*/
boolean isResumed(SidelineRequest sidelineRequest);

/**
* Resume sidelining.
*
* Resuming a sideline create a new {@link com.salesforce.storm.spout.dynamic.VirtualSpout} instance that will
* using the offset captured in {@link #start(SidelineRequest)} as its starting point. While a sideline is in
* this state the {@link com.salesforce.storm.spout.dynamic.filter.FilterChainStep} will remain on the firehose.
*
* @param sidelineRequest Sideline request, container an id and a filter chain step.
*/
void startSidelining(final SidelineRequest request);
void resume(final SidelineRequest sidelineRequest);

/**
* Does a sideline exist in the stopped state?
* Is a sideline is the resolve state?
*
* @param sidelineRequest sideline request.
* @return true it has, false it has not.
* @return true it is in this state, false if it is not.
*/
boolean isSidelineStopped(SidelineRequest sidelineRequest);
boolean isResolving(SidelineRequest sidelineRequest);

/**
* Stop sidelining.
* @param request Sideline request, container an id and a filter chain step.
* Resolve a sideline.
*
* The {@link com.salesforce.storm.spout.dynamic.VirtualSpout} processing the historical data from the sideline window is
* is given an ending offset, and the {@link com.salesforce.storm.spout.dynamic.filter.FilterChainStep} is removed
* from the firehose {@link com.salesforce.storm.spout.dynamic.VirtualSpout}.
*
* The verbiage here is that we are 'resolving' the sideline, meaning that we are moving back to processing on the firehose.
*
* @param sidelineRequest Sideline request, container an id and a filter chain step.
*/
void stopSidelining(final SidelineRequest request);
void resolve(final SidelineRequest sidelineRequest);
}
Loading