Skip to content

Commit 329c51c

Browse files
committed
Keep on mess up with coordinator loop
When calling add_rpc_subscriber and add_task_subscriber, the event loop of caller may from random event loop. But the target event loop is the runner one. Therefore it requires to pass the loop to the broker when creating the runner and runner's broker.
1 parent 02a939e commit 329c51c

File tree

4 files changed

+17
-5
lines changed

4 files changed

+17
-5
lines changed

src/aiida/brokers/rabbitmq/coordinator.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
# -*- coding: utf-8 -*-
22
from asyncio import AbstractEventLoop
3+
import asyncio
34
from typing import Generic, TypeVar, final
45
import kiwipy
56
import concurrent.futures
@@ -84,3 +85,7 @@ def task_send(self, task, no_reply=False):
8485

8586
def close(self):
8687
self._comm.close()
88+
89+
def is_closed(self) -> bool:
90+
"""Return `True` if the communicator was closed"""
91+
return self._comm.is_closed()

src/aiida/cmdline/utils/decorators.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -326,8 +326,7 @@ def start_daemon():
326326

327327
assert profile is not None
328328

329-
loop = asyncio.get_event_loop()
330-
if manager.create_broker(loop) is None:
329+
if manager.get_broker() is None:
331330
echo.echo_critical(
332331
f'profile `{profile.name}` does not define a broker and so cannot use this functionality.'
333332
f'See {URL_NO_BROKER} for more details.'

src/aiida/engine/runners.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,9 +92,13 @@ def __init__(
9292
self._plugin_version_provider = PluginVersionProvider()
9393

9494
# FIXME: broker and coordinator overlap the concept there for over-abstraction, remove the abstraction
95+
# Broker should always create inside runner? since they should share the loop.
9596
if broker is not None:
9697
self._coordinator = broker.get_coordinator()
9798
self._controller = broker.get_controller()
99+
100+
# FIXME: why with wrapper, the pending task not exist??
101+
# self._coordinator = wrap_communicator(broker.get_coordinator().communicator, self._loop)
98102
elif self._broker_submit:
99103
# FIXME: if broker then broker_submit else False
100104
LOGGER.warning('Disabling broker submission, no coordinator provided')

src/aiida/manage/manager.py

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -286,9 +286,13 @@ def get_profile_storage(self) -> 'StorageBackend':
286286
return self._profile_storage
287287

288288
def get_broker(self) -> 'Broker | None':
289-
return self._broker
289+
if self._broker is not None:
290+
return self._broker
291+
292+
_default_loop = asyncio.get_event_loop()
293+
return self._create_broker(_default_loop)
290294

291-
def create_broker(self, loop) -> 'Broker | None':
295+
def _create_broker(self, loop) -> 'Broker | None':
292296
"""Return an instance of :class:`aiida.brokers.broker.Broker` if the profile defines a broker.
293297
294298
:returns: The broker of the profile, or ``None`` if the profile doesn't define one.
@@ -427,7 +431,7 @@ def create_runner(
427431
_default_loop = asyncio.get_event_loop()
428432

429433
loop = loop or _default_loop
430-
_default_broker = self.create_broker(loop)
434+
_default_broker = self._create_broker(loop)
431435

432436
runner = runners.Runner(
433437
poll_interval=poll_interval or _default_poll_interval,

0 commit comments

Comments
 (0)