Skip to content

Commit 297f7ee

Browse files
authored
Merge pull request #4 from scipp/async
Add asynchronous application constructor.
2 parents 7292668 + c8b39da commit 297f7ee

File tree

6 files changed

+421
-51
lines changed

6 files changed

+421
-51
lines changed

docs/developer/async-programming.md

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
# Asynchronous Programming
2+
3+
As a python application construction tool, ``appstract`` supports asynchronous programming.
4+
5+
This page explains asynchronous characteristic of ``appstract`` and debugging logs.
6+
7+
## Debugging Logs
8+
9+
### `asyncio.get_event_loop` vs `asyncio.new_event_loop`
10+
11+
1. `asyncio.get_event_loop`
12+
``get_event_loop`` will always return the current event loop.
13+
If there is no event loop set in the thread, it will create a new one
14+
and set it as a current event loop of the thread, and return the loop.
15+
Many of ``asyncio`` free functions internally use ``get_event_loop``,
16+
i.e. `asyncio.create_task`.
17+
18+
**Things to consider while using `asyncio.get_event_loop`.**
19+
20+
* ``asyncio.create_task`` does not guarantee
21+
whether the current loop is/will be alive until the task is done.
22+
You may use ``run_until_complete`` to make sure the loop is not closed
23+
until the task is finished.
24+
When you need to throw multiple async calls to the loop,
25+
use ``asyncio.gather`` to merge all the tasks like in this method.
26+
* ``close`` or ``stop`` might accidentally destroy/interrupt
27+
other tasks running in the same event loop.
28+
i.e. You can accidentally destroy the event loop of a jupyter kernel.
29+
* *1* :class:`RuntimeError` if there has been an event loop set in the
30+
thread object before but it is now removed.
31+
32+
2. `asyncio.new_event_loop`
33+
``asyncio.new_event_loop`` will always return the new event loop,
34+
but it is not set it as a current loop of the thread automatically.
35+
36+
However, sometimes it is automatically handled within the thread,
37+
and it caused errors which was hard to debug under ``pytest`` session.
38+
For example,
39+
40+
* The new event loop was not closed properly as it is destroyed.
41+
* The new event loop was never started until it is destroyed.
42+
43+
44+
``Traceback`` of ``pytest`` did not show
45+
where exactly the error is from in those cases.
46+
It was resolved by using `asyncio.get_event_loop`,
47+
or manually closing the event loop at the end of the test.
48+
49+
**When to use** `asyncio.new_event_loop`.
50+
51+
* `asyncio.get_event_loop` raises `RuntimeError` *1*.
52+
* Multi-threads.
53+
54+
Please note that the loop object might need to be **closed** manually.

src/appstract/async.py

Lines changed: 309 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,309 @@
1+
# SPDX-License-Identifier: BSD-3-Clause
2+
# Copyright (c) 2024 Scipp contributors (https://github.com/scipp)
3+
"""Asynchronous application components."""
4+
5+
import asyncio
6+
from collections.abc import AsyncGenerator, Awaitable, Callable, Coroutine, Generator
7+
from contextlib import asynccontextmanager, contextmanager
8+
from dataclasses import dataclass
9+
from typing import (
10+
Any,
11+
Protocol,
12+
TypeVar,
13+
runtime_checkable,
14+
)
15+
16+
from .logging import AppLogger
17+
from .mixins import LogMixin
18+
19+
20+
@runtime_checkable
21+
class MessageProtocol(Protocol):
22+
content: Any
23+
24+
25+
class HandlerProtocol(Protocol):
26+
"""A callable object accepts a single message as the first positional argument.
27+
28+
These handlers are called by :class:`~MessageRouter` whenever another
29+
handler or a daemon publishes the relevant message (by type).
30+
"""
31+
32+
def __call__(self, message: MessageProtocol) -> Any: ...
33+
34+
35+
class DaemonMessageGeneratorProtocol(Protocol):
36+
"""A callable object that returns a message generator.
37+
38+
Daemon message generators are expected to have a long life cycle.
39+
i.e. repeatedly monitoring a data pipe, or listening to a message broker.
40+
"""
41+
42+
def __call__(self) -> AsyncGenerator[MessageProtocol | None, None]: ...
43+
44+
45+
MessageT = TypeVar("MessageT", bound=MessageProtocol)
46+
HandlerT = TypeVar("HandlerT", bound=Callable)
47+
48+
49+
class MessageRouter(LogMixin):
50+
"""A message router that routes messages to handlers."""
51+
52+
logger: AppLogger
53+
54+
def __init__(self):
55+
from queue import Queue
56+
57+
self.handlers: dict[
58+
type[MessageProtocol], list[Callable[[MessageProtocol], Any]]
59+
] = {}
60+
self.awaitable_handlers: dict[
61+
type[MessageProtocol], list[Callable[[MessageProtocol], Awaitable[Any]]]
62+
] = {}
63+
self.message_pipe = Queue()
64+
65+
@contextmanager
66+
def _handler_wrapper(
67+
self, handler: Callable[..., Any], message: MessageProtocol
68+
) -> Generator[None, None, None]:
69+
try:
70+
self.debug(f"Routing event {type(message)} to handler {handler}...")
71+
yield
72+
except Exception as e:
73+
self.warning(f"Failed to handle event {type(message)}")
74+
raise e
75+
else:
76+
self.debug(f"Routing event {type(message)} to handler {handler} done.")
77+
78+
def _register(
79+
self,
80+
*,
81+
handler_list: dict[type[MessageT], list[HandlerT]],
82+
event_tp: type[MessageT],
83+
handler: HandlerT,
84+
):
85+
if event_tp in handler_list:
86+
handler_list[event_tp].append(handler)
87+
else:
88+
handler_list[event_tp] = [handler]
89+
90+
def register_handler(
91+
self,
92+
event_tp: type[MessageT],
93+
handler: Callable[[MessageT], Any] | Callable[[MessageT], Awaitable[Any]],
94+
):
95+
if asyncio.iscoroutinefunction(handler):
96+
handler_list = self.awaitable_handlers
97+
else:
98+
handler_list = self.handlers
99+
100+
self._register(handler_list=handler_list, event_tp=event_tp, handler=handler)
101+
102+
def _collect_results(self, result: Any) -> list[MessageProtocol]:
103+
"""Append or extend ``result`` to ``self.message_pipe``.
104+
105+
It filters out non-AppstractMessage objects from ``result``.
106+
"""
107+
if isinstance(result, MessageProtocol):
108+
return [result]
109+
elif isinstance(result, tuple):
110+
return [_msg for _msg in result if isinstance(_msg, MessageProtocol)]
111+
else:
112+
return []
113+
114+
async def route(self, message: MessageProtocol) -> None:
115+
# Synchronous handlers
116+
results = []
117+
for handler in (handlers := self.handlers.get(type(message), [])):
118+
await asyncio.sleep(0) # Let others use the event loop.
119+
with self._handler_wrapper(handler, message):
120+
results.extend(self._collect_results(handler(message)))
121+
122+
# Asynchronous handlers
123+
for handler in (
124+
awaitable_handlers := self.awaitable_handlers.get(type(message), [])
125+
):
126+
with self._handler_wrapper(handler, message):
127+
results.extend(self._collect_results(await handler(message)))
128+
129+
# No handlers
130+
if not (handlers or awaitable_handlers):
131+
self.warning(f"No handler for event {type(message)}. Ignoring...")
132+
133+
# Re-route the results
134+
for result in results:
135+
self.message_pipe.put(result)
136+
137+
async def run(
138+
self,
139+
) -> AsyncGenerator[MessageProtocol | None, None]:
140+
"""Message router daemon."""
141+
while True:
142+
await asyncio.sleep(0)
143+
if self.message_pipe.empty():
144+
await asyncio.sleep(0.1)
145+
while not self.message_pipe.empty():
146+
await self.route(self.message_pipe.get())
147+
yield
148+
149+
async def send_message_async(self, message: MessageProtocol) -> None:
150+
self.message_pipe.put(message)
151+
await asyncio.sleep(0)
152+
153+
154+
class Application(LogMixin):
155+
"""Application class.
156+
157+
Main Responsibilities:
158+
- Create/retrieve event loop if needed.
159+
- Create tasks if an event loop exists already.
160+
- Register handling methods if applicable.
161+
- Create/collect tasks of daemons
162+
(via :func:`~DaemonInterface.run` method).
163+
164+
"""
165+
166+
@dataclass
167+
class Stop:
168+
"""A message to break the routing loop."""
169+
170+
content: Any
171+
172+
def __init__(self, logger: AppLogger, message_router: MessageRouter) -> None:
173+
import asyncio
174+
175+
self.loop: asyncio.AbstractEventLoop
176+
self.tasks: dict[Callable, asyncio.Task] = {}
177+
self.logger = logger
178+
self.message_router = message_router
179+
self.daemons: list[DaemonMessageGeneratorProtocol] = [self.message_router.run]
180+
self.register_handling_method(self.Stop, self.stop_tasks)
181+
self._break = False
182+
super().__init__()
183+
184+
def stop_tasks(self, message: MessageProtocol | None = None) -> None:
185+
self.info('Stop running application %s...', self.__class__.__name__)
186+
if message is not None and not isinstance(message, self.Stop):
187+
raise TypeError(
188+
f"Expected message of type {self.Stop}, got {type(message)}."
189+
)
190+
self._break = True
191+
192+
def register_handling_method(
193+
self, event_tp: type[MessageT], handler: Callable[[MessageT], Any]
194+
) -> None:
195+
"""Register handlers to the application message router."""
196+
self.message_router.register_handler(event_tp, handler)
197+
198+
def register_daemon(self, daemon: DaemonMessageGeneratorProtocol) -> None:
199+
"""Register a daemon generator to the application.
200+
201+
Registered daemons will be scheduled in the event loop
202+
as :func:`~Application.run` method is called.
203+
The future of the daemon will be collected in the ``self.tasks`` list.
204+
"""
205+
self.daemons.append(daemon)
206+
207+
def cancel_all_tasks(self) -> None:
208+
"""Cancel all tasks."""
209+
for task in self.tasks.values():
210+
task.cancel()
211+
212+
self.tasks.clear()
213+
214+
@asynccontextmanager
215+
async def _daemon_wrapper(
216+
self, daemon: DaemonMessageGeneratorProtocol
217+
) -> AsyncGenerator[None, None]:
218+
try:
219+
self.info('Running daemon %s', daemon.__class__.__qualname__)
220+
yield
221+
except Exception as e:
222+
# Make sure all other async tasks are cancelled.
223+
# It is because raising an exception will destroy only the task
224+
# that had an error raised and may not affect other tasks in some cases,
225+
# e.g. in Jupyter Notebooks.
226+
self.error(f"Daemon {daemon} failed. Cancelling all other tasks...")
227+
# Break all daemon generator loops.
228+
self._break = True
229+
# Let other daemons/handlers clean up.
230+
await self.message_router.route(self.Stop(None))
231+
# Make sure all other async tasks are cancelled.
232+
self.cancel_all_tasks()
233+
raise e
234+
else:
235+
self.info("Daemon %s completed.", daemon.__class__.__qualname__)
236+
237+
def _create_daemon_coroutines(
238+
self,
239+
) -> dict[DaemonMessageGeneratorProtocol, Coroutine]:
240+
async def run_daemon(daemon: DaemonMessageGeneratorProtocol):
241+
async with self._daemon_wrapper(daemon):
242+
async for message in daemon():
243+
if message is not None:
244+
await self.message_router.send_message_async(message)
245+
if self._break:
246+
break
247+
await asyncio.sleep(0)
248+
249+
return {daemon: run_daemon(daemon) for daemon in self.daemons}
250+
251+
def run(self):
252+
"""
253+
Register all handling methods and run all daemons.
254+
255+
It retrieves or creates an event loop
256+
and schedules all coroutines(run methods) of its daemons.
257+
258+
See :doc:`/developer/async_programming` for more details about
259+
why it handles the event loop like this.
260+
261+
This method is only when the ``Application`` object needs to start the
262+
event loop itself.
263+
If there is a running event loop expected, use ```` instead.
264+
265+
"""
266+
import asyncio
267+
268+
from appstract.schedulers import temporary_event_loop
269+
270+
self.info('Start running %s...', self.__class__.__qualname__)
271+
if self.tasks:
272+
raise RuntimeError(
273+
"Application is already running. "
274+
"Cancel all tasks and clear them before running it again."
275+
)
276+
277+
with temporary_event_loop() as loop:
278+
self.loop = loop
279+
daemon_coroutines = self._create_daemon_coroutines()
280+
daemon_tasks = {
281+
daemon: loop.create_task(coro)
282+
for daemon, coro in daemon_coroutines.items()
283+
}
284+
self.tasks.update(daemon_tasks)
285+
if not loop.is_running():
286+
loop.run_until_complete(asyncio.gather(*self.tasks.values()))
287+
288+
def run_after_run(self):
289+
"""
290+
Register all handling methods and run all daemons.
291+
292+
It schedules all coroutines(run methods) of its daemons.
293+
294+
"""
295+
import asyncio
296+
297+
self.info('Start running %s...', self.__class__.__qualname__)
298+
if self.tasks:
299+
raise RuntimeError(
300+
"Application is already running. "
301+
"Cancel all tasks and clear them before running it again."
302+
)
303+
self.loop = asyncio.get_event_loop()
304+
daemon_coroutines = self._create_daemon_coroutines()
305+
daemon_tasks = {
306+
daemon: self.loop.create_task(coro)
307+
for daemon, coro in daemon_coroutines.items()
308+
}
309+
self.tasks.update(daemon_tasks)

src/appstract/interfaces.py

Whitespace-only changes.

src/appstract/logging/_test_helpers.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
from contextlib import contextmanager
99
from dataclasses import dataclass
1010

11-
from ..protocols import LogMixin
11+
from ..mixins import LogMixin
1212

1313

1414
@contextmanager

0 commit comments

Comments
 (0)