WIP: Add windowing and aggregation #18
base: main
Conversation
Some early comments
py/sentry_streams/pipeline.py
Outdated
```python
@dataclass
class Reduce(WithInput):
    # group_by_key: refactor to Callable reference
    group_by_key: Callable[..., Any]
```
I think the group-by key may not be mandatory. Let's say you want to simulate something like `SELECT count(*) FROM table`: basically, accumulate data in a counter or any other data sketch without grouping in any way.
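One way to express that (a sketch of a possible shape building on the PR's `WithInput`, not the actual change) is to make the key optional and fall back to a single global accumulator when it is `None`:

```python
from dataclasses import dataclass
from typing import Any, Callable, Optional


@dataclass
class Reduce(WithInput):
    # None would mean "no grouping": a single global accumulator over
    # the whole stream, e.g. SELECT count(*) FROM table.
    group_by_key: Optional[Callable[..., Any]] = None
```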
py/sentry_streams/pipeline.py
Outdated
```python
class Reduce(WithInput):
    # group_by_key: refactor to Callable reference
    group_by_key: Callable[..., Any]
    # windowing mechanism, is this going to be mandatory?
```
Not necessarily.
Windowing is needed to produce intermediate results on an unbounded stream, so every time this step emits results we need windowing. But there are stateful primitives that do not emit results. Think about the counter in Postgres: we just count and periodically store the count in Postgres.
Think about how you would model this: https://www.notion.so/sentry/Data-workers-1808b10e4b5d806983f3f0e22a2c2758?pvs=4#1858b10e4b5d80c98207e29786a1317a
As discussed offline, we'll support that use case (for now) using windowed aggregation + sink.
So windowing will be required in this iteration of the API. A rough sketch of that shape is below.
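Modeling the "count into Postgres" case as windowed aggregation plus a sink might look like this (all step and field names here are illustrative stand-ins, not the actual sentry_streams API):

```python
from dataclasses import dataclass
from datetime import timedelta
from typing import Any, Callable


# Minimal stand-ins for illustration; the real pipeline classes differ.
@dataclass
class TumblingWindow:
    size: timedelta


@dataclass
class Reduce:
    group_by_key: Callable[[Any], Any]
    windowing: TumblingWindow  # required in this iteration


@dataclass
class PostgresSink:
    table: str


# Instead of the Reduce step holding global state forever, each closed
# window emits a partial count and the sink persists it.
steps = [
    Reduce(
        group_by_key=lambda event: "global",
        windowing=TumblingWindow(size=timedelta(seconds=60)),
    ),
    PostgresSink(table="counters"),
]
```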
py/sentry_streams/pipeline.py
Outdated
```python
    # group_by_key: refactor to Callable reference
    group_by_key: Callable[..., Any]
    # windowing mechanism, is this going to be mandatory?
    # windowing: Window
```
I think we are missing a watermark concept. The windowing algorithm decides the window semantics, but the user has to specify how/when to close a window.
Added by allowing users to configure windows and triggers.
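For reference, the shape this takes (illustrative names; the committed API may differ):

```python
from dataclasses import dataclass
from datetime import timedelta


@dataclass
class TumblingWindow:
    size: timedelta


@dataclass
class CountTrigger:
    # Fire (close) the window once this many elements have arrived.
    count: int


@dataclass
class Reduce:
    windowing: TumblingWindow
    trigger: CountTrigger


step = Reduce(
    windowing=TumblingWindow(size=timedelta(seconds=10)),
    trigger=CountTrigger(count=100),
)
```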
Another point: windows that close by event time are inherently tied to the watermarking strategy.
This strategy can be set directly on the source, especially if we're using something like Kafka. We could also allow users to configure their own as part of a step like Reduce, though I believe that's not recommended, because it would mean a new watermarked stream is produced as part of the step (at least in Flink).
WDYT about how we should configure the watermarking strategy? I feel it makes sense to set it up directly on the source.
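For comparison, attaching the watermark strategy at the source is the standard pattern in PyFlink (broker address and topic name below are placeholders):

```python
from pyflink.common import Duration, WatermarkStrategy
from pyflink.common.serialization import SimpleStringSchema
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.kafka import KafkaSource

env = StreamExecutionEnvironment.get_execution_environment()
source = (
    KafkaSource.builder()
    .set_bootstrap_servers("localhost:9092")  # placeholder broker
    .set_topics("events")                     # placeholder topic
    .set_value_only_deserializer(SimpleStringSchema())
    .build()
)
# Watermarks are generated at the source: tolerate 5s of out-of-order events.
stream = env.from_source(
    source,
    WatermarkStrategy.for_bounded_out_of_orderness(Duration.of_seconds(5)),
    "kafka-source",
)
```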
py/sentry_streams/pipeline.py
Outdated
```python
    step_type: StepType = StepType.REDUCE
    # storage: a fixed (enum?) set of storage backends we provide
    # consider making this a class
    storage: StateBackend = StateBackend.HASH_MAP
```
Isn't the storage hidden by the accumulator at times? Think about these cases:
- Aggregation with windowing, producing the output. Here we can separate the aggregation function from the store, since we can keep the aggregation state wherever we want.
- Global state: accumulating counts in Postgres. Here the storage is application-specific, so there is no point in separating the two.
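A sketch of the second case, where the accumulator owns its storage and a pipeline-level StateBackend would be redundant (it uses the add/get_output/merge interface from this PR; the table name, DSN handling, and psycopg2 usage are illustrative assumptions):

```python
import psycopg2


class PostgresCounter:
    """Counts events and flushes directly into Postgres; the "output"
    is a side effect rather than a value emitted downstream."""

    def __init__(self, dsn: str) -> None:
        self._conn = psycopg2.connect(dsn)

    def add(self, acc: int, value: object) -> int:
        return acc + 1

    def get_output(self, acc: int) -> None:
        # Periodically persist the accumulated count.
        with self._conn.cursor() as cur:
            cur.execute("UPDATE counters SET n = n + %s WHERE id = 1", (acc,))
        self._conn.commit()

    def merge(self, acc1: int, acc2: int) -> int:
        return acc1 + acc2
```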
I think this is the same discussion we had related to #18 (comment).
I can leave a TODO to come back to this in a next PR, since this one is getting large.
```python
    @abstractmethod
    def add(self, acc: Any, value: Any) -> Any:
        raise NotImplementedError

    @abstractmethod
    def get_output(self, acc: Any) -> Any:
        raise NotImplementedError

    @abstractmethod
    def merge(self, acc1: Any, acc2: Any) -> Any:
        raise NotImplementedError
```
Hint: try to make this generic rather than have `Any` as the type everywhere.
Will do; right now I'm just getting things working with Flink.
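A possible generic version (type variable names are illustrative):

```python
from abc import ABC, abstractmethod
from typing import Generic, TypeVar

TValue = TypeVar("TValue")    # incoming element type
TAcc = TypeVar("TAcc")        # accumulator state type
TOutput = TypeVar("TOutput")  # emitted result type


class Accumulator(ABC, Generic[TValue, TAcc, TOutput]):
    @abstractmethod
    def add(self, acc: TAcc, value: TValue) -> TAcc:
        raise NotImplementedError

    @abstractmethod
    def get_output(self, acc: TAcc) -> TOutput:
        raise NotImplementedError

    @abstractmethod
    def merge(self, acc1: TAcc, acc2: TAcc) -> TAcc:
        raise NotImplementedError
```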
```python
from typing import Any


class Accumulator(ABC):
```
Do you have a sense of which accumulators we will provide out of the box?
Not yet. I think that might be a good idea for the next PR. Leaving a TODO.
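As one example of what a built-in could look like (illustrative only, not a committed design), a running average built on this PR's Accumulator interface:

```python
class Average(Accumulator):
    """acc is a (sum, count) tuple; the output is the mean."""

    def add(self, acc: tuple[float, int], value: float) -> tuple[float, int]:
        total, n = acc
        return total + value, n + 1

    def get_output(self, acc: tuple[float, int]) -> float:
        total, n = acc
        return total / n if n else 0.0

    def merge(
        self, acc1: tuple[float, int], acc2: tuple[float, int]
    ) -> tuple[float, int]:
        return acc1[0] + acc2[0], acc1[1] + acc2[1]
```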
Still todo for this PR:
```python
    return flink_time


class FlinkWindows:
```
I think this translation needs some work.
Right now, our API exposes only a few specific window/trigger combinations that a user can pick from. As a result, the translation is effectively hardcoded, and we just use some of the WindowAssigners that Flink offers out of the box. Down the road, though, this translation will need to be more flexible, especially if we want highly customizable windowing.
Flink exposes a WindowAssigner interface. One way of doing this translation is by implementing the WindowAssigner interface under the hood. This is a possibility; it will probably require a little more understanding of Flink's internals.
Open to any other ideas as well.
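For reference, a sketch of the current "hardcoded" style of translation onto Flink's built-in assigners (the window dataclasses are illustrative stand-ins for the pipeline's window configs):

```python
from dataclasses import dataclass
from datetime import timedelta

from pyflink.common.time import Time
from pyflink.datastream.window import (
    SlidingEventTimeWindows,
    TumblingEventTimeWindows,
)


@dataclass
class TumblingWindow:
    size: timedelta


@dataclass
class SlidingWindow:
    size: timedelta
    slide: timedelta


def _ms(td: timedelta) -> int:
    return int(td.total_seconds() * 1000)


def to_flink_assigner(window: object):
    # Hardcoded mapping onto Flink's built-in WindowAssigners; a custom
    # WindowAssigner implementation would replace this for full flexibility.
    if isinstance(window, TumblingWindow):
        return TumblingEventTimeWindows.of(Time.milliseconds(_ms(window.size)))
    if isinstance(window, SlidingWindow):
        return SlidingEventTimeWindows.of(
            Time.milliseconds(_ms(window.size)),
            Time.milliseconds(_ms(window.slide)),
        )
    raise TypeError(f"Unsupported window type: {type(window)}")
```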
This PR adds windowing and aggregation, translating aggregations onto Flink's AggregateFunction.
Further clean-ups are necessary; these are written in a comment at the end of the PR. Right now, only the minimal, high-level pieces are in place.
Testing (Word Counter)
You can run the pipeline in example_config.py in the Flink container dev env. Follow the instructions here: https://github.com/getsentry/streams?tab=readme-ov-file#streams. Make sure you create both input and output topics on your Kafka broker. Send data like the example below.
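For instance (topic name, broker address, and the use of confluent_kafka are placeholders; any Kafka producer works):

```python
from confluent_kafka import Producer

producer = Producer({"bootstrap.servers": "localhost:9092"})
for line in ["hello world hello", "streams are fun"]:
    producer.produce("input-topic", value=line.encode("utf-8"))
producer.flush()
```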
You should see words and their counts grouped together, based on the windowing strategy.