Improve setup to facilitate integration testing and future service setup #285

Merged Jan 16, 2025 (6 commits; showing changes from 5 commits).

34 changes: 19 additions & 15 deletions src/beamlime/core/config_subscriber.py
@@ -5,14 +5,15 @@
import threading
from typing import Any

-from confluent_kafka import Consumer
+from ..kafka.message_adapter import KafkaMessage
+from ..kafka.source import KafkaConsumer


class ConfigSubscriber:
def __init__(
self,
*,
-        consumer: Consumer,
+        consumer: KafkaConsumer,
config: dict[str, Any] | None = None,
logger: logging.Logger | None = None,
):
@@ -32,22 +33,25 @@ def start(self):
self._thread = threading.Thread(target=self._run_loop)
self._thread.start()

+    def _process_message(self, message: KafkaMessage | None) -> None:
+        if message is None:
+            return
+        if message.error():
+            self._logger.error('Consumer error: %s', message.error())
+            return
+        key = message.key().decode('utf-8')
+        value = json.loads(message.value().decode('utf-8'))
+        self._logger.info(
+            'Updating config: %s = %s at %s', key, value, message.timestamp()
+        )
+        self._config[key] = value
Comment on lines +42 to +47
Member: I think we should be careful about using `key` in this case. The key is for associating messages with a certain partition, so it is mainly for the broker, not the consumer.
https://www.confluent.io/learn/kafka-message-key/

Member Author: Moved to new issue #286, since this is unrelated to this change (just moved code).
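For reference, a minimal sketch of how a producer-side key drives partition assignment; the broker address, topic name, and key below are hypothetical, not taken from this PR:

```python
from confluent_kafka import Producer

# The broker hashes the message key to pick a partition, so all messages with
# the same key land on the same partition and stay ordered relative to each
# other; the consumer does not need to inspect the key for this to work.
producer = Producer({'bootstrap.servers': 'localhost:9092'})  # hypothetical address
producer.produce(
    'beamlime_config',       # hypothetical topic
    key=b'some_config_key',  # hypothetical key
    value=b'{"value": 10}',
)
producer.flush()  # block until delivery completes
```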


def _run_loop(self):
try:
while self._running:
-                msg = self._consumer.poll(0.1)
-                if msg is None:
-                    continue
-                if msg.error():
-                    self._logger.error('Consumer error: %s', msg.error())
-                    continue
-
-                key = msg.key().decode('utf-8')
-                value = json.loads(msg.value().decode('utf-8'))
-                self._logger.info(
-                    'Updating config: %s = %s at %s', key, value, msg.timestamp()
-                )
-                self._config[key] = value
+                messages = self._consumer.consume(num_messages=100, timeout=0.1)
Member: Why does the config consumer need to consume so many messages at once?

Member Author: To enable better batching and minimize the number of round trips to the broker. True, this rarely matters, since only a few values are updated at a time, but it does when loading the initial config or catching up after a restart.

Do you think it is harmful?

Member: No, since the timeout is very small, it should be fine.
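To make the trade-off discussed here concrete, a sketch of the two loop styles; the consumer settings and topic name are hypothetical:

```python
from confluent_kafka import Consumer

consumer = Consumer({
    'bootstrap.servers': 'localhost:9092',  # hypothetical address
    'group.id': 'config_subscriber_demo',   # hypothetical group id
    'auto.offset.reset': 'earliest',
})
consumer.subscribe(['beamlime_config'])  # hypothetical topic

# poll() returns at most one message per call, so replaying a backlog of N
# messages takes N round trips. consume() fetches up to num_messages per call;
# on an idle topic the 0.1 s timeout returns an empty list almost immediately,
# so steady-state latency is unchanged while catch-up needs far fewer calls.
single = consumer.poll(0.1)
batch = consumer.consume(num_messages=100, timeout=0.1)
consumer.close()
```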

+                for msg in messages:
+                    self._process_message(msg)
except RuntimeError as e:
            self._logger.exception("ConfigSubscriber loop failed: %s", e)
self.stop()
23 changes: 13 additions & 10 deletions src/beamlime/kafka/message_adapter.py
@@ -23,20 +23,17 @@ class KafkaMessage(Protocol):
def error(self) -> Any | None:
pass

-    def value(self) -> bytes:
+    def key(self) -> bytes:
pass

-    def topic(self) -> str:
+    def value(self) -> bytes:
pass

+    def timestamp(self) -> int:
+        pass

-def message_schema(msg: KafkaMessage) -> str | None:
-    """
-    Extracts the schema from a Kafka message by the streaming_data_types library.
-    """
-    if msg.error() is not None or len(msg.value()) < 8:
-        return None
-    return msg.value()[4:8].decode()
+    def topic(self) -> str:
+        pass


class FakeKafkaMessage(KafkaMessage):
@@ -47,9 +44,15 @@ def __init__(self, value: bytes, topic: str):
def error(self) -> Any | None:
return None

+    def key(self) -> bytes:
+        return b""

def value(self) -> bytes:
return self._value

+    def timestamp(self) -> int:
+        return 0

def topic(self) -> str:
return self._topic

@@ -125,7 +128,7 @@ def __init__(self, routes: dict[str, MessageAdapter[KafkaMessage, T]]):
self._routes = routes

def adapt(self, message: KafkaMessage) -> Message[T]:
-        schema = message_schema(message)
+        schema = streaming_data_types.utils.get_schema(message.value())
if schema is None:
raise streaming_data_types.exceptions.WrongSchemaException()
return self._routes[schema].adapt(message)
55 changes: 55 additions & 0 deletions src/beamlime/service_factory.py
@@ -0,0 +1,55 @@
# SPDX-License-Identifier: BSD-3-Clause
# Copyright (c) 2025 Scipp contributors (https://github.com/scipp)
from __future__ import annotations

import logging
from typing import Generic, TypeVar

from .core import ConfigSubscriber, HandlerRegistry, MessageSink, StreamProcessor
from .core.handler import Handler
from .core.service import Service
from .kafka.message_adapter import AdaptingMessageSource, MessageAdapter
from .kafka.source import KafkaConsumer, KafkaMessageSource

Traw = TypeVar("Traw")
Tin = TypeVar("Tin")
Tout = TypeVar("Tout")


class DataServiceBuilder(Generic[Traw, Tin, Tout]):
def __init__(
self,
*,
instrument: str,
name: str,
log_level: int = logging.INFO,
adapter: MessageAdapter[Traw, Tin],
handler_cls: type[Handler[Tin, Tout]],
):
self._name = f'{instrument}_{name}'
self._log_level = log_level
self._adapter = adapter
self._handler_cls = handler_cls

def build(
self,
control_consumer: KafkaConsumer,
consumer: KafkaConsumer,
sink: MessageSink[Tout],
) -> Service:
config_subscriber = ConfigSubscriber(consumer=control_consumer, config={})
processor = StreamProcessor(
source=AdaptingMessageSource(
source=KafkaMessageSource(consumer=consumer), adapter=self._adapter
),
sink=sink,
handler_registry=HandlerRegistry(
config=config_subscriber, handler_cls=self._handler_cls
),
)
return Service(
children=[config_subscriber],
processor=processor,
name=self._name,
log_level=self._log_level,
)
32 changes: 12 additions & 20 deletions src/beamlime/services/monitor_data.py
@@ -5,13 +5,12 @@
from contextlib import ExitStack
from typing import Literal, NoReturn

-from beamlime import ConfigSubscriber, HandlerRegistry, Service, StreamProcessor
+from beamlime import Service
from beamlime.config import config_names
from beamlime.config.config_loader import load_config
from beamlime.handlers.monitor_data_handler import create_monitor_data_handler
from beamlime.kafka import consumer as kafka_consumer
from beamlime.kafka.message_adapter import (
-    AdaptingMessageSource,
ChainedAdapter,
Da00ToScippAdapter,
Ev44ToMonitorEventsAdapter,
@@ -20,7 +19,7 @@
RoutingAdapter,
)
from beamlime.kafka.sink import KafkaSink
-from beamlime.kafka.source import KafkaMessageSource
+from beamlime.service_factory import DataServiceBuilder
from beamlime.sinks import PlotToPngSink


@@ -41,7 +40,6 @@ def run_service(
instrument: str,
log_level: int = logging.INFO,
) -> NoReturn:
-    service_name = f'{instrument}_monitor_data_demo'
config = load_config(namespace=config_names.monitor_data, env='')
consumer_config = load_config(namespace=config_names.raw_data_consumer, env='')
kafka_downstream_config = load_config(namespace=config_names.kafka_downstream)
@@ -62,11 +60,18 @@
}
)

+    builder = DataServiceBuilder(
+        instrument=instrument,
+        name='monitor_data',
+        log_level=log_level,
+        adapter=adapter,
+        handler_cls=create_monitor_data_handler,
+    )

with ExitStack() as stack:
control_consumer = stack.enter_context(
kafka_consumer.make_control_consumer(instrument=instrument)
)
-        config_subscriber = ConfigSubscriber(consumer=control_consumer, config={})
consumer = stack.enter_context(
kafka_consumer.make_consumer_from_config(
topics=config['topics'],
Expand All @@ -75,21 +80,8 @@ def run_service(
group='monitor_data',
)
)

-        processor = StreamProcessor(
-            source=AdaptingMessageSource(
-                source=KafkaMessageSource(consumer=consumer), adapter=adapter
-            ),
-            sink=sink,
-            handler_registry=HandlerRegistry(
-                config=config_subscriber, handler_cls=create_monitor_data_handler
-            ),
-        )
-        service = Service(
-            children=[config_subscriber],
-            processor=processor,
-            name=service_name,
-            log_level=log_level,
+        service = builder.build(
+            control_consumer=control_consumer, consumer=consumer, sink=sink
)
service.start()

43 changes: 41 additions & 2 deletions tests/kafka/message_adapter_test.py
@@ -1,16 +1,17 @@
# SPDX-License-Identifier: BSD-3-Clause
# Copyright (c) 2024 Scipp contributors (https://github.com/scipp)

+import pytest
from streaming_data_types import eventdata_ev44

-from beamlime.core.message import MessageSource
+from beamlime.core.message import Message, MessageKey, MessageSource
from beamlime.kafka.message_adapter import (
AdaptingMessageSource,
ChainedAdapter,
Ev44ToMonitorEventsAdapter,
FakeKafkaMessage,
KafkaMessage,
KafkaToEv44Adapter,
RoutingAdapter,
)


@@ -52,3 +53,41 @@ def test_adapting_source() -> None:
assert messages[0].key.source_name == "monitor1"
assert messages[0].value.time_of_arrival == [123456]
assert messages[0].timestamp == 1234


def message_with_schema(schema: str) -> KafkaMessage:
"""
Create a fake Kafka message with the given schema.
The streaming_data_types library uses bytes 4:8 to store the schema.
"""
return FakeKafkaMessage(value=f"xxxx{schema}".encode(), topic=schema)


def test_routing_adapter_raises_KeyError_if_no_route_found() -> None:
adapter = RoutingAdapter(routes={})
with pytest.raises(KeyError, match="ev44"):
adapter.adapt(message_with_schema("ev44"))


def fake_message_with_value(message: KafkaMessage, value: str) -> Message[str]:
return Message(
timestamp=1234,
key=MessageKey(topic=message.topic(), source_name="dummy"),
value=value,
)


def test_routing_adapter_calls_adapter_based_on_route() -> None:
class Adapter:
def __init__(self, value: str):
self._value = value

def adapt(self, message: KafkaMessage) -> Message[str]:
return fake_message_with_value(message, self._value)

adapter = RoutingAdapter(
routes={"ev44": Adapter('adapter1'), "da00": Adapter('adapter2')}
)
assert adapter.adapt(message_with_schema('ev44')).value == "adapter1"
assert adapter.adapt(message_with_schema('da00')).value == "adapter2"
83 changes: 83 additions & 0 deletions tests/services/data_service_test.py
Member: Will it be easier to write a mock object instead of making a fake consumer class from scratch?

Member Author: I don't know?

Member: We can just keep it in mind as an alternative to writing a fake object, in case this becomes too complicated later.

Member Author: Yes, feel free to refactor when writing more tests, in case a mocking approach is more convenient.
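For future reference, a rough sketch of the mock-based alternative discussed in this thread, reusing FakeKafkaMessage for the payloads; this is illustrative only and not part of this PR:

```python
from unittest.mock import MagicMock

from beamlime.kafka.message_adapter import FakeKafkaMessage

# A MagicMock can stand in for the KafkaConsumer protocol: consume() serves
# one canned batch and then empty lists, and close() is recorded for free.
batches = [[FakeKafkaMessage(value=b"11", topic="dummy")]]
consumer = MagicMock()
consumer.consume.side_effect = lambda num_messages, timeout: (
    batches.pop(0) if batches else []
)

assert [m.value() for m in consumer.consume(num_messages=100, timeout=0.1)] == [b"11"]
assert consumer.consume(num_messages=100, timeout=0.1) == []
```

The fake classes below keep the end-of-stream bookkeeping (at_end) explicit, whereas the mock trades that for less boilerplate.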

@@ -0,0 +1,83 @@
# SPDX-License-Identifier: BSD-3-Clause
# Copyright (c) 2025 Scipp contributors (https://github.com/scipp)

import time

from beamlime import Handler, Message, MessageKey
from beamlime.fakes import FakeMessageSink
from beamlime.kafka.message_adapter import (
FakeKafkaMessage,
KafkaMessage,
MessageAdapter,
)
from beamlime.kafka.source import KafkaConsumer
from beamlime.service_factory import DataServiceBuilder


def fake_message_with_value(message: KafkaMessage, value: str) -> Message[str]:
return Message(
timestamp=1234,
key=MessageKey(topic=message.topic(), source_name="dummy"),
value=value,
)


class ForwardingAdapter(MessageAdapter[KafkaMessage, Message[int]]):
def adapt(self, message: KafkaMessage) -> Message[int]:
return fake_message_with_value(message, value=int(message.value().decode()))


class ForwardingHandler(Handler[int, int]):
def handle(self, message: Message[int]) -> list[Message[int]]:
return [message]


class EmptyConsumer(KafkaConsumer):
def consume(self, num_messages: int, timeout: float) -> list[KafkaMessage]:
return []

def close(self) -> None:
pass


class IntConsumer(KafkaConsumer):
def __init__(self) -> None:
self._values = [11, 22, 33, 44]
self._index = 0

@property
def at_end(self) -> bool:
return self._index >= len(self._values)

def consume(self, num_messages: int, timeout: float) -> list[KafkaMessage]:
if self.at_end:
return []
message = FakeKafkaMessage(
value=str(self._values[self._index]).encode(), topic="dummy"
)
self._index += 1
return [message]

def close(self) -> None:
pass


def test_basics() -> None:
builder = DataServiceBuilder(
instrument='instrument',
name='name',
adapter=ForwardingAdapter(),
handler_cls=ForwardingHandler,
)
sink = FakeMessageSink()
consumer = IntConsumer()
service = builder.build(
control_consumer=EmptyConsumer(), consumer=consumer, sink=sink
)
service.start(blocking=False)
while not consumer.at_end:
time.sleep(0.1)
service.stop()
assert len(sink.messages) == 4
values = [msg.value for msg in sink.messages]
assert values == [11, 22, 33, 44]