Skip to content

Commit

Permalink
Merge pull request #285 from scipp/integration-testing
Browse files Browse the repository at this point in the history
Improve setup to facilitate integration testing and future service setup
  • Loading branch information
SimonHeybrock authored Jan 16, 2025
2 parents 18ed0ba + 4743f49 commit 6f431bd
Show file tree
Hide file tree
Showing 6 changed files with 223 additions and 47 deletions.
34 changes: 19 additions & 15 deletions src/beamlime/core/config_subscriber.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,15 @@
import threading
from typing import Any

from confluent_kafka import Consumer
from ..kafka.message_adapter import KafkaMessage
from ..kafka.source import KafkaConsumer


class ConfigSubscriber:
def __init__(
self,
*,
consumer: Consumer,
consumer: KafkaConsumer,
config: dict[str, Any] | None = None,
logger: logging.Logger | None = None,
):
Expand All @@ -32,22 +33,25 @@ def start(self):
self._thread = threading.Thread(target=self._run_loop)
self._thread.start()

def _process_message(self, message: KafkaMessage | None) -> None:
    """
    Decode a single Kafka config message and apply it to the config dict.

    Messages are expected to carry a UTF-8 key (the config option name) and a
    UTF-8, JSON-encoded value. Messages that are None, carry a consumer
    error, or cannot be decoded are logged and skipped so that one bad
    message does not terminate the subscriber loop (the loop only catches
    its own error types, so an uncaught decode error would kill the thread).
    """
    if message is None:
        return
    if message.error():
        self._logger.error('Consumer error: %s', message.error())
        return
    try:
        # message.key() may be None for keyless messages; AttributeError on
        # .decode covers that case alongside encoding and JSON failures.
        key = message.key().decode('utf-8')
        value = json.loads(message.value().decode('utf-8'))
    except (AttributeError, UnicodeDecodeError, ValueError) as e:
        self._logger.error('Failed to decode config message: %s', e)
        return
    self._logger.info(
        'Updating config: %s = %s at %s', key, value, message.timestamp()
    )
    self._config[key] = value

def _run_loop(self):
    """
    Consume and process config messages until :meth:`stop` is called.

    Runs on the background thread created by :meth:`start`. This is the
    thread's top-level boundary: any unhandled error is logged (with
    traceback) and the subscriber is shut down, instead of letting the
    thread die silently. Catching only RuntimeError here would miss most
    real failure modes (e.g. consumer/IO errors), hence the broad catch.
    """
    try:
        while self._running:
            messages = self._consumer.consume(num_messages=100, timeout=0.1)
            for msg in messages:
                self._process_message(msg)
    except Exception as e:
        self._logger.exception("Error ConfigSubscriber loop failed: %s", e)
        self.stop()
Expand Down
23 changes: 13 additions & 10 deletions src/beamlime/kafka/message_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,20 +23,17 @@ class KafkaMessage(Protocol):
def error(self) -> Any | None:
pass

def value(self) -> bytes:
def key(self) -> bytes:
pass

def topic(self) -> str:
def value(self) -> bytes:
pass

def timestamp(self) -> int:
pass

def message_schema(msg: KafkaMessage) -> str | None:
"""
Extracts the schema from a Kafka message by the streaming_data_types library.
"""
if msg.error() is not None or len(msg.value()) < 8:
return None
return msg.value()[4:8].decode()
def topic(self) -> str:
pass


class FakeKafkaMessage(KafkaMessage):
Expand All @@ -47,9 +44,15 @@ def __init__(self, value: bytes, topic: str):
def error(self) -> Any | None:
return None

def key(self) -> bytes:
return b""

def value(self) -> bytes:
return self._value

def timestamp(self) -> int:
return 0

def topic(self) -> str:
return self._topic

Expand Down Expand Up @@ -125,7 +128,7 @@ def __init__(self, routes: dict[str, MessageAdapter[KafkaMessage, T]]):
self._routes = routes

def adapt(self, message: KafkaMessage) -> Message[T]:
schema = message_schema(message)
schema = streaming_data_types.utils.get_schema(message.value())
if schema is None:
raise streaming_data_types.exceptions.WrongSchemaException()
return self._routes[schema].adapt(message)
Expand Down
55 changes: 55 additions & 0 deletions src/beamlime/service_factory.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
# SPDX-License-Identifier: BSD-3-Clause
# Copyright (c) 2025 Scipp contributors (https://github.com/scipp)
from __future__ import annotations

import logging
from typing import Generic, TypeVar

from .core import ConfigSubscriber, HandlerRegistry, MessageSink, StreamProcessor
from .core.handler import Handler
from .core.service import Service
from .kafka.message_adapter import AdaptingMessageSource, MessageAdapter
from .kafka.source import KafkaConsumer, KafkaMessageSource

Traw = TypeVar("Traw")
Tin = TypeVar("Tin")
Tout = TypeVar("Tout")


class DataServiceBuilder(Generic[Traw, Tin, Tout]):
    """
    Assembles a data-processing :class:`Service` from its components.

    Bundles the message adapter and handler class for a given instrument and
    service name, so a fully wired service can be created later from the
    Kafka consumers and the output sink.
    """

    def __init__(
        self,
        *,
        instrument: str,
        name: str,
        log_level: int = logging.INFO,
        adapter: MessageAdapter[Traw, Tin],
        handler_cls: type[Handler[Tin, Tout]],
    ):
        """
        Parameters
        ----------
        instrument:
            Instrument identifier, used as prefix of the service name.
        name:
            Base name of the service.
        log_level:
            Logging level passed on to the created service.
        adapter:
            Adapter translating raw Kafka messages into handler input.
        handler_cls:
            Handler class used by the registry to process adapted messages.
        """
        self._name = f'{instrument}_{name}'
        self._log_level = log_level
        self._adapter = adapter
        self._handler_cls = handler_cls

    def build(
        self,
        control_consumer: KafkaConsumer,
        consumer: KafkaConsumer,
        sink: MessageSink[Tout],
    ) -> Service:
        """
        Wire up and return the service.

        Parameters
        ----------
        control_consumer:
            Consumer for the config/control topic.
        consumer:
            Consumer for the data topic(s).
        sink:
            Destination for processed messages.
        """
        subscriber = ConfigSubscriber(consumer=control_consumer, config={})
        raw_source = KafkaMessageSource(consumer=consumer)
        adapted = AdaptingMessageSource(source=raw_source, adapter=self._adapter)
        registry = HandlerRegistry(
            config=subscriber, handler_cls=self._handler_cls
        )
        processor = StreamProcessor(
            source=adapted, sink=sink, handler_registry=registry
        )
        return Service(
            children=[subscriber],
            processor=processor,
            name=self._name,
            log_level=self._log_level,
        )
32 changes: 12 additions & 20 deletions src/beamlime/services/monitor_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,12 @@
from contextlib import ExitStack
from typing import Literal, NoReturn

from beamlime import ConfigSubscriber, HandlerRegistry, Service, StreamProcessor
from beamlime import Service
from beamlime.config import config_names
from beamlime.config.config_loader import load_config
from beamlime.handlers.monitor_data_handler import create_monitor_data_handler
from beamlime.kafka import consumer as kafka_consumer
from beamlime.kafka.message_adapter import (
AdaptingMessageSource,
ChainedAdapter,
Da00ToScippAdapter,
Ev44ToMonitorEventsAdapter,
Expand All @@ -20,7 +19,7 @@
RoutingAdapter,
)
from beamlime.kafka.sink import KafkaSink
from beamlime.kafka.source import KafkaMessageSource
from beamlime.service_factory import DataServiceBuilder
from beamlime.sinks import PlotToPngSink


Expand All @@ -41,7 +40,6 @@ def run_service(
instrument: str,
log_level: int = logging.INFO,
) -> NoReturn:
service_name = f'{instrument}_monitor_data_demo'
config = load_config(namespace=config_names.monitor_data, env='')
consumer_config = load_config(namespace=config_names.raw_data_consumer, env='')
kafka_downstream_config = load_config(namespace=config_names.kafka_downstream)
Expand All @@ -62,11 +60,18 @@ def run_service(
}
)

builder = DataServiceBuilder(
instrument=instrument,
name='monitor_data',
log_level=log_level,
adapter=adapter,
handler_cls=create_monitor_data_handler,
)

with ExitStack() as stack:
control_consumer = stack.enter_context(
kafka_consumer.make_control_consumer(instrument=instrument)
)
config_subscriber = ConfigSubscriber(consumer=control_consumer, config={})
consumer = stack.enter_context(
kafka_consumer.make_consumer_from_config(
topics=config['topics'],
Expand All @@ -75,21 +80,8 @@ def run_service(
group='monitor_data',
)
)

processor = StreamProcessor(
source=AdaptingMessageSource(
source=KafkaMessageSource(consumer=consumer), adapter=adapter
),
sink=sink,
handler_registry=HandlerRegistry(
config=config_subscriber, handler_cls=create_monitor_data_handler
),
)
service = Service(
children=[config_subscriber],
processor=processor,
name=service_name,
log_level=log_level,
service = builder.build(
control_consumer=control_consumer, consumer=consumer, sink=sink
)
service.start()

Expand Down
43 changes: 41 additions & 2 deletions tests/kafka/message_adapter_test.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
# SPDX-License-Identifier: BSD-3-Clause
# Copyright (c) 2024 Scipp contributors (https://github.com/scipp)

import pytest
from streaming_data_types import eventdata_ev44

from beamlime.core.message import MessageSource
from beamlime.core.message import Message, MessageKey, MessageSource
from beamlime.kafka.message_adapter import (
AdaptingMessageSource,
ChainedAdapter,
Ev44ToMonitorEventsAdapter,
FakeKafkaMessage,
KafkaMessage,
KafkaToEv44Adapter,
RoutingAdapter,
)


Expand Down Expand Up @@ -52,3 +53,41 @@ def test_adapting_source() -> None:
assert messages[0].key.source_name == "monitor1"
assert messages[0].value.time_of_arrival == [123456]
assert messages[0].timestamp == 1234


def message_with_schema(schema: str) -> KafkaMessage:
    """
    Build a fake Kafka message whose payload advertises the given schema.

    The streaming_data_types serialization keeps the 4-character schema
    identifier at bytes 4:8 of the payload, hence the four filler bytes.
    """
    payload = b"xxxx" + schema.encode()
    return FakeKafkaMessage(value=payload, topic=schema)


def test_routing_adapter_raises_KeyError_if_no_route_found() -> None:
    """An adapter with an empty routing table rejects every schema."""
    empty_router = RoutingAdapter(routes={})
    message = message_with_schema("ev44")
    with pytest.raises(KeyError, match="ev44"):
        empty_router.adapt(message)


def fake_message_with_value(message: KafkaMessage, value: str) -> Message[str]:
    """Wrap *value* in a Message, reusing the topic of the given Kafka message."""
    key = MessageKey(topic=message.topic(), source_name="dummy")
    return Message(timestamp=1234, key=key, value=value)


def test_routing_adapter_calls_adapter_based_on_route() -> None:
    """Each schema is dispatched to the adapter registered for it."""

    class FixedValueAdapter:
        # Stub adapter that always yields the same payload value.
        def __init__(self, value: str):
            self._value = value

        def adapt(self, message: KafkaMessage) -> Message[str]:
            return fake_message_with_value(message, self._value)

    routes = {"ev44": FixedValueAdapter('adapter1'), "da00": FixedValueAdapter('adapter2')}
    router = RoutingAdapter(routes=routes)
    assert router.adapt(message_with_schema('ev44')).value == "adapter1"
    assert router.adapt(message_with_schema('da00')).value == "adapter2"
83 changes: 83 additions & 0 deletions tests/services/data_service_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
# SPDX-License-Identifier: BSD-3-Clause
# Copyright (c) 2025 Scipp contributors (https://github.com/scipp)

import time

from beamlime import Handler, Message, MessageKey
from beamlime.fakes import FakeMessageSink
from beamlime.kafka.message_adapter import (
FakeKafkaMessage,
KafkaMessage,
MessageAdapter,
)
from beamlime.kafka.source import KafkaConsumer
from beamlime.service_factory import DataServiceBuilder


def fake_message_with_value(message: KafkaMessage, value: str) -> Message[str]:
    """Create a Message carrying *value*, keyed by the Kafka message's topic."""
    key = MessageKey(topic=message.topic(), source_name="dummy")
    return Message(timestamp=1234, key=key, value=value)


class ForwardingAdapter(MessageAdapter[KafkaMessage, Message[int]]):
    """Adapter that parses the raw payload as a decimal integer."""

    def adapt(self, message: KafkaMessage) -> Message[int]:
        parsed = int(message.value().decode())
        return fake_message_with_value(message, value=parsed)


class ForwardingHandler(Handler[int, int]):
    """Handler that passes every message through unchanged."""

    def handle(self, message: Message[int]) -> list[Message[int]]:
        return [message]


class EmptyConsumer(KafkaConsumer):
    """Consumer stub that never yields any messages."""

    def consume(self, num_messages: int, timeout: float) -> list[KafkaMessage]:
        # Always empty; arguments exist only to satisfy the interface.
        return []

    def close(self) -> None:
        """Nothing to release."""
        pass


class IntConsumer(KafkaConsumer):
    """Consumer stub handing out four integer payloads, one per consume()."""

    def __init__(self) -> None:
        self._values = [11, 22, 33, 44]
        self._index = 0

    @property
    def at_end(self) -> bool:
        """True once every value has been handed out."""
        return self._index >= len(self._values)

    def consume(self, num_messages: int, timeout: float) -> list[KafkaMessage]:
        if self.at_end:
            return []
        payload = str(self._values[self._index]).encode()
        self._index += 1
        return [FakeKafkaMessage(value=payload, topic="dummy")]

    def close(self) -> None:
        """Nothing to release."""
        pass


def test_basics() -> None:
    """End-to-end check: consumed values flow through adapter/handler to sink."""
    builder = DataServiceBuilder(
        instrument='instrument',
        name='name',
        adapter=ForwardingAdapter(),
        handler_cls=ForwardingHandler,
    )
    sink = FakeMessageSink()
    consumer = IntConsumer()
    service = builder.build(
        control_consumer=EmptyConsumer(), consumer=consumer, sink=sink
    )
    service.start(blocking=False)
    try:
        # Bound the wait so a broken service fails the test instead of
        # hanging the whole suite forever.
        deadline = time.monotonic() + 10.0
        while not consumer.at_end:
            assert time.monotonic() < deadline, "service did not consume input"
            time.sleep(0.1)
    finally:
        # Always stop, so a failing assertion does not leak the service thread.
        service.stop()
    assert len(sink.messages) == 4
    values = [msg.value for msg in sink.messages]
    assert values == [11, 22, 33, 44]

0 comments on commit 6f431bd

Please sign in to comment.