-
Notifications
You must be signed in to change notification settings - Fork 214
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Pipeline Runner Implementation and Tests #5435
base: main
Are you sure you want to change the base?
Pipeline Runner Implementation and Tests #5435
Conversation
Signed-off-by: Mohammed Aghil Puthiyottil <[email protected]>
} | ||
try { | ||
pipelineRunner.runAllProcessorsAndPublishToSinks(); | ||
} catch (InvalidEventHandleException exception) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why not catch and handle this in PipelineRunner
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the suggestion @dlvenable , I have moved this into PipelineRunner
import java.util.stream.IntStream; | ||
|
||
@ExtendWith(MockitoExtension.class) | ||
class PipelineRunnerTest { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Did you copy and paste these tests? Are any new or modified?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have added some new tests and also moved some tests from ProcessWorker to PipelineRunnerTest as the functionality moved.
@@ -135,6 +135,7 @@ void setup() { | |||
when(pipeline.isStopRequested()).thenReturn(false).thenReturn(true); | |||
when(source.areAcknowledgementsEnabled()).thenReturn(true); | |||
when(pipeline.getSource()).thenReturn(source); | |||
when(pipeline.getBuffer()).thenReturn(buffer); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why did you need to include this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Like we discussed, The AggregateProcessorITWithAcks creates an instance of the ProcessWorker which makes it complicated to test with PipelineRunner
…ests Signed-off-by: Mohammed Aghil Puthiyottil <[email protected]>
Description
At Present, the functionalities of running a pipeline is encapsulated inside Process Worker. While this provides encapsulation, it does not provide a way for a given thread such as in the case of #5416 to execute processors and publish to sinks in a synchronous way without having an instance of Process Worker itself which contains additional responsibilities such as shutting down process for pipeline.
Implemented a concrete class for Pipeline Runner by moving and refactoring the functionalities of reading from buffer, executing processors and publishing to sinks from Process Worker to it, also added additional unit tests for coverage.
Issues Resolved
Resolves #5429
Check List
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.