Skip to content

Commit c769906

Browse files
committed
Controller snuck into broker
1 parent 5d59e6a commit c769906

File tree

6 files changed

+36
-21
lines changed

6 files changed

+36
-21
lines changed

environment.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ dependencies:
2222
- importlib-metadata~=6.0
2323
- numpy~=1.21
2424
- paramiko~=3.0
25-
- plumpy~=0.22.3
25+
- plumpy
2626
- pgsu~=0.3.0
2727
- psutil~=5.6
2828
- psycopg[binary]~=3.0

src/aiida/brokers/broker.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,13 @@
33
import abc
44
import typing as t
55

6+
from plumpy.controller import ProcessController
67

78
if t.TYPE_CHECKING:
8-
from aiida.manage.configuration.profile import Profile
99
from plumpy.coordinator import Coordinator
1010

11+
from aiida.manage.configuration.profile import Profile
12+
1113
__all__ = ('Broker',)
1214

1315

@@ -25,6 +27,11 @@ def __init__(self, profile: 'Profile') -> None:
2527
def get_coordinator(self) -> 'Coordinator':
2628
"""Return an instance of coordinator."""
2729

30+
@abc.abstractmethod
31+
def get_controller(self) -> ProcessController:
32+
"""Return the process controller"""
33+
...
34+
2835
@abc.abstractmethod
2936
def iterate_tasks(self):
3037
"""Return an iterator over the tasks in the launch queue."""

src/aiida/brokers/rabbitmq/broker.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,9 @@
55
import functools
66
import typing as t
77

8-
from plumpy.rmq import RmqCoordinator
8+
from plumpy.rmq import RemoteProcessThreadController, RmqCoordinator
9+
from plumpy import ProcessController
10+
from plumpy.rmq.process_control import RemoteProcessController
911

1012
from aiida.brokers.broker import Broker
1113
from aiida.common.log import AIIDA_LOGGER
@@ -15,6 +17,7 @@
1517

1618
if t.TYPE_CHECKING:
1719
from kiwipy.rmq import RmqThreadCommunicator
20+
1821
from aiida.manage.configuration.profile import Profile
1922

2023
LOGGER = AIIDA_LOGGER.getChild('broker.rabbitmq')
@@ -61,6 +64,10 @@ def get_coordinator(self):
6164

6265
return coordinator
6366

67+
def get_controller(self) -> ProcessController:
68+
coordinator = self.get_coordinator()
69+
return RemoteProcessThreadController(coordinator)
70+
6471
def _create_communicator(self) -> 'RmqThreadCommunicator':
6572
"""Return an instance of :class:`kiwipy.Communicator`."""
6673
from kiwipy.rmq import RmqThreadCommunicator

src/aiida/engine/processes/process.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,6 @@
4343
# from kiwipy.communications import UnroutableError
4444
# from plumpy.processes import ConnectionClosed # type: ignore[attr-defined]
4545
from plumpy.process_states import Finished, ProcessState
46-
4746
from plumpy.processes import Process as PlumpyProcess
4847
from plumpy.utils import AttributesFrozendict
4948

src/aiida/engine/runners.py

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
from aiida.common import exceptions
2828
from aiida.orm import ProcessNode, load_node
2929
from aiida.plugins.utils import PluginVersionProvider
30+
from aiida.brokers import Broker
3031

3132
from . import transports, utils
3233
from .processes import Process, ProcessBuilder, ProcessState, futures
@@ -64,22 +65,22 @@ def __init__(
6465
self,
6566
poll_interval: Union[int, float] = 0,
6667
loop: Optional[asyncio.AbstractEventLoop] = None,
67-
coordinator: Optional[Coordinator] = None,
68+
broker: Broker | None = None,
6869
broker_submit: bool = False,
6970
persister: Optional[Persister] = None,
7071
):
7172
"""Construct a new runner.
7273
7374
:param poll_interval: interval in seconds between polling for status of active sub processes
7475
:param loop: an asyncio event loop, if none is suppled a new one will be created
75-
:param coordinator: the coordinator to use
76+
:param broker: the broker to use
7677
:param broker_submit: if True, processes will be submitted to the broker, otherwise they will be scheduled here
7778
:param persister: the persister to use to persist processes
7879
7980
"""
8081
assert not (
8182
broker_submit and persister is None
82-
), 'Must supply a persister if you want to submit using coordinator'
83+
), 'Must supply a persister if you want to submit using coordinator/broker'
8384

8485
set_event_loop_policy()
8586
self._loop = loop or asyncio.get_event_loop()
@@ -90,11 +91,14 @@ def __init__(
9091
self._persister = persister
9192
self._plugin_version_provider = PluginVersionProvider()
9293

93-
if coordinator is not None:
94-
# FIXME: the wrap is not needed, when passed in, the coordinator should already wrapped
95-
self._coordinator = wrap_communicator(coordinator.communicator, self._loop)
96-
self._controller = RemoteProcessThreadController(coordinator)
94+
# FIXME: broker and coordinator overlap the concept there for over-abstraction, remove the abstraction
95+
if broker is not None:
96+
_coordinator = broker.get_coordinator()
97+
# FIXME: the wrap should not be needed
98+
self._coordinator = wrap_communicator(_coordinator.communicator, self._loop)
99+
self._controller = broker.get_controller()
97100
elif self._broker_submit:
101+
# FIXME: if broker then broker_submit else False
98102
LOGGER.warning('Disabling broker submission, no coordinator provided')
99103
self._broker_submit = False
100104

@@ -350,7 +354,7 @@ def get_process_future(self, pk: int) -> futures.ProcessFuture:
350354
351355
:return: A future representing the completion of the process node
352356
"""
353-
return futures.ProcessFuture(pk, self._loop, self._poll_interval, self._coordinator)
357+
return futures.ProcessFuture(pk, self._loop, self._poll_interval, self.coordinator)
354358

355359
def _poll_process(self, node, callback):
356360
"""Check whether the process state of the node is terminated and call the callback or reschedule it.

src/aiida/manage/manager.py

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -10,14 +10,12 @@
1010

1111
from __future__ import annotations
1212

13+
import asyncio
1314
from typing import TYPE_CHECKING, Any, Optional, Union
1415

15-
import asyncio
16-
import kiwipy
1716
from plumpy.coordinator import Coordinator
1817

1918
if TYPE_CHECKING:
20-
from kiwipy.rmq import RmqThreadCommunicator
2119
from plumpy.process_comms import RemoteProcessThreadController
2220

2321
from aiida.brokers.broker import Broker
@@ -169,7 +167,7 @@ def reset_profile_storage(self) -> None:
169167
self._profile_storage = None
170168

171169
def reset_broker(self) -> None:
172-
"""Reset the communicator."""
170+
"""Reset the broker."""
173171
from concurrent import futures
174172

175173
if self._broker is not None:
@@ -401,7 +399,7 @@ def create_runner(
401399
self,
402400
poll_interval: Union[int, float] | None = None,
403401
loop: Optional[asyncio.AbstractEventLoop] = None,
404-
coordinator: Optional[Coordinator] = None,
402+
broker: Broker | None = None,
405403
broker_submit: bool = False,
406404
persister: Optional[AiiDAPersister] = None,
407405
) -> 'Runner':
@@ -422,19 +420,19 @@ def create_runner(
422420

423421
_default_poll_interval = 0.0 if profile.is_test_profile else self.get_option('runner.poll.interval')
424422
_default_broker_submit = False
425-
_default_coordinator = self.get_coordinator()
426423
_default_persister = self.get_persister()
424+
_default_broker = self.get_broker()
427425

428426
runner = runners.Runner(
429427
poll_interval=poll_interval or _default_poll_interval,
430428
loop=loop or asyncio.get_event_loop(),
431-
coordinator=coordinator or _default_coordinator,
429+
broker=broker or _default_broker,
432430
broker_submit=broker_submit or _default_broker_submit,
433431
persister=persister or _default_persister,
434432
)
435433
return runner
436434

437-
def create_daemon_runner(self, loop: Optional['asyncio.AbstractEventLoop'] = None) -> 'Runner':
435+
def create_daemon_runner(self) -> 'Runner':
438436
"""Create and return a new daemon runner.
439437
440438
This is used by workers when the daemon is running and in testing.
@@ -449,7 +447,7 @@ def create_daemon_runner(self, loop: Optional['asyncio.AbstractEventLoop'] = Non
449447
from aiida.engine import persistence
450448
from aiida.engine.processes.launcher import ProcessLauncher
451449

452-
runner = self.create_runner(broker_submit=True, loop=loop)
450+
runner = self.create_runner(broker_submit=True, loop=None)
453451
runner_loop = runner.loop
454452

455453
# Listen for incoming launch requests

0 commit comments

Comments
 (0)