Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Decouple broker and coordinator interface #6675

Draft
wants to merge 32 commits into
base: main
Choose a base branch
from

Conversation

unkcpz
Copy link
Member

@unkcpz unkcpz commented Dec 20, 2024

  • all FIXME notes resolved
  • MockCoordinator -> InMemoryCoordinator and an exmaple of submit without rmq.
  • runner with dedicated thread (need more experiments)
  • Which change in plumpy/rmq-out resolve the test_disconnect timeout?
  • BroadcastFilter absorbed into add_broadcast_subscriber API.
  • broker and coordinator are duplicate abstraction, remove broker this should be done after, in this change, it bring coordinator layer first to not break the broker abstraction introduced (which is not real abstraction and relatively useless).
  • Move all kiwipy/rmq modules into broker/rabbitmq and make aiida-core not directly (but through broker/coordinator interface) dep on it.

@unkcpz unkcpz requested a review from agoscinski as a code owner December 20, 2024 15:46
@unkcpz unkcpz marked this pull request as draft December 20, 2024 15:46
@unkcpz unkcpz force-pushed the rmq-out-small-step-try branch 3 times, most recently from cff69d3 to b1f446a Compare December 20, 2024 17:42
@unkcpz unkcpz force-pushed the rmq-out-small-step-try branch from afc0925 to 28cdb1c Compare December 21, 2024 02:08
@unkcpz unkcpz force-pushed the rmq-out-small-step-try branch 3 times, most recently from e613a3b to 5d59e6a Compare December 21, 2024 11:09
@unkcpz unkcpz force-pushed the rmq-out-small-step-try branch from c6c6352 to c769906 Compare December 21, 2024 13:52
@unkcpz unkcpz force-pushed the rmq-out-small-step-try branch from abaf3e9 to 03f7a5b Compare December 27, 2024 00:36
@unkcpz unkcpz force-pushed the rmq-out-small-step-try branch from ca1dd09 to 02a939e Compare December 27, 2024 01:39
@unkcpz unkcpz force-pushed the rmq-out-small-step-try branch from fe92318 to 329c51c Compare December 27, 2024 13:29
unkcpz and others added 3 commits December 27, 2024 14:30
When calling add_rpc_subscriber and add_task_subscriber, the event loop
of caller may from random event loop. But the target event loop is the
runner one. Therefore it requires to pass the loop to the broker when
creating the runner and runner's broker.
Instead of as a method of runner which is not needed and confuse.
@unkcpz unkcpz force-pushed the rmq-out-small-step-try branch from 9583218 to 69db549 Compare December 28, 2024 00:35
@unkcpz unkcpz force-pushed the rmq-out-small-step-try branch from a08471c to 5746ae8 Compare December 29, 2024 00:44
@unkcpz unkcpz force-pushed the rmq-out-small-step-try branch from 5572660 to d62816e Compare December 29, 2024 22:59
@unkcpz unkcpz force-pushed the rmq-out-small-step-try branch from ccf4af2 to d62816e Compare December 30, 2024 00:15
@unkcpz unkcpz force-pushed the rmq-out-small-step-try branch from bf35126 to f2ec982 Compare January 10, 2025 20:28
unkcpz added a commit to unkcpz/plumpy that referenced this pull request Jan 17, 2025
The refactoring is targeting to decouple the dependencies of using kiwipy+rmq as the communicator for the process control. 
By forming a `Coordinator` protocol contract, the different type of rmq/kiwipy related codes are removed out from plumpy logic. The new contract also pave the way to make it clearly show how a new type coordinator can be implemented (future examples will be the `tatzelwurm` a task broker that has scheduler support and file based task broker require no background service). 
For the prototype of how a coordinator should look like, the `MockCoordinator` in `tests/utils` is the coordinator that store things in memory, and can serve as the lightweight ephemeral daemon without persistent functionality.

Another major change here is hand write the resolver of future by mimic how tho asyncio does for wrapping `concurrent.futures.Future` into `asyncio.Future`. I use the same way to convert `asyncio.Future` into `concurent.futures.Future` (which is the `kiwipy.Future` as alias). 

- move the `aio_pika` import lazily by moving the rmq exceptions to `rmq` module, this can increase the performance of `import aiida; aiida.orm`.
- ~~`CancellableAction` using composite to behave as a Future like object.~~
- use `asyncio.Future` in favor of alias `plumpy.Future` and 
- use `concurrent.futures.Future` instead of alias `kiwipy.Future`.
- Hand write `_chain` and `_copy_future` since we can not just rely on the API of asyncio that is not exposed.
- Forming the `coordinator/Communicator` protocol.
- Just forming the `coordinator/Coordinator` protocol and wrap rmq/communicator as a coordinator that not require changs in kiwipy.
- Mock the coordinator for unit test.
- test against aiida-core see what need to be changed there and improve here. (aiidateam/aiida-core#6675)
- The API for plumpy process can be more compact instead of using kiwipy/rmq "subscriber" concept. (how to replace rpc pattern??)
Copy link
Contributor

@agoscinski agoscinski left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

First round of comments.

Note for me

Runner -> Broker -> Communicator
                 -> Coordinator -> Communicator
                 -> ProcessController

"""The inner communicator."""
return self._comm

def add_rpc_subscriber(self, subscriber, identifier=None):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We probably should expose types in plumpy to external libraries (see code in plumpy) to use them for type hints here

@@ -37,6 +38,39 @@ async def shutdown_worker(runner: Runner) -> None:
LOGGER.info('Daemon worker stopped')


def create_daemon_runner(manager: Manager) -> 'Runner':
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note for me: This has been moved from src/aiida/manage/manager.py

broker: Broker | None = None,
broker_submit: bool = False,
persister: Optional[AiiDAPersister] = None,
) -> 'Runner':
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note for me: keyword arguments have been made explicit

_default_poll_interval = 0.0 if profile.is_test_profile else self.get_option('runner.poll.interval')
_default_broker_submit = False
_default_persister = self.get_persister()
_default_loop = asyncio.get_event_loop()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why this are marked for internal use only? Do you want to make them part of the class? Right now they are just defined in the function

broker_submit: bool = False,
persister: Optional[AiiDAPersister] = None,
) -> 'Runner':
"""Create and return a new runner, with default settings from profile.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note for me: this is making the keyword arguments explicit. I think the logic is more or less the same

runner.start()
# runner_thread.join()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I played around and this seems to work

# copy of runner.close adapted
assert not runner._closed
runner._loop.call_soon_threadsafe(runner._loop.stop)
thread.join()
if not runner._loop.is_running():
    runner._loop.close()
reset_event_loop_policy()
runner._closed = True

at least for this toy example

import asyncio
import threading
import time

def start_event_loop(loop):
    """Run an asyncio event loop in a separate thread."""
    asyncio.set_event_loop(loop)
    loop.run_forever()

# Create a new asyncio event loop
loop = asyncio.new_event_loop()

# Start the loop in a new thread
thread = threading.Thread(target=start_event_loop, args=(loop,), daemon=True)
thread.start()

# Function to run async tasks from the main thread
def run_async_task(coro):
    """Submit a coroutine to the event loop running in another thread."""
    return asyncio.run_coroutine_threadsafe(coro, loop)

# Example async function
async def say_hello():
    print(f"Hello from thread: {threading.current_thread().name}")
    await asyncio.sleep(2)
    print(f"Goodbye from thread: {threading.current_thread().name}")

# Submit coroutine to the running event loop
future = run_async_task(say_hello())

# Wait for the result
future.result()

# Give time to process (optional)
time.sleep(1)

# Stop the event loop and join the thread (if needed)
loop.call_soon_threadsafe(loop.stop)
thread.join()
loop.close()

If I don't use call_soon_threadsafe it hangs. Also I cannot close the loop before stopping it.

If this works there still remains the question how to integrate this into the runner as the runner needs to be aware about the thread it is running on. One cannot close the loop before joining the thread

_controller: Optional[RemoteProcessThreadController] = None
_closed: bool = False

def __init__(
self,
poll_interval: Union[int, float] = 0,
loop: Optional[asyncio.AbstractEventLoop] = None,
communicator: Optional[kiwipy.Communicator] = None,
broker: Broker | None = None,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note for me: the runner does not know the communicator/coordinatior anymore but the broker. The broker knows the coordinator.


set_event_loop_policy()
self._loop = loop if loop is not None else asyncio.get_event_loop()
self._loop = loop or asyncio.get_event_loop()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I actually like is None checks in general more since they are explicit. This also turns returns also to the default case if wrong types are put in loop = [] or [5] # -> loop will be [5]. I don't think it matters in this case but I wanted to mention this.


__all__ = ['RmqCoordinator']

U = TypeVar('U', bound=kiwipy.Communicator)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we give it a more meaningful name?

Suggested change
U = TypeVar('U', bound=kiwipy.Communicator)
CommunicatorImpl = TypeVar('CommunicatorImpl', bound=kiwipy.Communicator)

"""Construct a new instance.

:param profile: The profile.
"""
self._profile = profile
self._communicator: 'RmqThreadCommunicator' | None = None
self._communicator: 'RmqThreadCommunicator | None' = None
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right now we keep two communicator

Broker -> Communicator
       -> Coordinator -> Communicator

It could happen easily in the code that two communicators get out of sync (so reference to a different one). If we expose the Communicator in the Coordinator we could simplify it to

Broker -> Coordinator -> Communicator

and avoid this issue of two communicators. I am not sure how easy it is to replace the communicator in the coordinator, since no one is managing the old communicator anymore. So we would need to close the existing coordinator and raise an error if it cannot be closed.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants