Skip to content
This repository has been archived by the owner on Feb 12, 2022. It is now read-only.

Commit

Permalink
Fix existing tests
Browse files Browse the repository at this point in the history
  • Loading branch information
stanlemon committed Dec 7, 2017
1 parent 4850cb7 commit ffcfd1a
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -278,8 +278,8 @@ synchronized void loadSidelines() {
continue;
}

// Resuming a start request means we apply the previous filter chain to the fire hose
if (payload.type.equals(SidelineType.PAUSE)) {
// Loading a paused or resumed request means we apply the previous filter chain to the fire hose
if (payload.type.equals(SidelineType.PAUSE) || payload.type.equals(SidelineType.RESUME)) {
logger.info("Handling {} request for sideline {} {}", payload.type, payload.id, payload.request.step);

// Only add the step if the id isn't already in the chain. Note that we do NOT check for the step here, it's possible
Expand All @@ -293,8 +293,8 @@ synchronized void loadSidelines() {
}
}

// Resuming a stopped request means we spin up a new sideline spout
if (payload.type.equals(SidelineType.COMPLETE)) {
// Loading a resumed or completed request means we spin up a new sideline spout
if (payload.type.equals(SidelineType.RESUME) || payload.type.equals(SidelineType.COMPLETE)) {
logger.info("Handling {} request for sideline {} {}", payload.type, payload.id, payload.request.step);

// This method will check to see that the VirtualSpout isn't already in the SpoutCoordinator before adding it
Expand Down Expand Up @@ -489,6 +489,15 @@ public void complete(SidelineRequest sidelineRequest) {

final ConsumerState endingState = fireHoseSpout.getCurrentState();

// Identifier based off of the id of the sideline request
final VirtualSpoutIdentifier virtualSpoutIdentifier = generateSidelineVirtualSpoutId(id);

// If we can't find this VirtualSpout we should bail and not proceed any further.
if (!spout.hasVirtualSpout(virtualSpoutIdentifier)) {
logger.error("SidelineRequest to complete {} was made, but the correspond VirtualSpout does not exist.", sidelineRequest.id);
return;
}

// Once the filter chain has been removed from the firehose, mark our sideline with an ending offset
spout.getVirtualSpout(generateSidelineVirtualSpoutId(id)).setEndingState(endingState);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,9 +176,6 @@ public void doTestWithSidelining() throws InterruptedException {
// This means that our starting offset for the sideline'd data should start at offset 3 (we acked offsets 0, 1, 2)
StaticTrigger.sendPauseRequest(request);

// TODO: Validate something in here
StaticTrigger.sendResumeRequest(request);

// Produce another 3 records into kafka.
producedRecords = produceRecords(numberOfRecordsToPublish, 0);

Expand All @@ -187,6 +184,9 @@ public void doTestWithSidelining() throws InterruptedException {
// all tuples are filtered.
validateNextTupleEmitsNothing(spout, spoutOutputCollector, 10, 3000L);

// TODO: Validate something in here
StaticTrigger.sendResumeRequest(request);

// Send a stop sideline request
StaticTrigger.sendCompleteRequest(request);

Expand Down Expand Up @@ -294,11 +294,6 @@ public void testResumingSpoutWhileSidelinedVirtualSpoutIsActive() throws Interru
// This means that our starting offset for the sideline'd data should start at offset 3 (we acked offsets 0, 1, 2)
StaticTrigger.sendPauseRequest(request);

// TODO: Validate something in here
StaticTrigger.sendResumeRequest(request);

final SidelineRequestIdentifier sidelineRequestIdentifier = request.id;

// Produce 5 more messages into kafka, should be offsets [10,11,12,13,14]
final List<ProducedKafkaRecord<byte[], byte[]>> additionalProducedRecords = produceRecords(5, 0);

Expand Down Expand Up @@ -339,6 +334,9 @@ public void testResumingSpoutWhileSidelinedVirtualSpoutIsActive() throws Interru
// offsets [4,5,6,7,8,9,10,11,12,13,14] <- last committed offset now 14 on firehose.
validateNextTupleEmitsNothing(spout, spoutOutputCollector, 20, 100L);

// TODO: Validate something in here
StaticTrigger.sendResumeRequest(request);

// Send a stop sideline request
StaticTrigger.sendCompleteRequest(request);
final SidelineVirtualSpoutIdentifier sidelineIdentifier = new SidelineVirtualSpoutIdentifier(consumerIdPrefix, request.id);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import com.salesforce.storm.spout.dynamic.DynamicSpout;
import com.salesforce.storm.spout.dynamic.FactoryManager;
import com.salesforce.storm.spout.dynamic.VirtualSpoutFactory;
import com.salesforce.storm.spout.dynamic.filter.FilterChainStep;
import com.salesforce.storm.spout.dynamic.metrics.LogRecorder;
import com.salesforce.storm.spout.sideline.SidelineVirtualSpoutIdentifier;
import com.salesforce.storm.spout.dynamic.VirtualSpout;
Expand Down Expand Up @@ -277,7 +278,7 @@ public void testStopSidelining() {
final String namespace = MockConsumer.topic;

final SidelineRequestIdentifier stopRequestId = new SidelineRequestIdentifier("StopRequest");
final StaticMessageFilter stopFilter = new StaticMessageFilter();
final FilterChainStep stopFilter = new NegatingFilterChainStep(new StaticMessageFilter());
final SidelineRequest stopRequest = new SidelineRequest(stopRequestId, stopFilter);

final DynamicSpout spout = new DynamicSpout(config);
Expand All @@ -290,29 +291,30 @@ public void testStopSidelining() {

final PersistenceAdapter persistenceAdapter = sidelineSpoutHandler.getPersistenceAdapter();

sidelineSpoutHandler.onSpoutOpen(spout, new HashMap(), new MockTopologyContext());

// Persist our stop request as a start, so that when we stop it, it can be found
// Note that we are doing this AFTER the spout has opened because we are NOT testing the resume logic
persistenceAdapter.persistSidelineRequestState(
SidelineType.PAUSE,
SidelineType.RESUME,
stopRequestId,
stopRequest,
new ConsumerPartition(namespace, 0),
1L, // starting offset
null // ending offset
);
persistenceAdapter.persistSidelineRequestState(
SidelineType.PAUSE,
SidelineType.RESUME,
stopRequestId,
stopRequest,
new ConsumerPartition(namespace, 5),
3L, // starting offset
null // ending offset
);

sidelineSpoutHandler.onSpoutOpen(spout, new HashMap(), new MockTopologyContext());


// Stick the filter onto the fire hose, it should be removed when we stop the sideline request
sidelineSpoutHandler.getFireHoseSpout().getFilterChain().addStep(stopRequestId, stopFilter);
//sidelineSpoutHandler.getFireHoseSpout().getFilterChain().addStep(stopRequestId, stopFilter);

// Tell our mock consumer that these are the partitions we're working with
MockConsumer.partitions = Arrays.asList(0, 5);
Expand All @@ -335,15 +337,15 @@ public void testStopSidelining() {

assertEquals(SidelineType.COMPLETE, partition0.type);
assertEquals(stopRequestId, partition0.id);
assertEquals(new NegatingFilterChainStep(stopRequest.step), partition0.request.step);
assertEquals(stopRequest.step, partition0.request.step);
assertEquals(Long.valueOf(1L), partition0.startingOffset);
assertEquals(Long.valueOf(1L), partition0.endingOffset);

final SidelinePayload partition5 = persistenceAdapter.retrieveSidelineRequest(stopRequestId, new ConsumerPartition(namespace, 5));

assertEquals(SidelineType.COMPLETE, partition5.type);
assertEquals(stopRequestId, partition5.id);
assertEquals(new NegatingFilterChainStep(stopRequest.step), partition5.request.step);
assertEquals(stopRequest.step, partition5.request.step);
assertEquals(Long.valueOf(3L), partition5.startingOffset);
assertEquals(Long.valueOf(1L), partition5.endingOffset);

Expand Down

0 comments on commit ffcfd1a

Please sign in to comment.