Skip to content

Commit 5d59e6a

Browse files
committed
Remove get_communicator calls
1 parent 28cdb1c commit 5d59e6a

File tree

11 files changed

+48
-62
lines changed

11 files changed

+48
-62
lines changed

src/aiida/brokers/broker.py

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,10 @@
33
import abc
44
import typing as t
55

6+
67
if t.TYPE_CHECKING:
78
from aiida.manage.configuration.profile import Profile
9+
from plumpy.coordinator import Coordinator
810

911
__all__ = ('Broker',)
1012

@@ -20,11 +22,7 @@ def __init__(self, profile: 'Profile') -> None:
2022
self._profile = profile
2123

2224
@abc.abstractmethod
23-
def get_communicator(self):
24-
"""Return an instance of :class:`kiwipy.Communicator`."""
25-
26-
@abc.abstractmethod
27-
def get_coordinator(self):
25+
def get_coordinator(self) -> 'Coordinator':
2826
"""Return an instance of coordinator."""
2927

3028
@abc.abstractmethod

src/aiida/brokers/rabbitmq/broker.py

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ def __init__(self, profile: Profile) -> None:
3131
:param profile: The profile.
3232
"""
3333
self._profile = profile
34-
self._communicator: 'RmqThreadCommunicator' | None = None
34+
self._communicator: 'RmqThreadCommunicator | None' = None
3535
self._prefix = f'aiida-{self._profile.uuid}'
3636

3737
def __str__(self):
@@ -48,19 +48,16 @@ def close(self):
4848

4949
def iterate_tasks(self):
5050
"""Return an iterator over the tasks in the launch queue."""
51-
for task in self.get_communicator().task_queue(get_launch_queue_name(self._prefix)):
51+
for task in self.get_coordinator().communicator.task_queue(get_launch_queue_name(self._prefix)):
5252
yield task
5353

54-
def get_communicator(self) -> 'RmqThreadCommunicator':
54+
def get_coordinator(self):
5555
if self._communicator is None:
5656
self._communicator = self._create_communicator()
5757
# Check whether a compatible version of RabbitMQ is being used.
5858
self.check_rabbitmq_version()
5959

60-
return self._communicator
61-
62-
def get_coordinator(self):
63-
coordinator = RmqCoordinator(self.get_communicator())
60+
coordinator = RmqCoordinator(self._communicator)
6461

6562
return coordinator
6663

@@ -70,7 +67,7 @@ def _create_communicator(self) -> 'RmqThreadCommunicator':
7067

7168
from aiida.orm.utils import serialize
7269

73-
self._communicator = RmqThreadCommunicator.connect(
70+
_communicator = RmqThreadCommunicator.connect(
7471
connection_params={'url': self.get_url()},
7572
message_exchange=get_message_exchange_name(self._prefix),
7673
encoder=functools.partial(serialize.serialize, encoding='utf-8'),
@@ -84,7 +81,7 @@ def _create_communicator(self) -> 'RmqThreadCommunicator':
8481
testing_mode=self._profile.is_test_profile,
8582
)
8683

87-
return self._communicator
84+
return _communicator
8885

8986
def check_rabbitmq_version(self):
9087
"""Check the version of RabbitMQ that is being connected to and emit warning if it is not compatible."""
@@ -128,4 +125,4 @@ def get_rabbitmq_version(self):
128125
"""
129126
from packaging.version import parse
130127

131-
return parse(self.get_communicator().server_properties['version'])
128+
return parse(self.get_coordinator().communicator.server_properties['version'])

src/aiida/cmdline/commands/cmd_process.py

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010

1111
import click
1212

13+
from aiida.brokers.broker import Broker
1314
from aiida.cmdline.commands.cmd_verdi import verdi
1415
from aiida.cmdline.params import arguments, options, types
1516
from aiida.cmdline.utils import decorators, echo
@@ -416,7 +417,7 @@ def process_play(processes, all_entries, timeout, wait):
416417
@decorators.with_dbenv()
417418
@decorators.with_broker
418419
@decorators.only_if_daemon_running(echo.echo_warning, 'daemon is not running, so process may not be reachable')
419-
def process_watch(broker, processes, most_recent_node):
420+
def process_watch(broker: Broker, processes, most_recent_node):
420421
"""Watch the state transitions of processes.
421422
422423
Watch the state transitions for one or multiple running processes."""
@@ -436,7 +437,7 @@ def process_watch(broker, processes, most_recent_node):
436437

437438
from kiwipy import BroadcastFilter
438439

439-
def _print(communicator, body, sender, subject, correlation_id):
440+
def _print(coordinator, body, sender, subject, correlation_id):
440441
"""Format the incoming broadcast data into a message and echo it to stdout."""
441442
if body is None:
442443
body = 'No message specified'
@@ -446,7 +447,7 @@ def _print(communicator, body, sender, subject, correlation_id):
446447

447448
echo.echo(f'Process<{sender}> [{subject}|{correlation_id}]: {body}')
448449

449-
communicator = broker.get_communicator()
450+
coordinator = broker.get_coordinator()
450451
echo.echo_report('watching for broadcasted messages, press CTRL+C to stop...')
451452

452453
if most_recent_node:
@@ -457,7 +458,7 @@ def _print(communicator, body, sender, subject, correlation_id):
457458
echo.echo_error(f'Process<{process.pk}> is already terminated')
458459
continue
459460

460-
communicator.add_broadcast_subscriber(BroadcastFilter(_print, sender=process.pk))
461+
coordinator.add_broadcast_subscriber(BroadcastFilter(_print, sender=process.pk))
461462

462463
try:
463464
# Block this thread indefinitely until interrupt
@@ -467,7 +468,7 @@ def _print(communicator, body, sender, subject, correlation_id):
467468
echo.echo('') # add a new line after the interrupt character
468469
echo.echo_report('received interrupt, exiting...')
469470
try:
470-
communicator.close()
471+
coordinator.close()
471472
except RuntimeError:
472473
pass
473474

src/aiida/cmdline/commands/cmd_rabbitmq.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
from aiida.cmdline.commands.cmd_devel import verdi_devel
2121
from aiida.cmdline.params import arguments, options
2222
from aiida.cmdline.utils import decorators, echo, echo_tabulate
23+
from aiida.manage.manager import Manager
2324

2425
if t.TYPE_CHECKING:
2526
import requests
@@ -131,12 +132,13 @@ def with_client(ctx, wrapped, _, args, kwargs):
131132

132133
@cmd_rabbitmq.command('server-properties')
133134
@decorators.with_manager
134-
def cmd_server_properties(manager):
135+
def cmd_server_properties(manager: Manager):
135136
"""List the server properties."""
136137
import yaml
137138

138139
data = {}
139-
for key, value in manager.get_communicator().server_properties.items():
140+
# FIXME: server_properties as an common API for coordinator?
141+
for key, value in manager.get_coordinator().communicator.server_properties.items():
140142
data[key] = value.decode('utf-8') if isinstance(value, bytes) else value
141143
click.echo(yaml.dump(data, indent=4))
142144

src/aiida/cmdline/commands/cmd_status.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,7 @@ def verdi_status(print_traceback, no_rmq):
132132

133133
if broker:
134134
try:
135-
broker.get_communicator()
135+
broker.get_coordinator()
136136
except Exception as exc:
137137
message = f'Unable to connect to broker: {broker}'
138138
print_status(ServiceStatus.ERROR, 'broker', message, exception=exc, print_traceback=print_traceback)

src/aiida/engine/processes/futures.py

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
from typing import Optional, Union
1313

1414
import kiwipy
15+
from plumpy.coordinator import Coordinator
1516

1617
from aiida.orm import Node, load_node
1718

@@ -28,36 +29,36 @@ def __init__(
2829
pk: int,
2930
loop: Optional[asyncio.AbstractEventLoop] = None,
3031
poll_interval: Union[None, int, float] = None,
31-
communicator: Optional[kiwipy.Communicator] = None,
32+
coordinator: Optional[Coordinator] = None,
3233
):
3334
"""Construct a future for a process node being finished.
3435
3536
If a None poll_interval is supplied polling will not be used.
36-
If a communicator is supplied it will be used to listen for broadcast messages.
37+
If a coordinator is supplied it will be used to listen for broadcast messages.
3738
3839
:param pk: process pk
3940
:param loop: An event loop
4041
:param poll_interval: optional polling interval, if None, polling is not activated.
41-
:param communicator: optional communicator, if None, will not subscribe to broadcasts.
42+
:param coordinator: optional coordinator, if None, will not subscribe to broadcasts.
4243
"""
4344
from .process import ProcessState
4445

4546
# create future in specified event loop
4647
loop = loop if loop is not None else asyncio.get_event_loop()
4748
super().__init__(loop=loop)
4849

49-
assert not (poll_interval is None and communicator is None), 'Must poll or have a communicator to use'
50+
assert not (poll_interval is None and coordinator is None), 'Must poll or have a coordinator to use'
5051

5152
node = load_node(pk=pk)
5253

5354
if node.is_terminated:
5455
self.set_result(node)
5556
else:
56-
self._communicator = communicator
57+
self._coordinator = coordinator
5758
self.add_done_callback(lambda _: self.cleanup())
5859

5960
# Try setting up a filtered broadcast subscriber
60-
if self._communicator is not None:
61+
if self._coordinator is not None:
6162

6263
def _subscriber(*args, **kwargs):
6364
if not self.done():
@@ -66,17 +67,17 @@ def _subscriber(*args, **kwargs):
6667
broadcast_filter = kiwipy.BroadcastFilter(_subscriber, sender=pk)
6768
for state in [ProcessState.FINISHED, ProcessState.KILLED, ProcessState.EXCEPTED]:
6869
broadcast_filter.add_subject_filter(f'state_changed.*.{state.value}')
69-
self._broadcast_identifier = self._communicator.add_broadcast_subscriber(broadcast_filter)
70+
self._broadcast_identifier = self._coordinator.add_broadcast_subscriber(broadcast_filter)
7071

7172
# Start polling
7273
if poll_interval is not None:
7374
loop.create_task(self._poll_process(node, poll_interval))
7475

7576
def cleanup(self) -> None:
76-
"""Clean up the future by removing broadcast subscribers from the communicator if it still exists."""
77-
if self._communicator is not None:
78-
self._communicator.remove_broadcast_subscriber(self._broadcast_identifier)
79-
self._communicator = None
77+
"""Clean up the future by removing broadcast subscribers from the coordinator if it still exists."""
78+
if self._coordinator is not None:
79+
self._coordinator.remove_broadcast_subscriber(self._broadcast_identifier)
80+
self._coordinator = None
8081
self._broadcast_identifier = None
8182

8283
async def _poll_process(self, node: Node, poll_interval: Union[int, float]) -> None:

src/aiida/manage/manager.py

Lines changed: 0 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -326,24 +326,6 @@ def get_persister(self) -> 'AiiDAPersister':
326326

327327
return self._persister
328328

329-
def get_communicator(self) -> 'RmqThreadCommunicator':
330-
"""Return the communicator
331-
332-
:return: a global communicator instance
333-
334-
"""
335-
from aiida.common import ConfigurationError
336-
337-
broker = self.get_broker()
338-
339-
if broker is None:
340-
assert self._profile is not None
341-
raise ConfigurationError(
342-
f'profile `{self._profile.name}` does not provide a communicator because it does not define a broker'
343-
)
344-
345-
return broker.get_communicator()
346-
347329
def get_coordinator(self) -> 'Coordinator':
348330
"""Return the coordinator
349331

tests/brokers/test_rabbitmq.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ def raise_connection_error():
3232
broker = manager.get_broker()
3333
assert 'RabbitMQ v' in str(broker)
3434

35-
monkeypatch.setattr(broker, 'get_communicator', raise_connection_error)
35+
monkeypatch.setattr(broker, 'get_coordinator', raise_connection_error)
3636
assert 'RabbitMQ @' in str(broker)
3737

3838

@@ -92,14 +92,14 @@ def test_communicator(url):
9292
RmqThreadCommunicator.connect(connection_params={'url': url})
9393

9494

95-
def test_add_rpc_subscriber(communicator):
95+
def test_add_rpc_subscriber(coordinator):
9696
"""Test ``add_rpc_subscriber``."""
97-
communicator.add_rpc_subscriber(None)
97+
coordinator.add_rpc_subscriber(None)
9898

9999

100-
def test_add_broadcast_subscriber(communicator):
100+
def test_add_broadcast_subscriber(coordinator):
101101
"""Test ``add_broadcast_subscriber``."""
102-
communicator.add_broadcast_subscriber(None)
102+
coordinator.add_broadcast_subscriber(None)
103103

104104

105105
@pytest.mark.usefixtures('aiida_profile_clean')

tests/conftest.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
from aiida.common.folders import Folder
3333
from aiida.common.links import LinkType
3434
from aiida.manage.configuration import Profile, get_config, load_profile
35+
from aiida.manage.manager import Manager
3536

3637
if t.TYPE_CHECKING:
3738
from aiida.manage.configuration.config import Config
@@ -540,9 +541,9 @@ def backend(manager):
540541

541542

542543
@pytest.fixture
543-
def communicator(manager):
544-
"""Get the ``Communicator`` instance of the currently loaded profile to communicate with RabbitMQ."""
545-
return manager.get_communicator()
544+
def coordinator(manager: Manager):
545+
"""Get the ``Coordinator`` instance of the currently loaded profile to communicate with RabbitMQ."""
546+
return manager.get_coordinator()
546547

547548

548549
@pytest.fixture

tests/engine/test_futures.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ def test_calculation_future_broadcasts(self):
3131

3232
# No polling
3333
future = processes.futures.ProcessFuture(
34-
pk=process.pid, loop=runner.loop, communicator=manager.get_communicator()
34+
pk=process.pid, loop=runner.loop, coordinator=manager.get_coordinator()
3535
)
3636

3737
run(process)

0 commit comments

Comments
 (0)