Improve setup to facilitate integration testing and future service setup #285
Changes from 5 commits: cc4d8ca, cdcb0ec, 720b02f, 9dc71bc, 3d37aa1, 4743f49
@@ -5,14 +5,15 @@
 import threading
 from typing import Any

-from confluent_kafka import Consumer
+from ..kafka.message_adapter import KafkaMessage
+from ..kafka.source import KafkaConsumer


 class ConfigSubscriber:
     def __init__(
         self,
         *,
-        consumer: Consumer,
+        consumer: KafkaConsumer,
         config: dict[str, Any] | None = None,
         logger: logging.Logger | None = None,
     ):
@@ -32,22 +33,25 @@ def start(self):
         self._thread = threading.Thread(target=self._run_loop)
         self._thread.start()

+    def _process_message(self, message: KafkaMessage | None) -> None:
+        if message is None:
+            return
+        if message.error():
+            self._logger.error('Consumer error: %s', message.error())
+            return
+        key = message.key().decode('utf-8')
+        value = json.loads(message.value().decode('utf-8'))
+        self._logger.info(
+            'Updating config: %s = %s at %s', key, value, message.timestamp()
+        )
+        self._config[key] = value
+
     def _run_loop(self):
         try:
             while self._running:
-                msg = self._consumer.poll(0.1)
-                if msg is None:
-                    continue
-                if msg.error():
-                    self._logger.error('Consumer error: %s', msg.error())
-                    continue
-
-                key = msg.key().decode('utf-8')
-                value = json.loads(msg.value().decode('utf-8'))
-                self._logger.info(
-                    'Updating config: %s = %s at %s', key, value, msg.timestamp()
-                )
-                self._config[key] = value
+                messages = self._consumer.consume(num_messages=100, timeout=0.1)
Review comment (on the line above): Why does the config consumer need to consume so many messages at once?

Reply: To enable better batching and to minimize the back and forth with the broker. True, this usually does not happen, since only a few values are updated, but it does when loading the initial config or catching up after a restart. Do you think it is harmful?

Reply: No, since the timeout is very small, it should be fine.
+                for msg in messages:
+                    self._process_message(msg)
         except RuntimeError as e:
             self._logger.exception("Error ConfigSubscriber loop failed: %s", e)
             self.stop()
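Not part of the diff, but to make the batching point from the review thread above concrete: with consume(num_messages=100, timeout=0.1), a backlog such as the initial config load or a catch-up after a restart drains in a handful of broker round trips instead of one poll() per message. A minimal sketch, where BacklogConsumer and the 500-message backlog are made up for illustration:

    import json

    from beamlime.kafka.message_adapter import FakeKafkaMessage, KafkaMessage
    from beamlime.kafka.source import KafkaConsumer


    class BacklogConsumer(KafkaConsumer):
        """Hypothetical consumer sitting on a backlog of 500 config updates."""

        def __init__(self) -> None:
            self._backlog = [
                FakeKafkaMessage(value=json.dumps(i).encode(), topic="control")
                for i in range(500)
            ]

        def consume(self, num_messages: int, timeout: float) -> list[KafkaMessage]:
            # Hand out at most num_messages per call, like a real consumer would.
            batch = self._backlog[:num_messages]
            self._backlog = self._backlog[num_messages:]
            return batch

        def close(self) -> None:
            pass


    consumer = BacklogConsumer()
    calls = 0
    while consumer.consume(num_messages=100, timeout=0.1):
        calls += 1
    print(calls)  # 5 batched calls instead of 500 single-message polls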
@@ -0,0 +1,55 @@
# SPDX-License-Identifier: BSD-3-Clause
# Copyright (c) 2025 Scipp contributors (https://github.com/scipp)
from __future__ import annotations

import logging
from typing import Generic, TypeVar

from .core import ConfigSubscriber, HandlerRegistry, MessageSink, StreamProcessor
from .core.handler import Handler
from .core.service import Service
from .kafka.message_adapter import AdaptingMessageSource, MessageAdapter
from .kafka.source import KafkaConsumer, KafkaMessageSource

Traw = TypeVar("Traw")
Tin = TypeVar("Tin")
Tout = TypeVar("Tout")


class DataServiceBuilder(Generic[Traw, Tin, Tout]):
    def __init__(
        self,
        *,
        instrument: str,
        name: str,
        log_level: int = logging.INFO,
        adapter: MessageAdapter[Traw, Tin],
        handler_cls: type[Handler[Tin, Tout]],
    ):
        self._name = f'{instrument}_{name}'
        self._log_level = log_level
        self._adapter = adapter
        self._handler_cls = handler_cls

    def build(
        self,
        control_consumer: KafkaConsumer,
        consumer: KafkaConsumer,
        sink: MessageSink[Tout],
    ) -> Service:
        config_subscriber = ConfigSubscriber(consumer=control_consumer, config={})
        processor = StreamProcessor(
            source=AdaptingMessageSource(
                source=KafkaMessageSource(consumer=consumer), adapter=self._adapter
            ),
            sink=sink,
            handler_registry=HandlerRegistry(
                config=config_subscriber, handler_cls=self._handler_cls
            ),
        )
        return Service(
            children=[config_subscriber],
            processor=processor,
            name=self._name,
            log_level=self._log_level,
        )
Review comment: Will it be easier to write a mock?

Reply: I don't know.

Reply: We can just keep it in mind as an alternative to writing a fake object, in case this becomes too complicated later.

Reply: Yes, feel free to refactor when writing more tests, in case a mocking approach is more convenient.
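For reference, a rough sketch of what the mock-based alternative discussed above might look like, replacing the hand-written EmptyConsumer/IntConsumer fakes in the test below with unittest.mock; whether this is actually more convenient is exactly the open question:

    from unittest.mock import MagicMock

    from beamlime.kafka.message_adapter import FakeKafkaMessage
    from beamlime.kafka.source import KafkaConsumer

    # One batch with a single message, then empty batches on every later call.
    batches = [[FakeKafkaMessage(value=b"11", topic="dummy")]]

    consumer = MagicMock(spec=KafkaConsumer)
    consumer.consume.side_effect = lambda num_messages, timeout: (
        batches.pop(0) if batches else []
    )

A MagicMock records every call, so assertions such as consumer.consume.assert_called() come for free, at the cost of looser typing than the explicit fakes.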
@@ -0,0 +1,83 @@
# SPDX-License-Identifier: BSD-3-Clause
# Copyright (c) 2025 Scipp contributors (https://github.com/scipp)

import time

from beamlime import Handler, Message, MessageKey
from beamlime.fakes import FakeMessageSink
from beamlime.kafka.message_adapter import (
    FakeKafkaMessage,
    KafkaMessage,
    MessageAdapter,
)
from beamlime.kafka.source import KafkaConsumer
from beamlime.service_factory import DataServiceBuilder


def fake_message_with_value(message: KafkaMessage, value: str) -> Message[str]:
    return Message(
        timestamp=1234,
        key=MessageKey(topic=message.topic(), source_name="dummy"),
        value=value,
    )


class ForwardingAdapter(MessageAdapter[KafkaMessage, Message[int]]):
    def adapt(self, message: KafkaMessage) -> Message[int]:
        return fake_message_with_value(message, value=int(message.value().decode()))


class ForwardingHandler(Handler[int, int]):
    def handle(self, message: Message[int]) -> list[Message[int]]:
        return [message]


class EmptyConsumer(KafkaConsumer):
    def consume(self, num_messages: int, timeout: float) -> list[KafkaMessage]:
        return []

    def close(self) -> None:
        pass


class IntConsumer(KafkaConsumer):
    def __init__(self) -> None:
        self._values = [11, 22, 33, 44]
        self._index = 0

    @property
    def at_end(self) -> bool:
        return self._index >= len(self._values)

    def consume(self, num_messages: int, timeout: float) -> list[KafkaMessage]:
        if self.at_end:
            return []
        message = FakeKafkaMessage(
            value=str(self._values[self._index]).encode(), topic="dummy"
        )
        self._index += 1
        return [message]

    def close(self) -> None:
        pass


def test_basics() -> None:
    builder = DataServiceBuilder(
        instrument='instrument',
        name='name',
        adapter=ForwardingAdapter(),
        handler_cls=ForwardingHandler,
    )
    sink = FakeMessageSink()
    consumer = IntConsumer()
    service = builder.build(
        control_consumer=EmptyConsumer(), consumer=consumer, sink=sink
    )
    service.start(blocking=False)
    while not consumer.at_end:
        time.sleep(0.1)
    service.stop()
    assert len(sink.messages) == 4
    values = [msg.value for msg in sink.messages]
    assert values == [11, 22, 33, 44]
Review comment: I think we should be careful about using `key` in this case. `key` is for associating messages with a certain partition, so it is mainly for the broker, not the consumer. See https://www.confluent.io/learn/kafka-message-key/.

Reply: Moved to new issue #286, since this is unrelated to this change (the code was just moved).
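For context on the key concern above, a minimal confluent_kafka producer sketch (broker address, topic, key, and payload are all placeholders): the broker hashes the key to pick a partition, so records sharing a key stay on one partition and keep their order; it is not meant as an application-level field for the consumer to parse.

    from confluent_kafka import Producer

    producer = Producer({"bootstrap.servers": "localhost:9092"})
    # All updates for the same config key hash to the same partition and
    # therefore arrive in order at whichever consumer owns that partition.
    producer.produce(
        "instrument_control",
        key="time_of_flight_bins",
        value=b'{"edges": 100}',
    )
    producer.flush()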