WIP: Add windowing and aggregation #18

Draft · wants to merge 10 commits into main

Changes from 1 commit

10 changes: 9 additions & 1 deletion py/sentry_streams/adapters/stream_adapter.py
@@ -1,7 +1,7 @@
from abc import ABC, abstractmethod
from typing import Any, Optional, assert_never

from sentry_streams.pipeline import Map, Sink, Source, Step, StepType
from sentry_streams.pipeline import Map, Reduce, Sink, Source, Step, StepType


class StreamAdapter(ABC):
@@ -23,6 +23,10 @@ def sink(self, step: Sink, stream: Any) -> Any:
def map(self, step: Map, stream: Any) -> Any:
raise NotImplementedError

@abstractmethod
def reduce(self, step: Reduce, stream: Any) -> Any:
raise NotImplementedError


class RuntimeTranslator:
"""
@@ -51,5 +55,9 @@ def translate_step(self, step: Step, stream: Optional[Any] = None) -> Any:
assert isinstance(step, Map)
return self.adapter.map(step, stream)

elif step_type is StepType.REDUCE:
assert isinstance(step, Reduce)
return self.adapter.reduce(step, stream)

else:
assert_never(step_type)
17 changes: 14 additions & 3 deletions py/sentry_streams/example_config.py
@@ -1,4 +1,7 @@
from sentry_streams.pipeline import KafkaSink, KafkaSource, Map, Pipeline
from sentry_streams.pipeline import KafkaSink, KafkaSource, Map, Pipeline, Reduce
from sentry_streams.user_functions.sample_agg import WordCounter
from sentry_streams.user_functions.sample_group_by import my_group_by
from sentry_streams.user_functions.sample_map import EventsPipelineMapFunction

# pipeline: special name
pipeline = Pipeline()
@@ -13,12 +16,20 @@
name="mymap",
ctx=pipeline,
inputs=[source],
function="sentry_streams.sample_function.EventsPipelineMapFunction.simple_map",
function=EventsPipelineMapFunction.simple_map,
)

reduce = Reduce(
name="myreduce",
ctx=pipeline,
inputs=[map],
group_by_key=my_group_by,
aggregate_fn=WordCounter(),
)

sink = KafkaSink(
name="kafkasink",
ctx=pipeline,
inputs=[map],
inputs=[reduce],
logical_topic="transformed-events",
)
50 changes: 37 additions & 13 deletions py/sentry_streams/flink/flink_adapter.py
@@ -1,7 +1,8 @@
from typing import Any, MutableMapping
from typing import Any, Callable, MutableMapping

from pyflink.common import Types
from pyflink.common import Time, Types, WatermarkStrategy
from pyflink.common.serialization import SimpleStringSchema
from pyflink.common.time import Duration
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import ( # type: ignore[attr-defined]
FlinkKafkaConsumer,
@@ -10,9 +11,11 @@
KafkaRecordSerializationSchema,
KafkaSink,
)
from pyflink.datastream.window import TumblingEventTimeWindows
from sentry_streams.adapters.stream_adapter import StreamAdapter
from sentry_streams.modules import get_module
from sentry_streams.flink.flink_agg_fn import FlinkAggregate
from sentry_streams.pipeline import Step
from sentry_streams.user_functions.agg_template import Accumulator


class FlinkAdapter(StreamAdapter):
@@ -67,17 +70,38 @@ def sink(self, step: Step, stream: Any) -> Any:
def map(self, step: Step, stream: Any) -> Any:

assert hasattr(step, "function")
fn_path = step.function
mod, cls, fn = fn_path.rsplit(".", 2)
imported_fn = step.function

try:
module = get_module(mod)
# TODO: Ensure output type is configurable like the schema above
return stream.map(
func=lambda msg: imported_fn(msg),
output_type=Types.TUPLE([Types.STRING(), Types.INT()]),
)

except ImportError:
raise
# receives a DataStream, returns a DataStream
# optional: group by, windowing
# required: aggregation
def reduce(self, step: Step, stream: Any) -> Any:

imported_cls = getattr(module, cls)
imported_fn = getattr(imported_cls, fn)
# group by and agg are required
# windowing is optional and inserted between those 2

# TODO: Ensure output type is configurable like the schema above
return stream.map(func=lambda msg: imported_fn(msg), output_type=Types.STRING())
assert hasattr(step, "group_by_key")
key: Callable[[tuple[str, int]], str] = step.group_by_key

assert hasattr(step, "aggregate_fn")
agg: Accumulator = step.aggregate_fn

watermark_strategy = WatermarkStrategy.for_monotonous_timestamps().with_idleness(
Duration.of_seconds(5)
)
time_stream = stream.assign_timestamps_and_watermarks(watermark_strategy)

keyed_stream = time_stream.key_by(key)
windowed_stream = keyed_stream.window(TumblingEventTimeWindows.of(Time.seconds(1)))

return windowed_stream.aggregate(
FlinkAggregate(agg),
accumulator_type=Types.TUPLE([Types.STRING(), Types.INT()]),
output_type=Types.STRING(),
)
26 changes: 26 additions & 0 deletions py/sentry_streams/flink/flink_agg_fn.py
@@ -0,0 +1,26 @@
from typing import Any

from pyflink.datastream.functions import AggregateFunction
from sentry_streams.user_functions.agg_template import Accumulator


class FlinkAggregate(AggregateFunction):

def __init__(self, acc: Accumulator) -> None:
self.acc = acc

def create_accumulator(self) -> Any:
print("CREATED")
return self.acc.create()

def add(self, value: Any, accumulator: Any) -> Any:
print("ADDED")
return self.acc.add(accumulator, value)

def get_result(self, accumulator: Any) -> Any:
print("RESULT")
return self.acc.get_output(accumulator)

def merge(self, acc_a: Any, acc_b: Any) -> Any:
print("MERGE")
return self.acc.merge(acc_a, acc_b)
17 changes: 0 additions & 17 deletions py/sentry_streams/modules.py

This file was deleted.

32 changes: 30 additions & 2 deletions py/sentry_streams/pipeline.py
@@ -3,13 +3,25 @@
from collections import defaultdict
from dataclasses import dataclass
from enum import Enum
from typing import MutableMapping
from typing import Any, Callable, MutableMapping

from sentry_streams.user_functions.agg_template import Accumulator


class StepType(Enum):
SINK = "sink"
SOURCE = "source"
MAP = "map"
REDUCE = "reduce"


class StateBackend(Enum):
HASH_MAP = "hash_map"


class Window(Enum):
SLIDING = "sliding"
TUMBLING = "tumbling"


class Pipeline:
@@ -114,5 +126,21 @@ class Map(WithInput):
# instead of a raw string
# TODO: Allow product to both enable and access
# configuration (e.g. a DB that is used as part of Map)
function: str
function: Callable[..., Any]
step_type: StepType = StepType.MAP


@dataclass
class Reduce(WithInput):
# group_by_key: refactor to Callable reference
group_by_key: Callable[..., Any]
Collaborator commented:

I think the group by key may not be mandatory. Let's say you want to simulate something like

SELECT count(*) from table

Basically accumulate data in a counter or any other data sketch without grouping in any way.
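
A minimal sketch of what that ungrouped case could look like with the Accumulator template added in this PR; the EventCounter name is a hypothetical illustration, not part of this change:

from typing import Any

from sentry_streams.user_functions.agg_template import Accumulator


class EventCounter(Accumulator):
    # Counts every message it sees, regardless of key (the SELECT count(*) case).

    def create(self) -> int:
        return 0

    def add(self, acc: int, value: Any) -> int:
        # The payload itself is ignored; every incoming message bumps the count.
        return acc + 1

    def get_output(self, acc: int) -> str:
        return f"total {acc}"

    def merge(self, acc1: int, acc2: int) -> int:
        return acc1 + acc2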

# windowing mechanism, is this going to be mandatory?
Collaborator @fpacifici commented (Feb 20, 2025):

Not necessarily.
Windowing is needed to produce intermediate results on an unbounded stream. So every time this step emits results we need windowing. But there are stateful primitives that do not emit results. Think about the counter in postgres. We just count and periodically store in postgres.
Think about how you would model this: https://www.notion.so/sentry/Data-workers-1808b10e4b5d806983f3f0e22a2c2758?pvs=4#1858b10e4b5d80c98207e29786a1317a

Member Author replied:

as discussed offline, we'll support that use case (for now) using windowed aggregation + sink.

So windowing will be required in this iteration of the API
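
As a rough sketch, that workaround could be expressed with the pipeline API from this PR by keying the whole stream to a constant; the step names, the lambda key, and the EventCounter accumulator are assumptions for illustration:

from sentry_streams.pipeline import KafkaSink, Reduce

# `pipeline` and `source` as defined in example_config.py
count_all = Reduce(
    name="countall",
    ctx=pipeline,
    inputs=[source],
    group_by_key=lambda msg: "all",  # constant key: the whole stream becomes one group
    aggregate_fn=EventCounter(),     # hypothetical counting accumulator sketched above
)

sink = KafkaSink(
    name="kafkasink",
    ctx=pipeline,
    inputs=[count_all],
    logical_topic="counts",
)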

# windowing: Window
Collaborator commented:

I think we are missing a watermark concept. Basically the windowing algorithm decides the window semantics, but the user has to specify how/when to close a window.

Member Author replied:

Added by allowing users to configure windows and triggers

Member Author @ayirr7 commented (Feb 21, 2025):

Another point. Windows that close by event time are inherently tied to the watermarking strategy.

This strategy can be set directly on the source especially if we're using something like Kafka. We can also allow users to configure their own as part of a step like Reduce, though I believe that's not recommended because that would mean a new watermarked stream is being produced as part of the step (at least for Flink).

WDYT about how we should configure watermarking strategy? I feel that it makes sense to set it up directly on the source.
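
A minimal sketch of setting the strategy at the source, assuming the adapter keeps its StreamExecutionEnvironment on self.env and builds the consumer the way the rest of this adapter does (Step and Any imports as already present in flink_adapter.py); the topic name and Kafka properties are placeholders:

from pyflink.common import WatermarkStrategy
from pyflink.common.serialization import SimpleStringSchema
from pyflink.common.time import Duration
from pyflink.datastream.connectors import FlinkKafkaConsumer  # type: ignore[attr-defined]


def source(self, step: Step, stream: Any) -> Any:
    consumer = FlinkKafkaConsumer(
        topics="logical-events",  # placeholder topic
        deserialization_schema=SimpleStringSchema(),
        properties={"bootstrap.servers": "localhost:9092", "group.id": "pipeline"},
    )
    raw_stream = self.env.add_source(consumer)

    # Attach the watermark strategy once, at the source, so every downstream step
    # (including Reduce) shares the same event-time semantics.
    watermark_strategy = WatermarkStrategy.for_monotonous_timestamps().with_idleness(
        Duration.of_seconds(5)
    )
    return raw_stream.assign_timestamps_and_watermarks(watermark_strategy)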

# aggregation (use standard accumulator)
aggregate_fn: Accumulator
step_type: StepType = StepType.REDUCE
# storage: a fixed (enum?) set of storage backends we provide
# consider making this a class
storage: StateBackend = StateBackend.HASH_MAP
Collaborator commented:

Isn't the storage hidden by the accumulator at times?
Think about these cases:

  • Aggregation with windowing and producing the output. Here we can separate the aggregation function from the store, as we can keep the aggregation state wherever we want.
  • Global state: accumulating counts in postgres. Here the storage is application specific, so there is no point in separating the two.

Member Author replied:

I think this is the same discussion we had related to #18 (comment)

I can leave a TODO to come back to this in a next PR since this one is getting large.


# keyed stream --> windowed stream --> reduce to datastream
1 change: 1 addition & 0 deletions py/sentry_streams/runner.py
@@ -50,6 +50,7 @@ def iterate_edges(p_graph: Pipeline, translator: RuntimeTranslator) -> None:
next_step: WithInput = cast(WithInput, p_graph.steps[output_step_name])
print(f"Apply step: {next_step.name}")
next_step_stream = translator.translate_step(next_step, input_stream)
print(f"stream type {type(next_step_stream)}")
step_streams[next_step.name] = next_step_stream


15 changes: 0 additions & 15 deletions py/sentry_streams/sample_function.py

This file was deleted.

28 changes: 28 additions & 0 deletions py/sentry_streams/user_functions/agg_template.py
@@ -0,0 +1,28 @@
from abc import ABC, abstractmethod
from typing import Any


class Accumulator(ABC):
Collaborator commented:

Do you have a sense of which accumulators we will provide out of the box?

Member Author replied:

Not yet. I think that might be a good idea for next PR. Leaving a TODO


@abstractmethod
def create(self) -> Any:
raise NotImplementedError

@abstractmethod
def add(self, acc: Any, value: Any) -> Any:
raise NotImplementedError

@abstractmethod
def get_output(self, acc: Any) -> Any:
raise NotImplementedError

@abstractmethod
def merge(self, acc1: Any, acc2: Any) -> Any:
raise NotImplementedError
Collaborator commented:

Hint. Try to make this generic rather than have Any as type everywhere.

Member Author replied:

will do, right now just getting things working with Flink
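
A minimal sketch of the generic version hinted at here, parameterizing the input, accumulator state, and output types; the TypeVar names are illustrative and not part of this PR:

from abc import ABC, abstractmethod
from typing import Generic, TypeVar

TIn = TypeVar("TIn")    # incoming message type
TAcc = TypeVar("TAcc")  # accumulator state type
TOut = TypeVar("TOut")  # emitted result type


class Accumulator(ABC, Generic[TIn, TAcc, TOut]):

    @abstractmethod
    def create(self) -> TAcc:
        raise NotImplementedError

    @abstractmethod
    def add(self, acc: TAcc, value: TIn) -> TAcc:
        raise NotImplementedError

    @abstractmethod
    def get_output(self, acc: TAcc) -> TOut:
        raise NotImplementedError

    @abstractmethod
    def merge(self, acc1: TAcc, acc2: TAcc) -> TAcc:
        raise NotImplementedError


# WordCounter would then declare its types explicitly:
# class WordCounter(Accumulator[tuple[str, int], tuple[str, int], str]): ...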



# class GroupBy(ABC):

# @abstractmethod
# def get_key(payload):
# pass
16 changes: 16 additions & 0 deletions py/sentry_streams/user_functions/sample_agg.py
@@ -0,0 +1,16 @@
from sentry_streams.user_functions.agg_template import Accumulator


class WordCounter(Accumulator):

def create(self) -> tuple[str, int]:
return "", 0

def add(self, acc: tuple[str, int], value: tuple[str, int]) -> tuple[str, int]:
return value[0], acc[1] + value[1]

def get_output(self, acc: tuple[str, int]) -> str:
return f"{acc[0]} {acc[1]}"

def merge(self, acc1: tuple[str, int], acc2: tuple[str, int]) -> tuple[str, int]:
return acc1[0], acc1[1] + acc2[1]
9 changes: 9 additions & 0 deletions py/sentry_streams/user_functions/sample_group_by.py
@@ -0,0 +1,9 @@
def my_group_by(msg_payload: tuple[str, int]) -> str:
return msg_payload[0]


def dumb_group_by(msg: str) -> str:
return msg


# lambda x: x[0] simplest
28 changes: 28 additions & 0 deletions py/sentry_streams/user_functions/sample_map.py
@@ -0,0 +1,28 @@
import json


class EventsPipelineMapFunction:
"""
Sample user-defined functions to
plug into pipeline
"""

@staticmethod
def dumb_map(value: str) -> str:
d = json.loads(value)
word: str = d.get("word", "null_word")

return "hello." + word

@staticmethod
def simple_map(value: str) -> tuple[str, int]:
d = json.loads(value)
word: str = d.get("word", "null_word")

return (word, 1)

@staticmethod
def str_convert(value: tuple[str, int]) -> str:
word, count = value

return f"{word} {count}"