-
Notifications
You must be signed in to change notification settings - Fork 13
Add support for resume state in sidelining #79
Conversation
void resume(final SidelineRequest sidelineRequest); | ||
|
||
/** | ||
* Does a sideline exist in the finished state? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree with peek, we should figure out a better name.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I’m open to suggestions.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Bounded Completed Something Or oTher.
I have no good suggestions :/
ad42c39
to
4850cb7
Compare
ffcfd1a
to
9225402
Compare
@@ -437,6 +437,11 @@ public boolean hasVirtualSpout(final VirtualSpoutIdentifier spoutIdentifier) { | |||
return getSpoutCoordinator().hasVirtualSpout(spoutIdentifier); | |||
} | |||
|
|||
public DelegateSpout getVirtualSpout(final VirtualSpoutIdentifier virtualSpoutIdentifier) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
jdocs?
@@ -638,7 +638,7 @@ private void attemptToComplete() { | |||
|
|||
// If we made it all the way through the above loop, we completed! | |||
// Lets flip our flag to true. | |||
logger.info("Looks like all partitions are complete! Lets wrap this up."); | |||
logger.info("Looks like all partitions are resolve! Lets wrap this up."); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
(wat) refactor did a string replacement on this one?
@@ -208,6 +208,10 @@ public synchronized boolean hasVirtualSpout(final VirtualSpoutIdentifier virtual | |||
return runningSpouts.containsKey(virtualSpoutIdentifier); | |||
} | |||
|
|||
public synchronized DelegateSpout getVirtualSpout(final VirtualSpoutIdentifier virtualSpoutIdentifier) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
j docs
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ideally add throws and annotation for it.
I know you have a doc somewhere, but it may be helpful to updatethe PR description with clear definitions of all the states and what they mean. |
f646559
to
f2bd47d
Compare
d5f4873
to
a62ccb9
Compare
This PR will need a README overhaul, I'd like to work on that in a separate PR. |
* @return {@link DelegateSpout} instance | ||
*/ | ||
public synchronized DelegateSpout getVirtualSpout(final VirtualSpoutIdentifier virtualSpoutIdentifier) { | ||
return runningSpouts.get(virtualSpoutIdentifier).getSpoutRunner().getSpout(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Probably need to handle the case where you ask for a non-existing virtual spout. Right now I'm guessing it would NPE.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You might be saved here by the synchronized method making this a moot point, but it may be safer checking the return result from runningSpouts.get() and if null throw the exception.
*/ | ||
public void startSidelining(SidelineRequest sidelineRequest) { | ||
@Override | ||
public void start(SidelineRequest sidelineRequest) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
does there need to be any validation around, what if you attempt to START a sideline thats already in RESUME, etc.. Basically validation that this state machine can't run backwards.
public void resolve(SidelineRequest sidelineRequest) { | ||
final SidelineRequestIdentifier id = sidelineRequest.id; | ||
|
||
logger.info("Received FINISH sideline request"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
comment seems wrong, FiNISHED != resolve?
@@ -31,5 +31,6 @@ | |||
public enum SidelineType { | |||
|
|||
START, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd add some quick descriptions here for what each of these mean
@@ -453,7 +453,7 @@ public boolean hasVirtualSpout(final VirtualSpoutIdentifier spoutIdentifier) { | |||
* @param virtualSpoutIdentifier identifier for teh {@link DelegateSpout} instance to get from the {@link SpoutCoordinator}. | |||
* @return {@link DelegateSpout} instance | |||
*/ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Needs javadoc @throws annotation
I want to add the validation mentioned by StevieP but I'd like to do it in a separate PR as part of the work on #75 as I think it'll be easier to address when that interface allows saving partitions together. |
let er rip. |
Summary
This PR adds a new state to the sideline sequence which we are calling resume.
Current States
Today sidelines are binary, they can be started or stopped. Starting a sideline means that a starting offset is captured for a future virtual spout and a filter is applied to the firehose virtual spout. This is typically done around the concept of a 'tenant', for example you might want to sideline account Foobar for a period of time. Stopping a sideline means that an ending offset is captured for a virtual spout that will be now created using the starting offset saved during the start of the sideline. That new virtual spout is immediately added to the coordinator with a filter that only emits the tenant in question and the filter is removed altogether from the firehose.
Potential Problems
The consequences of the binary approach are that ordering gets wildly thrown off and when a sideline stops we are essentially doubling a tenants emitted messages. This can be controlled with a different MessageBuffer, but the underlying problem remains: if a tenant's operations required sidelining we go from 0-60 (or 65) almost immediately without slowing down the emission of data into the main spout.
New States
Moving forward, and the goal of this PR is to split up the current stopping state into two different states:
Why does this matter?
The primary use we want to support here is throttling of traffic for a single tenant without having to know the ending point. So by using this new 'resume' state and a MessageBuffer that emits at a slower rate for sidelines, someone could slow the emission of a tenant into the underlying
DynamicSpout
, and then at the point at which they are ready to resume full processing they can set that sideline to 'resolve'.It's worth noting here that by calling resume() and resolve() back to back you should have the functional equivalent of today's stop() and that nothing is changing with the current start() process.
In the Future
Related to this, my suspicion is that this implementation will also give us better mechanics to control the timing of resolving the sideline so that some day in the future we can try to target the sideline once it's near the tail of the firehose (probably using some tolerance level), but that's not the purpose of this PR just an additional thought on why this change makes sense.