diff --git a/src/aiida/brokers/rabbitmq/coordinator.py b/src/aiida/brokers/rabbitmq/coordinator.py index 6c6a13c7e2..58ce21ecee 100644 --- a/src/aiida/brokers/rabbitmq/coordinator.py +++ b/src/aiida/brokers/rabbitmq/coordinator.py @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- from asyncio import AbstractEventLoop +import asyncio from typing import Generic, TypeVar, final import kiwipy import concurrent.futures @@ -84,3 +85,7 @@ def task_send(self, task, no_reply=False): def close(self): self._comm.close() + + def is_closed(self) -> bool: + """Return `True` if the communicator was closed""" + return self._comm.is_closed() diff --git a/src/aiida/cmdline/utils/decorators.py b/src/aiida/cmdline/utils/decorators.py index 595ce8373d..1cd251493e 100644 --- a/src/aiida/cmdline/utils/decorators.py +++ b/src/aiida/cmdline/utils/decorators.py @@ -326,8 +326,7 @@ def start_daemon(): assert profile is not None - loop = asyncio.get_event_loop() - if manager.create_broker(loop) is None: + if manager.get_broker() is None: echo.echo_critical( f'profile `{profile.name}` does not define a broker and so cannot use this functionality.' f'See {URL_NO_BROKER} for more details.' diff --git a/src/aiida/engine/runners.py b/src/aiida/engine/runners.py index cb14be2b85..007ae3770e 100644 --- a/src/aiida/engine/runners.py +++ b/src/aiida/engine/runners.py @@ -92,9 +92,13 @@ def __init__( self._plugin_version_provider = PluginVersionProvider() # FIXME: broker and coordinator overlap the concept there for over-abstraction, remove the abstraction + # Broker should always create inside runner? since they should share the loop. if broker is not None: self._coordinator = broker.get_coordinator() self._controller = broker.get_controller() + + # FIXME: why with wrapper, the pending task not exist?? + # self._coordinator = wrap_communicator(broker.get_coordinator().communicator, self._loop) elif self._broker_submit: # FIXME: if broker then broker_submit else False LOGGER.warning('Disabling broker submission, no coordinator provided') diff --git a/src/aiida/manage/manager.py b/src/aiida/manage/manager.py index e9bf5c17f0..ca46d0894f 100644 --- a/src/aiida/manage/manager.py +++ b/src/aiida/manage/manager.py @@ -286,9 +286,13 @@ def get_profile_storage(self) -> 'StorageBackend': return self._profile_storage def get_broker(self) -> 'Broker | None': - return self._broker + if self._broker is not None: + return self._broker + + _default_loop = asyncio.get_event_loop() + return self._create_broker(_default_loop) - def create_broker(self, loop) -> 'Broker | None': + def _create_broker(self, loop) -> 'Broker | None': """Return an instance of :class:`aiida.brokers.broker.Broker` if the profile defines a broker. :returns: The broker of the profile, or ``None`` if the profile doesn't define one. @@ -427,7 +431,7 @@ def create_runner( _default_loop = asyncio.get_event_loop() loop = loop or _default_loop - _default_broker = self.create_broker(loop) + _default_broker = self._create_broker(loop) runner = runners.Runner( poll_interval=poll_interval or _default_poll_interval,