
WIP: Add windowing and aggregation #18

Draft · wants to merge 10 commits into main

Conversation

@ayirr7 (Member) commented Feb 20, 2025:

This PR:

  • Adds, at a very high level, a Reduce primitive (which may be better named Aggregate)
  • Establishes user-defined function abstractions that are translated into runtime-specific equivalents. For example, aggregation is represented as a general Accumulator, which gets translated into Flink's AggregateFunction.
  • Adds windowing and triggers. These are inspired by how Flink creates windows, assigns data in a stream to windows, and triggers functions/computations on those windows. The PR as-is only supports a few types of windows, each with either a count-based or event-time-based trigger.

It also:

  • Supports aggregation with an optional group by
  • Assumes that reduce/aggregation relies on windows being created on the stream (i.e. windowing is mandatory)
  • Does not yet support or consider embedding storage as part of the aggregation (as described here). This may need to be tackled in a separate PR.
  • Leaves many more window and trigger combinations to be supported in later PRs. We could also create a template for writing custom ones in another PR.

Further clean-ups are necessary; these are listed in a comment at the end of the PR. Right now, only the minimal, high-level pieces are in place.
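For illustration, here is a minimal word-count accumulator written against the Accumulator interface this PR introduces (add/get_output/merge). The WordCount class, its dict-based state, and the output format are assumptions for this sketch, not code from the PR:

```python
from abc import ABC, abstractmethod
from typing import Any


# Interface as shown in the PR diff.
class Accumulator(ABC):
    @abstractmethod
    def add(self, acc: Any, value: Any) -> Any:
        raise NotImplementedError

    @abstractmethod
    def get_output(self, acc: Any) -> Any:
        raise NotImplementedError

    @abstractmethod
    def merge(self, acc1: Any, acc2: Any) -> Any:
        raise NotImplementedError


class WordCount(Accumulator):
    """Hypothetical accumulator: counts words, state is a plain dict."""

    def add(self, acc: dict, value: dict) -> dict:
        word = value["word"]
        acc[word] = acc.get(word, 0) + 1
        return acc

    def get_output(self, acc: dict) -> list:
        # Formats output like the kcat example below: "<word> <count>".
        return [f"{word} {count}" for word, count in acc.items()]

    def merge(self, acc1: dict, acc2: dict) -> dict:
        # Combines two partial accumulators (e.g. from two window panes).
        for word, count in acc2.items():
            acc1[word] = acc1.get(word, 0) + count
        return acc1
```

In this sketch, the runtime translation layer would call add per message, merge when combining partial window state, and get_output when a trigger fires.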

Testing (Word Counter)

You can run the pipeline in example_config.py in the Flink container dev env. Follow the instructions here: https://github.com/getsentry/streams?tab=readme-ov-file#streams. Make sure you create both the input and output topics on your Kafka broker.

Send data like the example below:

riyachakraborty@H472HKN6XK py % kcat -P -b 127.0.0.1:9092 -t events                                                             
{"word": "hi1"}
{"word": "hi2"}
{"word": "hi2"}
{"word": "hi1"}
{"word": "hi2"}
{"word": "hi1"}

You should see words and their counts grouped together, based on the windowing strategy.

riyachakraborty@H472HKN6XK py % kcat -C -b 127.0.0.1:9092 -t transformed-events                                                 
hi2 2
hi1 2

@fpacifici (Collaborator) left a comment:

Some early comments

@dataclass
class Reduce(WithInput):
# group_by_key: refactor to Callable reference
group_by_key: Callable[..., Any]
@fpacifici (Collaborator):

I think the group by key may not be mandatory. Let's say you want to simulate something like

SELECT count(*) from table

Basically accumulate data in a counter or any other data sketch without grouping in any way.
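A minimal sketch of what an optional group_by_key could look like (the Reduce fields and the key_for helper here are hypothetical stand-ins, not the PR's actual class):

```python
from dataclasses import dataclass
from typing import Any, Callable, Optional


@dataclass
class Reduce:
    """Sketch: group_by_key defaults to None, meaning one global group."""

    group_by_key: Optional[Callable[[Any], Any]] = None

    def key_for(self, message: Any) -> Any:
        # With no key function, every message lands in a single global
        # group, which models `SELECT count(*) FROM table`.
        if self.group_by_key is None:
            return None
        return self.group_by_key(message)
```

Usage: Reduce().key_for(msg) routes everything to one group, while Reduce(group_by_key=lambda m: m["word"]).key_for(msg) groups by word.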

class Reduce(WithInput):
# group_by_key: refactor to Callable reference
group_by_key: Callable[..., Any]
# windowing mechanism, is this going to be mandatory?
@fpacifici (Collaborator) commented Feb 20, 2025:

Not necessarily.
Windowing is needed to produce intermediate results on an unbounded stream, so every time this step emits results we need windowing. But there are stateful primitives that do not emit results. Think about the counter in Postgres: we just count and periodically store in Postgres.
Think about how you would model this: https://www.notion.so/sentry/Data-workers-1808b10e4b5d806983f3f0e22a2c2758?pvs=4#1858b10e4b5d80c98207e29786a1317a

@ayirr7 (Member, author):

As discussed offline, we'll support that use case (for now) using windowed aggregation + sink.

So windowing will be required in this iteration of the API

# group_by_key: refactor to Callable reference
group_by_key: Callable[..., Any]
# windowing mechanism, is this going to be mandatory?
# windowing: Window
@fpacifici (Collaborator):

I think we are missing a watermark concept. The windowing algorithm decides the window semantics, but the user has to specify how/when to close a window.

@ayirr7 (Member, author):

Added by allowing users to configure windows and triggers.

@ayirr7 (Member, author) commented Feb 21, 2025:

Another point. Windows that close by event time are inherently tied to the watermarking strategy.

This strategy can be set directly on the source, especially if we're using something like Kafka. We could also allow users to configure their own as part of a step like Reduce, though I believe that's not recommended, because it would mean a new watermarked stream is produced as part of the step (at least for Flink).

WDYT about how we should configure watermarking strategy? I feel that it makes sense to set it up directly on the source.
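For reference, the bounded-out-of-orderness semantics under discussion reduce to tracking the maximum event time seen and subtracting an allowed lateness. A pure-Python sketch of just those semantics (class and method names are assumptions; in Flink this is configured via a WatermarkStrategy, typically on the source):

```python
class BoundedOutOfOrdernessWatermark:
    """Illustrative sketch: watermark = max event time seen - allowed lateness.

    This models the semantics only; it is not Flink's API.
    """

    def __init__(self, max_out_of_orderness_ms: int) -> None:
        self.max_out_of_orderness_ms = max_out_of_orderness_ms
        self.max_event_time_ms = 0

    def on_event(self, event_time_ms: int) -> None:
        # Late events (event_time below the current max) do not move the
        # watermark backwards.
        self.max_event_time_ms = max(self.max_event_time_ms, event_time_ms)

    def current_watermark(self) -> int:
        # Any event-time window whose end is <= this value can be closed.
        return self.max_event_time_ms - self.max_out_of_orderness_ms
```

This is why event-time window closure is tied to the watermarking strategy: a window only fires once the watermark passes its end.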

step_type: StepType = StepType.REDUCE
# storage: a fixed (enum?) set of storage backends we provide
# consider making this a class
storage: StateBackend = StateBackend.HASH_MAP
@fpacifici (Collaborator):

Isn't the storage hidden by the accumulator at times?
Think about these cases:

  • Aggregation with windowing and producing the output. Here we can separate the aggregation function from the store, as we can store the aggregation state wherever we want.
  • Global state: accumulating counts in Postgres. Here the storage is application-specific, so there is no point in separating the two.

@ayirr7 (Member, author):

I think this is the same discussion we had related to #18 (comment)

I can leave a TODO to come back to this in a next PR since this one is getting large.

Comment on lines 11 to 21
@abstractmethod
def add(self, acc: Any, value: Any) -> Any:
raise NotImplementedError

@abstractmethod
def get_output(self, acc: Any) -> Any:
raise NotImplementedError

@abstractmethod
def merge(self, acc1: Any, acc2: Any) -> Any:
raise NotImplementedError
@fpacifici (Collaborator):

Hint: try to make this generic rather than having Any as the type everywhere.

@ayirr7 (Member, author):

Will do; right now I'm just getting things working with Flink.
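One way to act on the generics hint, sketched with typing.Generic (the type-parameter names and the Count example are my own, not from the PR):

```python
from abc import ABC, abstractmethod
from typing import Generic, TypeVar

TIn = TypeVar("TIn")    # input value type
TAcc = TypeVar("TAcc")  # accumulator state type
TOut = TypeVar("TOut")  # output type


class Accumulator(ABC, Generic[TIn, TAcc, TOut]):
    @abstractmethod
    def add(self, acc: TAcc, value: TIn) -> TAcc:
        raise NotImplementedError

    @abstractmethod
    def get_output(self, acc: TAcc) -> TOut:
        raise NotImplementedError

    @abstractmethod
    def merge(self, acc1: TAcc, acc2: TAcc) -> TAcc:
        raise NotImplementedError


class Count(Accumulator[object, int, int]):
    """Concrete accumulator: type checkers now verify the state is an int."""

    def add(self, acc: int, value: object) -> int:
        return acc + 1

    def get_output(self, acc: int) -> int:
        return acc

    def merge(self, acc1: int, acc2: int) -> int:
        return acc1 + acc2
```

The runtime behaviour is unchanged; the payoff is that mypy can catch mismatched accumulator state at the call sites of the translation layer.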

from typing import Any


class Accumulator(ABC):
@fpacifici (Collaborator):

Do you have a sense of which accumulators we will provide out of the box?

@ayirr7 (Member, author):

Not yet. I think that might be a good idea for the next PR. Leaving a TODO.

@ayirr7 ayirr7 requested a review from fpacifici February 21, 2025 20:09
@ayirr7 (Member, author) commented Feb 21, 2025:

Still todo for this PR:

  • Address typing comment above (move to Generics)
  • Add tests
  • Edit README for Flink container setup (create the transformed-events topic, etc)
  • More docstrings to clarify semantics

@ayirr7 ayirr7 changed the title WIP; windowing, aggregation WIP: Add windowing and aggregation Feb 21, 2025
return flink_time


class FlinkWindows:
@ayirr7 (Member, author) commented Feb 21, 2025:

I think this translation needs some work.

Right now, our API exposes only a few specific window/trigger combinations that a user can pick from. As a result, the translation is effectively hardcoded, and we get to use some of the WindowAssigners that Flink offers out of the box. Down the road, though, this translation will need to be more flexible, especially if we want highly customizable windowing.

Flink exposes a WindowAssigner interface. One way of doing this translation is by implementing the WindowAssigner interface under the hood. This is a possibility, but it will probably require a little more understanding of Flink's internals.

Open to any other ideas as well.
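The hardcoded translation described above could look roughly like the following dispatch. The window config classes here are hypothetical stand-ins for this sketch; only the Flink assigner names (TumblingEventTimeWindows, SlidingEventTimeWindows) refer to real Flink built-ins:

```python
from dataclasses import dataclass


# Hypothetical window configs, not the PR's actual classes.
@dataclass(frozen=True)
class TumblingWindow:
    size_ms: int


@dataclass(frozen=True)
class SlidingWindow:
    size_ms: int
    slide_ms: int


def to_flink_assigner_name(window: object) -> str:
    """Hardcoded translation: each supported window maps to one of
    Flink's built-in WindowAssigners, identified by name here to keep
    the sketch runtime-agnostic."""
    if isinstance(window, TumblingWindow):
        return "TumblingEventTimeWindows"
    if isinstance(window, SlidingWindow):
        return "SlidingEventTimeWindows"
    raise NotImplementedError(
        f"No Flink translation for {type(window).__name__}"
    )
```

A more flexible design would replace this dispatch with a custom WindowAssigner implementation, at the cost of depending on more of Flink's internals.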
