From 329c51cd1b2e282c43735152234102ad4e4a7f15 Mon Sep 17 00:00:00 2001 From: Jusong Yu Date: Fri, 27 Dec 2024 03:36:59 +0100 Subject: [PATCH] Keep on mess up with coordinator loop When calling add_rpc_subscriber and add_task_subscriber, the event loop of caller may from random event loop. But the target event loop is the runner one. Therefore it requires to pass the loop to the broker when creating the runner and runner's broker. --- src/aiida/brokers/rabbitmq/coordinator.py | 5 +++++ src/aiida/cmdline/utils/decorators.py | 3 +-- src/aiida/engine/runners.py | 4 ++++ src/aiida/manage/manager.py | 10 +++++++--- 4 files changed, 17 insertions(+), 5 deletions(-) diff --git a/src/aiida/brokers/rabbitmq/coordinator.py b/src/aiida/brokers/rabbitmq/coordinator.py index 6c6a13c7e2..58ce21ecee 100644 --- a/src/aiida/brokers/rabbitmq/coordinator.py +++ b/src/aiida/brokers/rabbitmq/coordinator.py @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- from asyncio import AbstractEventLoop +import asyncio from typing import Generic, TypeVar, final import kiwipy import concurrent.futures @@ -84,3 +85,7 @@ def task_send(self, task, no_reply=False): def close(self): self._comm.close() + + def is_closed(self) -> bool: + """Return `True` if the communicator was closed""" + return self._comm.is_closed() diff --git a/src/aiida/cmdline/utils/decorators.py b/src/aiida/cmdline/utils/decorators.py index 595ce8373d..1cd251493e 100644 --- a/src/aiida/cmdline/utils/decorators.py +++ b/src/aiida/cmdline/utils/decorators.py @@ -326,8 +326,7 @@ def start_daemon(): assert profile is not None - loop = asyncio.get_event_loop() - if manager.create_broker(loop) is None: + if manager.get_broker() is None: echo.echo_critical( f'profile `{profile.name}` does not define a broker and so cannot use this functionality.' f'See {URL_NO_BROKER} for more details.' diff --git a/src/aiida/engine/runners.py b/src/aiida/engine/runners.py index cb14be2b85..007ae3770e 100644 --- a/src/aiida/engine/runners.py +++ b/src/aiida/engine/runners.py @@ -92,9 +92,13 @@ def __init__( self._plugin_version_provider = PluginVersionProvider() # FIXME: broker and coordinator overlap the concept there for over-abstraction, remove the abstraction + # Broker should always create inside runner? since they should share the loop. if broker is not None: self._coordinator = broker.get_coordinator() self._controller = broker.get_controller() + + # FIXME: why with wrapper, the pending task not exist?? + # self._coordinator = wrap_communicator(broker.get_coordinator().communicator, self._loop) elif self._broker_submit: # FIXME: if broker then broker_submit else False LOGGER.warning('Disabling broker submission, no coordinator provided') diff --git a/src/aiida/manage/manager.py b/src/aiida/manage/manager.py index e9bf5c17f0..ca46d0894f 100644 --- a/src/aiida/manage/manager.py +++ b/src/aiida/manage/manager.py @@ -286,9 +286,13 @@ def get_profile_storage(self) -> 'StorageBackend': return self._profile_storage def get_broker(self) -> 'Broker | None': - return self._broker + if self._broker is not None: + return self._broker + + _default_loop = asyncio.get_event_loop() + return self._create_broker(_default_loop) - def create_broker(self, loop) -> 'Broker | None': + def _create_broker(self, loop) -> 'Broker | None': """Return an instance of :class:`aiida.brokers.broker.Broker` if the profile defines a broker. :returns: The broker of the profile, or ``None`` if the profile doesn't define one. @@ -427,7 +431,7 @@ def create_runner( _default_loop = asyncio.get_event_loop() loop = loop or _default_loop - _default_broker = self.create_broker(loop) + _default_broker = self._create_broker(loop) runner = runners.Runner( poll_interval=poll_interval or _default_poll_interval,