Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pipeline Runner Implementation and Tests #5435

Open
wants to merge 2 commits into
base: main
Choose a base branch
from

Conversation

MohammedAghil
Copy link
Contributor

Description

At Present, the functionalities of running a pipeline is encapsulated inside Process Worker. While this provides encapsulation, it does not provide a way for a given thread such as in the case of #5416 to execute processors and publish to sinks in a synchronous way without having an instance of Process Worker itself which contains additional responsibilities such as shutting down process for pipeline.

Implemented a concrete class for Pipeline Runner by moving and refactoring the functionalities of reading from buffer, executing processors and publishing to sinks from Process Worker to it, also added additional unit tests for coverage.

Issues Resolved

Resolves #5429

Check List

  • New functionality includes testing.
  • New functionality has javadoc added
  • Commits are signed with a real name per the DCO

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

}
try {
pipelineRunner.runAllProcessorsAndPublishToSinks();
} catch (InvalidEventHandleException exception) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not catch and handle this in PipelineRunner?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the suggestion @dlvenable , I have moved this into PipelineRunner

import java.util.stream.IntStream;

@ExtendWith(MockitoExtension.class)
class PipelineRunnerTest {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did you copy and paste these tests? Are any new or modified?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have added some new tests and also moved some tests from ProcessWorker to PipelineRunnerTest as the functionality moved.

@@ -135,6 +135,7 @@ void setup() {
when(pipeline.isStopRequested()).thenReturn(false).thenReturn(true);
when(source.areAcknowledgementsEnabled()).thenReturn(true);
when(pipeline.getSource()).thenReturn(source);
when(pipeline.getBuffer()).thenReturn(buffer);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why did you need to include this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Like we discussed, The AggregateProcessorITWithAcks creates an instance of the ProcessWorker which makes it complicated to test with PipelineRunner

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Implement Pipeline Runner
2 participants