Decouple broker and coordinator interface #6675
…lledError not needed
When add_rpc_subscriber and add_task_subscriber are called, the caller may be running on an arbitrary event loop, but the target event loop is the runner's. The loop therefore has to be passed to the broker when creating the runner and the runner's broker.
for more information, see https://pre-commit.ci
Instead of as a method of the runner, which is not needed and is confusing.
This refactoring decouples process control from its dependency on kiwipy+rmq as the communicator. By defining a `Coordinator` protocol contract, the various rmq/kiwipy-specific code paths are moved out of the plumpy logic. The new contract also makes it clear how a new type of coordinator can be implemented (future examples are `tatzelwurm`, a task broker with scheduler support, and a file-based task broker that requires no background service). As a prototype of what a coordinator should look like, the `MockCoordinator` in `tests/utils` stores everything in memory and can serve as a lightweight ephemeral daemon without persistence.

Another major change is a hand-written future resolver that mimics how asyncio wraps a `concurrent.futures.Future` into an `asyncio.Future`. I use the same approach to convert an `asyncio.Future` into a `concurrent.futures.Future` (of which `kiwipy.Future` is an alias).

- Import `aio_pika` lazily by moving the rmq exceptions to the `rmq` module; this can speed up `import aiida; aiida.orm`.
- ~~`CancellableAction` using composite to behave as a Future like object.~~
- Use `asyncio.Future` instead of the alias `plumpy.Future`, and
- use `concurrent.futures.Future` instead of the alias `kiwipy.Future`.
- Hand-write `_chain` and `_copy_future`, since we cannot rely on asyncio APIs that are not exposed.
- Forming the `coordinator/Communicator` protocol.
- Just form the `coordinator/Coordinator` protocol and wrap rmq/communicator as a coordinator, which requires no changes in kiwipy.
- Mock the coordinator for unit tests.
- Test against aiida-core to see what needs to change there and what to improve here. (aiidateam/aiida-core#6675)
- The API for plumpy processes can be more compact instead of using the kiwipy/rmq "subscriber" concept. (how to replace rpc pattern??)
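To make the future-resolver idea concrete, here is a minimal sketch of copying the outcome of an `asyncio.Future` into a `concurrent.futures.Future`. It is loosely modelled on what asyncio does internally in the other direction. The names `copy_future_state`, `chain_to_concurrent` and `demo` are hypothetical and are not the `_chain` / `_copy_future` helpers from this PR.

```python
import asyncio
import concurrent.futures


def copy_future_state(source: asyncio.Future, dest: concurrent.futures.Future) -> None:
    """Copy the outcome of a finished asyncio future into a concurrent future."""
    assert source.done()
    if dest.cancelled():
        # The consumer already gave up on the result; nothing to copy.
        return
    if source.cancelled():
        dest.cancel()
        return
    exception = source.exception()
    if exception is not None:
        dest.set_exception(exception)
    else:
        dest.set_result(source.result())


def chain_to_concurrent(source: asyncio.Future) -> concurrent.futures.Future:
    """Return a concurrent future that is resolved when ``source`` is done."""
    dest: concurrent.futures.Future = concurrent.futures.Future()
    # The callback is scheduled on the loop that ``source`` belongs to.
    source.add_done_callback(lambda fut: copy_future_state(fut, dest))
    return dest


async def demo() -> int:
    loop = asyncio.get_running_loop()
    source = loop.create_future()
    dest = chain_to_concurrent(source)
    source.set_result(42)
    await asyncio.sleep(0)  # give the loop one iteration to run the done callback
    return dest.result(timeout=1)
```

The returned `concurrent.futures.Future` can be waited on from any thread, which is the property a thread-based communicator needs; cancellation from the consumer side back to the source is left out of this sketch.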
First round of comments.
Note for me
Runner -> Broker -> Communicator
-> Coordinator -> Communicator
-> ProcessController
        """The inner communicator."""
        return self._comm

    def add_rpc_subscriber(self, subscriber, identifier=None):
We probably should expose types in plumpy to external libraries (see code in plumpy) to use them for type hints here
@@ -37,6 +38,39 @@ async def shutdown_worker(runner: Runner) -> None:
    LOGGER.info('Daemon worker stopped')


def create_daemon_runner(manager: Manager) -> 'Runner':
Note for me: This has been moved from src/aiida/manage/manager.py
        broker: Broker | None = None,
        broker_submit: bool = False,
        persister: Optional[AiiDAPersister] = None,
    ) -> 'Runner':
Note for me: keyword arguments have been made explicit
        _default_poll_interval = 0.0 if profile.is_test_profile else self.get_option('runner.poll.interval')
        _default_broker_submit = False
        _default_persister = self.get_persister()
        _default_loop = asyncio.get_event_loop()
Why are these marked for internal use only? Do you want to make them part of the class? Right now they are just defined in the function.
        broker_submit: bool = False,
        persister: Optional[AiiDAPersister] = None,
    ) -> 'Runner':
        """Create and return a new runner, with default settings from profile.
Note for me: this is making the keyword arguments explicit. I think the logic is more or less the same
runner.start()
# runner_thread.join()
I played around and this seems to work
# copy of runner.close adapted
assert not runner._closed
runner._loop.call_soon_threadsafe(runner._loop.stop)
thread.join()
if not runner._loop.is_running():
    runner._loop.close()
reset_event_loop_policy()
runner._closed = True
at least for this toy example
import asyncio
import threading
import time


def start_event_loop(loop):
    """Run an asyncio event loop in a separate thread."""
    asyncio.set_event_loop(loop)
    loop.run_forever()


# Create a new asyncio event loop
loop = asyncio.new_event_loop()

# Start the loop in a new thread
thread = threading.Thread(target=start_event_loop, args=(loop,), daemon=True)
thread.start()


# Function to run async tasks from the main thread
def run_async_task(coro):
    """Submit a coroutine to the event loop running in another thread."""
    return asyncio.run_coroutine_threadsafe(coro, loop)


# Example async function
async def say_hello():
    print(f"Hello from thread: {threading.current_thread().name}")
    await asyncio.sleep(2)
    print(f"Goodbye from thread: {threading.current_thread().name}")


# Submit coroutine to the running event loop
future = run_async_task(say_hello())

# Wait for the result
future.result()

# Give time to process (optional)
time.sleep(1)

# Stop the event loop and join the thread (if needed)
loop.call_soon_threadsafe(loop.stop)
thread.join()
loop.close()
If I don't use `call_soon_threadsafe`, it hangs. Also, I cannot close the loop before stopping it.
If this works, there still remains the question of how to integrate this into the runner, as the runner needs to be aware of the thread it is running on. One cannot close the loop before joining the thread.
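One possible direction for the integration question is to let a single object own both the loop and the thread, so that it can enforce the stop, join, close order itself. The sketch below uses a hypothetical `ThreadedLoop` helper; it is not part of aiida-core or plumpy and only illustrates the ordering.

```python
import asyncio
import threading
from concurrent.futures import Future
from typing import Any, Coroutine


class ThreadedLoop:
    """Hypothetical helper that owns an event loop and the thread running it."""

    def __init__(self) -> None:
        self._loop = asyncio.new_event_loop()
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._closed = False

    def _run(self) -> None:
        asyncio.set_event_loop(self._loop)
        self._loop.run_forever()

    def start(self) -> None:
        self._thread.start()

    def submit(self, coro: Coroutine[Any, Any, Any]) -> Future:
        """Schedule a coroutine on the owned loop from any thread."""
        return asyncio.run_coroutine_threadsafe(coro, self._loop)

    def close(self) -> None:
        assert not self._closed
        # loop.stop() is not thread-safe, so it is scheduled on the loop thread.
        self._loop.call_soon_threadsafe(self._loop.stop)
        # The loop may only be closed once run_forever() has returned,
        # which is guaranteed after the thread has been joined.
        self._thread.join()
        self._loop.close()
        self._closed = True
```

A caller would use it as `worker = ThreadedLoop(); worker.start(); worker.submit(coro).result(); worker.close()`. Whether the runner itself should hold the thread, or receive such an object from outside, is left open here.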
    _controller: Optional[RemoteProcessThreadController] = None
    _closed: bool = False

    def __init__(
        self,
        poll_interval: Union[int, float] = 0,
        loop: Optional[asyncio.AbstractEventLoop] = None,
-        communicator: Optional[kiwipy.Communicator] = None,
+        broker: Broker | None = None,
Note for me: the runner no longer knows the communicator/coordinator, only the broker. The broker knows the coordinator.
        set_event_loop_policy()
-        self._loop = loop if loop is not None else asyncio.get_event_loop()
+        self._loop = loop or asyncio.get_event_loop()
I actually prefer `is None` checks in general since they are explicit. The `or` form also falls back to the default when a falsy value of the wrong type is passed in, e.g. `loop = [] or [5]  # -> loop will be [5]`. I don't think it matters in this case, but I wanted to mention it.
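The difference can be seen in a small sketch with stand-in values. `DEFAULT` and both function names are made up for illustration and do not appear in the PR.

```python
from typing import Optional

DEFAULT = 'default-loop'  # stand-in for asyncio.get_event_loop()


def pick_with_or(loop: Optional[object] = None) -> object:
    # Falls back to the default for ANY falsy value: None, [], 0, '' ...
    return loop or DEFAULT


def pick_with_is_none(loop: Optional[object] = None) -> object:
    # Falls back to the default only when nothing was passed.
    return loop if loop is not None else DEFAULT
```

With `None` both return the default. With an empty list, `pick_with_or` silently returns the default while `pick_with_is_none` hands the (wrong) value back, so the mistake surfaces later instead of being masked.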
__all__ = ['RmqCoordinator']

U = TypeVar('U', bound=kiwipy.Communicator)
Can we give it a more meaningful name?
- U = TypeVar('U', bound=kiwipy.Communicator)
+ CommunicatorImpl = TypeVar('CommunicatorImpl', bound=kiwipy.Communicator)
        """Construct a new instance.

        :param profile: The profile.
        """
        self._profile = profile
-        self._communicator: 'RmqThreadCommunicator' | None = None
+        self._communicator: 'RmqThreadCommunicator | None' = None
Right now we keep two communicators:
Broker -> Communicator
-> Coordinator -> Communicator
It could easily happen in the code that the two communicators get out of sync (i.e. they reference different objects). If we expose the Communicator in the Coordinator, we could simplify it to
Broker -> Coordinator -> Communicator
and avoid this issue of two communicators. I am not sure how easy it is to replace the communicator in the coordinator, since no one is managing the old communicator anymore. So we would need to close the existing coordinator and raise an error if it cannot be closed.
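A rough sketch of the simplified chain, using stand-in classes rather than the real aiida-core or kiwipy types. All class and method names below are hypothetical, and closing the old communicator before swapping is only one reading of the suggestion above.

```python
class Communicator:
    """Stand-in for a kiwipy-like communicator (hypothetical)."""

    def __init__(self) -> None:
        self._closed = False

    def close(self) -> None:
        self._closed = True

    def is_closed(self) -> bool:
        return self._closed


class Coordinator:
    """Holds the only reference to the communicator."""

    def __init__(self, communicator: Communicator) -> None:
        self._comm = communicator

    @property
    def communicator(self) -> Communicator:
        return self._comm

    def replace_communicator(self, new: Communicator) -> None:
        # Nobody else manages the old communicator, so close it here.
        self._comm.close()
        if not self._comm.is_closed():
            raise RuntimeError('could not close the existing communicator')
        self._comm = new


class Broker:
    """Delegates to the coordinator instead of keeping its own reference."""

    def __init__(self, coordinator: Coordinator) -> None:
        self._coordinator = coordinator

    def get_communicator(self) -> Communicator:
        return self._coordinator.communicator
```

Because the broker never stores the communicator itself, replacing it in the coordinator is immediately visible through the broker and the two cannot drift apart.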
- runner with dedicated thread (needs more experiments)
- broker and coordinator are duplicate abstractions, remove broker: this should be done afterwards. This change brings in the coordinator layer first so as not to break the broker abstraction already introduced (which is not a real abstraction and is relatively useless).