Skip to content

Commit 5399404

Browse files
authored
Merge pull request #6 from unkcpz/rmq-out
The refactoring is targeting to decouple the dependencies of using kiwipy+rmq as the communicator for the process control. By forming a `Coordinator` protocol contract, the different type of rmq/kiwipy related codes are removed out from plumpy logic. The new contract also pave the way to make it clearly show how a new type coordinator can be implemented (future examples will be the `tatzelwurm` a task broker that has scheduler support and file based task broker require no background service). For the prototype of how a coordinator should look like, the `MockCoordinator` in `tests/utils` is the coordinator that store things in memory, and can serve as the lightweight ephemeral daemon without persistent functionality. Another major change here is hand write the resolver of future by mimic how tho asyncio does for wrapping `concurrent.futures.Future` into `asyncio.Future`. I use the same way to convert `asyncio.Future` into `concurent.futures.Future` (which is the `kiwipy.Future` as alias). - move the `aio_pika` import lazily by moving the rmq exceptions to `rmq` module, this can increase the performance of `import aiida; aiida.orm`. - ~~`CancellableAction` using composite to behave as a Future like object.~~ - use `asyncio.Future` in favor of alias `plumpy.Future` and - use `concurrent.futures.Future` instead of alias `kiwipy.Future`. - Hand write `_chain` and `_copy_future` since we can not just rely on the API of asyncio that is not exposed. - Forming the `coordinator/Communicator` protocol. - Just forming the `coordinator/Coordinator` protocol and wrap rmq/communicator as a coordinator that not require changs in kiwipy. - Mock the coordinator for unit test. - test against aiida-core see what need to be changed there and improve here. (aiidateam/aiida-core#6675) - The API for plumpy process can be more compact instead of using kiwipy/rmq "subscriber" concept. (how to replace rpc pattern??)
2 parents bfbe78d + 6d3101d commit 5399404

26 files changed

+1565
-1006
lines changed

docs/source/concepts.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ WorkChains support the use of logical constructs such as `If_` and `While_` to c
3232

3333
A `Controller` can control processes throughout their lifetime, by sending and receiving messages. It can launch, pause, continue, kill and check status of the process.
3434

35-
The {py:class}`~plumpy.process_comms.RemoteProcessThreadController` can communicate with the process over the thread communicator provided by {{kiwipy}} which can subscribe and send messages over the {{rabbitmq}} message broker.
35+
The {py:class}`~plumpy.rmq.process_control.RemoteProcessThreadController` can communicate with the process over the thread communicator provided by {{kiwipy}} which can subscribe and send messages over the {{rabbitmq}} message broker.
3636

3737
The thread communicator runs on a independent thread (event loop) and so will not be blocked by sometimes long waiting times in the process event loop.
3838
Using RabbitMQ means that even if the computer is terminated unexpectedly, messages are persisted and can be run once the computer restarts.

docs/source/nitpick-exceptions

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ py:class plumpy.base.state_machine.State
2323
py:class State
2424
py:class Process
2525
py:class plumpy.futures.CancellableAction
26-
py:class plumpy.communications.LoopCommunicator
26+
py:class plumpy.rmq.communications.LoopCommunicator
2727
py:class plumpy.persistence.PersistedPickle
2828
py:class plumpy.utils.AttributesFrozendict
2929
py:class plumpy.workchains._FunctionCall

docs/source/tutorial.ipynb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@
6666
"The {py:class}`~plumpy.workchains.WorkChain`\n",
6767
": A subclass of `Process` that allows for running a process as a set of discrete steps (also known as instructions), with the ability to save the state of the process after each instruction has completed.\n",
6868
"\n",
69-
"The process `Controller` (principally the {py:class}`~plumpy.process_comms.RemoteProcessThreadController`)\n",
69+
"The process `Controller` (principally the {py:class}`~plumpy.rmq.process_control.RemoteProcessThreadController`)\n",
7070
": To control the process or workchain throughout its lifetime."
7171
]
7272
},

src/plumpy/__init__.py

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,18 +4,21 @@
44

55
import logging
66

7-
from .communications import *
7+
# interfaces
8+
from .controller import ProcessController
9+
from .coordinator import Coordinator
810
from .events import *
911
from .exceptions import *
1012
from .futures import *
1113
from .loaders import *
14+
from .message import *
1215
from .mixins import *
1316
from .persistence import *
1417
from .ports import *
15-
from .process_comms import *
1618
from .process_listener import *
1719
from .process_states import *
1820
from .processes import *
21+
from .rmq import *
1922
from .utils import *
2023
from .workchains import *
2124

@@ -27,14 +30,13 @@
2730
+ futures.__all__
2831
+ mixins.__all__
2932
+ persistence.__all__
30-
+ communications.__all__
31-
+ process_comms.__all__
33+
+ message.__all__
3234
+ process_listener.__all__
3335
+ workchains.__all__
3436
+ loaders.__all__
3537
+ ports.__all__
3638
+ process_states.__all__
37-
)
39+
) + ['ProcessController', 'Coordinator']
3840

3941

4042
# Do this se we don't get the "No handlers could be found..." warnings that will be produced

src/plumpy/controller.py

Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
# -*- coding: utf-8 -*-
2+
from __future__ import annotations
3+
4+
from collections.abc import Sequence
5+
from typing import Any, Protocol
6+
7+
from plumpy import loaders
8+
from plumpy.message import MessageType
9+
from plumpy.utils import PID_TYPE
10+
11+
ProcessResult = Any
12+
ProcessStatus = Any
13+
14+
15+
class ProcessController(Protocol):
16+
"""
17+
Control processes using coroutines that will send messages and wait
18+
(in a non-blocking way) for their response
19+
"""
20+
21+
def get_status(self, pid: 'PID_TYPE') -> ProcessStatus:
22+
"""
23+
Get the status of a process with the given PID
24+
:param pid: the process id
25+
:return: the status response from the process
26+
"""
27+
...
28+
29+
def pause_process(self, pid: 'PID_TYPE', msg: Any | None = None) -> ProcessResult:
30+
"""
31+
Pause the process
32+
33+
:param pid: the pid of the process to pause
34+
:param msg: optional pause message
35+
:return: True if paused, False otherwise
36+
"""
37+
...
38+
39+
def play_process(self, pid: 'PID_TYPE') -> ProcessResult:
40+
"""
41+
Play the process
42+
43+
:param pid: the pid of the process to play
44+
:return: True if played, False otherwise
45+
"""
46+
...
47+
48+
def kill_process(self, pid: 'PID_TYPE', msg: MessageType | None = None) -> ProcessResult:
49+
"""
50+
Kill the process
51+
52+
:param pid: the pid of the process to kill
53+
:param msg: optional kill message
54+
:return: True if killed, False otherwise
55+
"""
56+
...
57+
58+
def continue_process(
59+
self, pid: 'PID_TYPE', tag: str | None = None, nowait: bool = False, no_reply: bool = False
60+
) -> ProcessResult | None:
61+
"""
62+
Continue the process
63+
64+
:param _communicator: the communicator
65+
:param pid: the pid of the process to continue
66+
:param tag: the checkpoint tag to continue from
67+
"""
68+
...
69+
70+
async def launch_process(
71+
self,
72+
process_class: str,
73+
init_args: Sequence[Any] | None = None,
74+
init_kwargs: dict[str, Any] | None = None,
75+
persist: bool = False,
76+
loader: loaders.ObjectLoader | None = None,
77+
nowait: bool = False,
78+
no_reply: bool = False,
79+
) -> ProcessResult:
80+
"""
81+
Launch a process given the class and constructor arguments
82+
83+
:param process_class: the class of the process to launch
84+
:param init_args: the constructor positional arguments
85+
:param init_kwargs: the constructor keyword arguments
86+
:param persist: should the process be persisted
87+
:param loader: the classloader to use
88+
:param nowait: if True, don't wait for the process to send a response, just return the pid
89+
:param no_reply: if True, this call will be fire-and-forget, i.e. no return value
90+
:return: the result of launching the process
91+
"""
92+
...
93+
94+
async def execute_process(
95+
self,
96+
process_class: str,
97+
init_args: Sequence[Any] | None = None,
98+
init_kwargs: dict[str, Any] | None = None,
99+
loader: loaders.ObjectLoader | None = None,
100+
nowait: bool = False,
101+
no_reply: bool = False,
102+
) -> ProcessResult:
103+
"""
104+
Execute a process. This call will first send a create task and then a continue task over
105+
the communicator. This means that if communicator messages are durable then the process
106+
will run until the end even if this interpreter instance ceases to exist.
107+
108+
:param process_class: the process class to execute
109+
:param init_args: the positional arguments to the class constructor
110+
:param init_kwargs: the keyword arguments to the class constructor
111+
:param loader: the class loader to use
112+
:param nowait: if True, don't wait for the process to send a response
113+
:param no_reply: if True, this call will be fire-and-forget, i.e. no return value
114+
:return: the result of executing the process
115+
"""
116+
...

src/plumpy/coordinator.py

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
# -*- coding: utf-8 -*-
2+
from __future__ import annotations
3+
4+
from typing import TYPE_CHECKING, Any, Callable, Hashable, Pattern, Protocol
5+
6+
if TYPE_CHECKING:
7+
# identifiers for subscribers
8+
ID_TYPE = Hashable
9+
Subscriber = Callable[..., Any]
10+
# RPC subscriber params: communicator, msg
11+
RpcSubscriber = Callable[['Coordinator', Any], Any]
12+
# Task subscriber params: communicator, task
13+
TaskSubscriber = Callable[['Coordinator', Any], Any]
14+
# Broadcast subscribers params: communicator, body, sender, subject, correlation id
15+
BroadcastSubscriber = Callable[['Coordinator', Any, Any, Any, ID_TYPE], Any]
16+
17+
18+
class Coordinator(Protocol):
19+
# XXX: naming - 'add_message_handler'
20+
def add_rpc_subscriber(self, subscriber: 'RpcSubscriber', identifier: 'ID_TYPE | None' = None) -> Any: ...
21+
22+
# XXX: naming - 'add_broadcast_handler'
23+
def add_broadcast_subscriber(
24+
self,
25+
subscriber: 'BroadcastSubscriber',
26+
subject_filters: list[Hashable | Pattern[str]] | None = None,
27+
sender_filters: list[Hashable | Pattern[str]] | None = None,
28+
identifier: 'ID_TYPE | None' = None,
29+
) -> Any: ...
30+
31+
# XXX: naming - absorbed into 'add_message_handler'
32+
def add_task_subscriber(self, subscriber: 'TaskSubscriber', identifier: 'ID_TYPE | None' = None) -> 'ID_TYPE': ...
33+
34+
def remove_rpc_subscriber(self, identifier: 'ID_TYPE | None') -> None: ...
35+
36+
def remove_broadcast_subscriber(self, identifier: 'ID_TYPE | None') -> None: ...
37+
38+
def remove_task_subscriber(self, identifier: 'ID_TYPE') -> None: ...
39+
40+
def rpc_send(self, recipient_id: Hashable, msg: Any) -> Any: ...
41+
42+
def broadcast_send(
43+
self,
44+
body: Any | None,
45+
sender: 'ID_TYPE | None' = None,
46+
subject: str | None = None,
47+
correlation_id: 'ID_TYPE | None' = None,
48+
) -> Any: ...
49+
50+
def task_send(self, task: Any, no_reply: bool = False) -> Any: ...
51+
52+
def close(self) -> None: ...

src/plumpy/exceptions.py

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,23 @@
11
# -*- coding: utf-8 -*-
22
from typing import Optional
33

4-
__all__ = ['ClosedError', 'InvalidStateError', 'KilledError', 'PersistenceError', 'UnsuccessfulResult']
4+
__all__ = [
5+
'ClosedError',
6+
'CoordinatorConnectionError',
7+
'CoordinatorTimeoutError',
8+
'InvalidStateError',
9+
'KilledError',
10+
'PersistenceError',
11+
'UnsuccessfulResult',
12+
]
513

614

715
class KilledError(Exception):
816
"""The process was killed."""
917

1018

1119
class InvalidStateError(Exception):
12-
"""
13-
Raised when an operation is attempted that requires the process to be in a state
20+
"""Raised when an operation is attempted that requires the process to be in a state
1421
that is different from the current state
1522
"""
1623

@@ -33,3 +40,19 @@ class PersistenceError(Exception):
3340

3441
class ClosedError(Exception):
3542
"""Raised when an mutable operation is attempted on a closed process"""
43+
44+
45+
class TaskRejectedError(Exception):
46+
"""A task was rejected by the coordinacor"""
47+
48+
49+
class CoordinatorCommunicationError(Exception):
50+
"""Generic coordinator communication error"""
51+
52+
53+
class CoordinatorConnectionError(ConnectionError):
54+
"""Raised when coordinator cannot be connected"""
55+
56+
57+
class CoordinatorTimeoutError(TimeoutError):
58+
"""Raised when communicate with coordinator timeout"""

src/plumpy/futures.py

Lines changed: 24 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -3,25 +3,36 @@
33
Module containing future related methods and classes
44
"""
55

6+
from __future__ import annotations
7+
68
import asyncio
7-
from typing import Any, Awaitable, Callable, Optional
9+
import contextlib
10+
from typing import Any, Awaitable, Callable, Generator, Optional
811

9-
import kiwipy
12+
__all__ = ['CancellableAction', 'Future', 'capture_exceptions', 'create_task', 'create_task']
1013

11-
__all__ = ['CancelledError', 'Future', 'chain', 'copy_future', 'create_task', 'gather']
1214

13-
CancelledError = kiwipy.CancelledError
15+
class InvalidFutureError(Exception):
16+
"""Exception for when a future or action is in an invalid state"""
1417

1518

16-
class InvalidStateError(Exception):
17-
"""Exception for when a future or action is in an invalid state"""
19+
Future = asyncio.Future
1820

1921

20-
copy_future = kiwipy.copy_future
21-
chain = kiwipy.chain
22-
gather = asyncio.gather
22+
@contextlib.contextmanager
23+
def capture_exceptions(future, ignore: tuple[type[BaseException], ...] = ()) -> Generator[None, Any, None]: # type: ignore[no-untyped-def]
24+
"""
25+
Capture any exceptions in the context and set them as the result of the given future
2326
24-
Future = asyncio.Future
27+
:param future: The future to the exception on
28+
:param ignore: An optional list of exception types to ignore, these will be raised and not set on the future
29+
"""
30+
try:
31+
yield
32+
except ignore:
33+
raise
34+
except Exception as exception:
35+
future.set_exception(exception)
2536

2637

2738
class CancellableAction(Future):
@@ -46,10 +57,10 @@ def run(self, *args: Any, **kwargs: Any) -> None:
4657
:param kwargs: the keyword arguments to the action
4758
"""
4859
if self.done():
49-
raise InvalidStateError('Action has already been ran')
60+
raise InvalidFutureError('Action has already been ran')
5061

5162
try:
52-
with kiwipy.capture_exceptions(self):
63+
with capture_exceptions(self):
5364
self.set_result(self._action(*args, **kwargs))
5465
finally:
5566
self._action = None # type: ignore
@@ -67,41 +78,4 @@ def create_task(coro: Callable[[], Awaitable[Any]], loop: Optional[asyncio.Abstr
6778
"""
6879
loop = loop or asyncio.get_event_loop()
6980

70-
future = loop.create_future()
71-
72-
async def run_task() -> None:
73-
with kiwipy.capture_exceptions(future):
74-
res = await coro()
75-
future.set_result(res)
76-
77-
asyncio.run_coroutine_threadsafe(run_task(), loop)
78-
return future
79-
80-
81-
def unwrap_kiwi_future(future: kiwipy.Future) -> kiwipy.Future:
82-
"""
83-
Create a kiwi future that represents the final results of a nested series of futures,
84-
meaning that if the futures provided itself resolves to a future the returned
85-
future will not resolve to a value until the final chain of futures is not a future
86-
but a concrete value. If at any point in the chain a future resolves to an exception
87-
then the returned future will also resolve to that exception.
88-
89-
:param future: the future to unwrap
90-
:return: the unwrapping future
91-
92-
"""
93-
unwrapping = kiwipy.Future()
94-
95-
def unwrap(fut: kiwipy.Future) -> None:
96-
if fut.cancelled():
97-
unwrapping.cancel()
98-
else:
99-
with kiwipy.capture_exceptions(unwrapping):
100-
result = fut.result()
101-
if isinstance(result, kiwipy.Future):
102-
result.add_done_callback(unwrap)
103-
else:
104-
unwrapping.set_result(result)
105-
106-
future.add_done_callback(unwrap)
107-
return unwrapping
81+
return asyncio.wrap_future(asyncio.run_coroutine_threadsafe(coro(), loop))

0 commit comments

Comments
 (0)