Skip to content
This repository has been archived by the owner on Feb 12, 2022. It is now read-only.

Commit

Permalink
Stubbing out embedding resumes
Browse files Browse the repository at this point in the history
  • Loading branch information
stanlemon committed Dec 7, 2017
1 parent fc74cfe commit 4850cb7
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,6 @@
import org.slf4j.LoggerFactory;

import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.time.Clock;
import java.util.Collections;
import java.util.Iterator;
Expand Down Expand Up @@ -176,7 +174,10 @@ public void doTestWithSidelining() throws InterruptedException {

// Send a new start request with our filter.
// This means that our starting offset for the sideline'd data should start at offset 3 (we acked offsets 0, 1, 2)
StaticTrigger.sendStartRequest(request);
StaticTrigger.sendPauseRequest(request);

// TODO: Validate something in here
StaticTrigger.sendResumeRequest(request);

// Produce another 3 records into kafka.
producedRecords = produceRecords(numberOfRecordsToPublish, 0);
Expand All @@ -187,7 +188,7 @@ public void doTestWithSidelining() throws InterruptedException {
validateNextTupleEmitsNothing(spout, spoutOutputCollector, 10, 3000L);

// Send a stop sideline request
StaticTrigger.sendStopRequest(request);
StaticTrigger.sendCompleteRequest(request);

// Wait for the sideline vspout to start,
waitForVirtualSpouts(spout, 2);
Expand Down Expand Up @@ -291,7 +292,10 @@ public void testResumingSpoutWhileSidelinedVirtualSpoutIsActive() throws Interru

// Send a new start request with our filter.
// This means that our starting offset for the sideline'd data should start at offset 3 (we acked offsets 0, 1, 2)
StaticTrigger.sendStartRequest(request);
StaticTrigger.sendPauseRequest(request);

// TODO: Validate something in here
StaticTrigger.sendResumeRequest(request);

final SidelineRequestIdentifier sidelineRequestIdentifier = request.id;

Expand Down Expand Up @@ -336,7 +340,7 @@ public void testResumingSpoutWhileSidelinedVirtualSpoutIsActive() throws Interru
validateNextTupleEmitsNothing(spout, spoutOutputCollector, 20, 100L);

// Send a stop sideline request
StaticTrigger.sendStopRequest(request);
StaticTrigger.sendCompleteRequest(request);
final SidelineVirtualSpoutIdentifier sidelineIdentifier = new SidelineVirtualSpoutIdentifier(consumerIdPrefix, request.id);

// Verify 2 VirtualSpouts are running
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,18 +64,26 @@ public void setSidelineController(SidelineController sidelineController) {
}

/**
* Start a sideline request.
* Send a pause sideline request.
* @param request sideline request.
*/
public static void sendStartRequest(final SidelineRequest request) {
public static void sendPauseRequest(final SidelineRequest request) {
StaticTrigger.sidelineController.pause(request);
}

/**
* Stop a sideline request.
* Send a resume sideline request.
* @param request sideline request.
*/
public static void sendStopRequest(final SidelineRequest request) {
public static void sendResumeRequest(final SidelineRequest request) {
StaticTrigger.sidelineController.resume(request);
}

/**
* Send a complete sideline request.
* @param request sideline request.
*/
public static void sendCompleteRequest(final SidelineRequest request) {
StaticTrigger.sidelineController.complete(request);
}
}

0 comments on commit 4850cb7

Please sign in to comment.