From 4850cb7502d49c38b754dff635d47920527374d8 Mon Sep 17 00:00:00 2001 From: Stan Lemon Date: Tue, 5 Dec 2017 17:37:50 -0500 Subject: [PATCH] Stubbing out embedding resumes --- .../storm/spout/sideline/SidelineSpoutTest.java | 16 ++++++++++------ .../spout/sideline/trigger/StaticTrigger.java | 16 ++++++++++++---- 2 files changed, 22 insertions(+), 10 deletions(-) diff --git a/src/test/java/com/salesforce/storm/spout/sideline/SidelineSpoutTest.java b/src/test/java/com/salesforce/storm/spout/sideline/SidelineSpoutTest.java index 4d10a261..3d8f1e7b 100644 --- a/src/test/java/com/salesforce/storm/spout/sideline/SidelineSpoutTest.java +++ b/src/test/java/com/salesforce/storm/spout/sideline/SidelineSpoutTest.java @@ -66,8 +66,6 @@ import org.slf4j.LoggerFactory; import java.lang.reflect.Field; -import java.lang.reflect.InvocationTargetException; -import java.lang.reflect.Method; import java.time.Clock; import java.util.Collections; import java.util.Iterator; @@ -176,7 +174,10 @@ public void doTestWithSidelining() throws InterruptedException { // Send a new start request with our filter. // This means that our starting offset for the sideline'd data should start at offset 3 (we acked offsets 0, 1, 2) - StaticTrigger.sendStartRequest(request); + StaticTrigger.sendPauseRequest(request); + + // TODO: Validate something in here + StaticTrigger.sendResumeRequest(request); // Produce another 3 records into kafka. producedRecords = produceRecords(numberOfRecordsToPublish, 0); @@ -187,7 +188,7 @@ public void doTestWithSidelining() throws InterruptedException { validateNextTupleEmitsNothing(spout, spoutOutputCollector, 10, 3000L); // Send a stop sideline request - StaticTrigger.sendStopRequest(request); + StaticTrigger.sendCompleteRequest(request); // Wait for the sideline vspout to start, waitForVirtualSpouts(spout, 2); @@ -291,7 +292,10 @@ public void testResumingSpoutWhileSidelinedVirtualSpoutIsActive() throws Interru // Send a new start request with our filter. // This means that our starting offset for the sideline'd data should start at offset 3 (we acked offsets 0, 1, 2) - StaticTrigger.sendStartRequest(request); + StaticTrigger.sendPauseRequest(request); + + // TODO: Validate something in here + StaticTrigger.sendResumeRequest(request); final SidelineRequestIdentifier sidelineRequestIdentifier = request.id; @@ -336,7 +340,7 @@ public void testResumingSpoutWhileSidelinedVirtualSpoutIsActive() throws Interru validateNextTupleEmitsNothing(spout, spoutOutputCollector, 20, 100L); // Send a stop sideline request - StaticTrigger.sendStopRequest(request); + StaticTrigger.sendCompleteRequest(request); final SidelineVirtualSpoutIdentifier sidelineIdentifier = new SidelineVirtualSpoutIdentifier(consumerIdPrefix, request.id); // Verify 2 VirtualSpouts are running diff --git a/src/test/java/com/salesforce/storm/spout/sideline/trigger/StaticTrigger.java b/src/test/java/com/salesforce/storm/spout/sideline/trigger/StaticTrigger.java index 6e19745e..5a933ba6 100644 --- a/src/test/java/com/salesforce/storm/spout/sideline/trigger/StaticTrigger.java +++ b/src/test/java/com/salesforce/storm/spout/sideline/trigger/StaticTrigger.java @@ -64,18 +64,26 @@ public void setSidelineController(SidelineController sidelineController) { } /** - * Start a sideline request. + * Send a pause sideline request. * @param request sideline request. */ - public static void sendStartRequest(final SidelineRequest request) { + public static void sendPauseRequest(final SidelineRequest request) { StaticTrigger.sidelineController.pause(request); } /** - * Stop a sideline request. + * Send a resume sideline request. * @param request sideline request. */ - public static void sendStopRequest(final SidelineRequest request) { + public static void sendResumeRequest(final SidelineRequest request) { + StaticTrigger.sidelineController.resume(request); + } + + /** + * Send a complete sideline request. + * @param request sideline request. + */ + public static void sendCompleteRequest(final SidelineRequest request) { StaticTrigger.sidelineController.complete(request); } }