Skip to content

Commit d2201e7

Browse files
committed
Integrate ZMQ broker into daemon, status, and CI (aiidateam#7284)
Wire the ZMQ broker into the daemon lifecycle: - Add `verdi daemon broker` hidden command for circus to manage - Add ZMQ broker as a circus watcher started before workers - Show ZMQ broker status in `verdi daemon status` and `verdi status` Fix SQLite stale-read issue by committing the session after RPC calls in `control.py`. Update CI to run test matrix with both `rmq` and `zmq` broker backends. Remove RabbitMQ service dependency from minimum-requirements and presto test jobs.
1 parent 9e04998 commit d2201e7

9 files changed

Lines changed: 336 additions & 122 deletions

File tree

.github/workflows/ci-code.yml

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ jobs:
2828
matrix:
2929
python-version: ['3.10', '3.14']
3030
database-backend: [psql]
31+
broker-backend: [rmq, zmq]
3132

3233
services:
3334
postgres:
@@ -72,7 +73,7 @@ jobs:
7273
AIIDA_TEST_PROFILE: test_aiida
7374
AIIDA_WARN_v3: 1
7475
run: |
75-
pytest -n auto --db-backend ${{ matrix.database-backend }} -m 'not nightly' tests/ ${{ matrix.python-version == '3.14' && '--cov aiida' || '' }}
76+
pytest -n auto --db-backend ${{ matrix.database-backend }} --broker-backend ${{ matrix.broker-backend }} -m 'not nightly' tests/ ${{ matrix.python-version == '3.14' && '--cov aiida' || '' }}
7677
7778
- name: Upload coverage report
7879
if: matrix.python-version == 3.14 && github.repository == 'aiidateam/aiida-core'
@@ -104,13 +105,6 @@ jobs:
104105
python-version: ['3.10']
105106
database-backend: [sqlite]
106107

107-
services:
108-
rabbitmq:
109-
image: rabbitmq:3.8.14-management
110-
ports:
111-
- 5672:5672
112-
- 15672:15672
113-
114108
steps:
115109
- uses: actions/checkout@v6
116110

@@ -131,7 +125,7 @@ jobs:
131125
- name: Run test suite
132126
env:
133127
AIIDA_WARN_v3: 0
134-
run: pytest --disable-warnings -n auto --db-backend ${{ matrix.database-backend }} -m 'not nightly' tests/
128+
run: pytest --disable-warnings -n auto --db-backend ${{ matrix.database-backend }} --broker-backend zmq -m 'not nightly' tests/
135129

136130

137131
tests-presto:
@@ -156,7 +150,7 @@ jobs:
156150
- name: Run test suite
157151
env:
158152
AIIDA_WARN_v3: 0
159-
run: pytest -n auto -m 'presto' tests/
153+
run: pytest -n auto --broker-backend zmq -m 'presto' tests/
160154

161155

162156
verdi:

.github/workflows/release.yml

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -63,21 +63,14 @@ jobs:
6363
runs-on: ubuntu-24.04
6464
timeout-minutes: 30
6565

66-
services:
67-
rabbitmq:
68-
image: rabbitmq:3.8.14-management
69-
ports:
70-
- 5672:5672
71-
- 15672:15672
72-
7366
steps:
7467
- uses: actions/checkout@v6
7568

7669
- name: Install aiida-core
7770
uses: ./.github/actions/install-aiida-core
7871

7972
- name: Run sub-set of test suite
80-
run: pytest -s -m requires_rmq --db-backend=sqlite tests/
73+
run: pytest -s -m requires_broker --broker-backend zmq --db-backend=sqlite tests/
8174

8275
publish-pypi:
8376
if: startsWith(github.ref, 'refs/tags/')

src/aiida/cmdline/commands/cmd_daemon.py

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,7 @@ def status(ctx, all_profiles, timeout):
109109

110110
from aiida.cmdline.utils.common import format_local_time
111111
from aiida.engine.daemon.client import DaemonException, get_daemon_client
112+
from aiida.manage.manager import get_manager
112113

113114
if all_profiles is True:
114115
profiles = [profile for profile in ctx.obj.config.profiles if not profile.is_test_profile]
@@ -146,8 +147,29 @@ def status(ctx, all_profiles, timeout):
146147
workers_info = '--> No workers are running. Use `verdi daemon incr` to start some!\n'
147148

148149
start_time = format_local_time(daemon_response['info']['create_time'])
150+
151+
# Build broker status line for managed brokers (e.g., ZMQ)
152+
broker_line = ''
153+
broker = get_manager().get_broker()
154+
from aiida.brokers.zmq.broker import ZmqBroker
155+
156+
if isinstance(broker, ZmqBroker):
157+
if broker.is_running():
158+
status_info = broker.get_service_status()
159+
if status_info:
160+
broker_pid = status_info.get('pid', '?')
161+
pending = status_info.get('pending_tasks', 0)
162+
processing = status_info.get('processing_tasks', 0)
163+
broker_line = (
164+
f'Broker is running as PID {broker_pid} [{pending} pending, {processing} processing]\n'
165+
f'Broker directory: {broker.base_path}\n'
166+
)
167+
else:
168+
broker_line = 'Broker is NOT running\n'
169+
149170
echo.echo(
150171
f'Daemon is running as PID {daemon_response["info"]["pid"]} since {start_time}\n'
172+
f'{broker_line}'
151173
f'Active workers [{len(workers)}]:\n{workers_info}\n'
152174
f'Log file: {client.daemon_log_file}\n'
153175
'Use `verdi daemon [incr | decr] [num]` to increase / decrease the number of workers'
@@ -279,3 +301,20 @@ def worker():
279301
from aiida.engine.daemon.worker import start_daemon_worker
280302

281303
start_daemon_worker(foreground=True)
304+
305+
306+
@verdi_daemon.command('broker', hidden=True)
307+
@decorators.with_dbenv()
308+
@decorators.requires_broker
309+
def broker():
310+
"""Run the ZMQ broker server in the current process.
311+
312+
.. note:: this should not be called directly from the commandline!
313+
"""
314+
from aiida.brokers.zmq.broker import get_broker_base_path
315+
from aiida.brokers.zmq.service import run_broker_service
316+
from aiida.manage.manager import get_manager
317+
318+
profile = get_manager().get_profile()
319+
assert profile is not None
320+
run_broker_service(base_path=get_broker_base_path(profile))

src/aiida/cmdline/commands/cmd_status.py

Lines changed: 50 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -130,46 +130,69 @@ def verdi_status(print_traceback: bool, no_rmq: bool) -> None:
130130
version=3,
131131
)
132132

133-
# Getting the broker status
133+
# Getting the daemon and broker status
134134
broker = manager.get_broker()
135135

136136
if broker:
137-
try:
138-
broker.get_communicator()
139-
except Exception as exc:
140-
message = f'Unable to connect to broker: {broker}'
141-
print_status(ServiceStatus.ERROR, 'broker', message, exception=exc, print_traceback=print_traceback)
142-
exit_code = ExitCode.CRITICAL
143-
else:
144-
print_status(ServiceStatus.UP, 'broker', str(broker))
145-
finally:
146-
broker.close()
147-
else:
137+
from aiida.brokers.zmq.broker import ZmqBroker
138+
139+
# For RabbitMQ: verify broker connectivity as a separate status line
140+
# For ZMQ: broker info is shown alongside the daemon status below
141+
if not isinstance(broker, ZmqBroker):
142+
try:
143+
broker.get_communicator()
144+
except Exception as exc:
145+
message = f'Unable to connect to broker: {broker}'
146+
print_status(ServiceStatus.ERROR, 'broker', message, exception=exc, print_traceback=print_traceback)
147+
exit_code = ExitCode.CRITICAL
148+
else:
149+
print_status(ServiceStatus.UP, 'broker', str(broker))
150+
finally:
151+
broker.close()
152+
153+
if not broker:
148154
print_status(
149155
ServiceStatus.WARNING,
150156
'broker',
151157
f'No broker defined for this profile: certain functionality not available.\nSee {URL_NO_BROKER}',
152158
)
153-
154-
# Getting the daemon status
155-
try:
156-
status = manager.get_daemon_client().get_status()
157-
except ConfigurationError:
158159
print_status(
159160
ServiceStatus.WARNING,
160161
'daemon',
161-
'No broker defined for this profile: daemon is not available.',
162+
f'No broker defined for this profile: daemon is not available. See {URL_NO_BROKER}',
162163
)
163-
except DaemonNotRunningException as exception:
164-
print_status(ServiceStatus.WARNING, 'daemon', str(exception))
165-
except DaemonException as exception:
166-
print_status(ServiceStatus.ERROR, 'daemon', str(exception))
167-
except Exception as exception:
168-
message = 'Error getting daemon status'
169-
print_status(ServiceStatus.ERROR, 'daemon', message, exception=exception, print_traceback=print_traceback)
170-
exit_code = ExitCode.CRITICAL
171164
else:
172-
print_status(ServiceStatus.UP, 'daemon', f'Daemon is running with PID {status["pid"]}')
165+
try:
166+
daemon_status = manager.get_daemon_client().get_status()
167+
except ConfigurationError:
168+
print_status(
169+
ServiceStatus.WARNING,
170+
'daemon',
171+
f'No broker defined for this profile: daemon is not available. See {URL_NO_BROKER}',
172+
)
173+
except DaemonNotRunningException as exception:
174+
print_status(ServiceStatus.WARNING, 'daemon', str(exception))
175+
except DaemonException as exception:
176+
print_status(ServiceStatus.ERROR, 'daemon', str(exception))
177+
except Exception as exception:
178+
message = 'Error getting daemon status'
179+
print_status(ServiceStatus.ERROR, 'daemon', message, exception=exception, print_traceback=print_traceback)
180+
exit_code = ExitCode.CRITICAL
181+
else:
182+
daemon_msg = f'Daemon is running with PID {daemon_status["pid"]}'
183+
# Append broker info for managed brokers (e.g., ZMQ)
184+
185+
if isinstance(broker, ZmqBroker):
186+
if broker.is_running():
187+
status_info = broker.get_service_status()
188+
if status_info:
189+
broker_pid = status_info.get('pid', '?')
190+
pending = status_info.get('pending_tasks', 0)
191+
processing = status_info.get('processing_tasks', 0)
192+
daemon_msg += f', Broker PID {broker_pid} [{pending} pending, {processing} processing]'
193+
else:
194+
daemon_msg += ', Broker is NOT running'
195+
print_status(ServiceStatus.UP, 'daemon', daemon_msg)
173196

174197
# Note: click does not forward return values to the exit code, see https://github.com/pallets/click/issues/747
175198
if exit_code != ExitCode.SUCCESS:

src/aiida/engine/daemon/client.py

Lines changed: 61 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,11 @@ def cmd_start_daemon_worker(self) -> list[str]:
149149
"""Return the command to start a daemon worker process."""
150150
return [self._verdi_bin, '-p', self.profile.name, 'daemon', 'worker']
151151

152+
@property
153+
def cmd_start_broker(self) -> list[str]:
154+
"""Return the command to start the ZMQ broker server process."""
155+
return [self._verdi_bin, '-p', self.profile.name, 'daemon', 'broker']
156+
152157
@property
153158
def loglevel(self) -> str:
154159
return get_config_option('logging.circus_loglevel')
@@ -692,6 +697,61 @@ def _start_daemon(self, number_workers: int = 1, foreground: bool = False) -> No
692697
if not foreground:
693698
logoutput = self.circus_log_file
694699

700+
watchers = []
701+
702+
# Start ZMQ broker before workers so its sockets are ready when workers connect.
703+
# Skip if a broker is already running (e.g. started by the test fixture).
704+
if self.profile.process_control_backend == 'core.zmq':
705+
from aiida.manage.manager import get_manager
706+
707+
broker_instance = get_manager().get_broker()
708+
broker_already_running = (
709+
broker_instance is not None and hasattr(broker_instance, 'is_running') and broker_instance.is_running()
710+
)
711+
712+
if not broker_already_running:
713+
watchers.append(
714+
{
715+
'cmd': ' '.join(self.cmd_start_broker),
716+
'name': f'{self.daemon_name}-broker',
717+
'numprocesses': 1,
718+
'virtualenv': self.virtualenv,
719+
'copy_env': True,
720+
'stdout_stream': {
721+
'class': 'FileStream',
722+
'filename': self.daemon_log_file,
723+
'time_format': '%Y-%m-%d %H:%M:%S',
724+
},
725+
'stderr_stream': {
726+
'class': 'FileStream',
727+
'filename': self.daemon_log_file,
728+
'time_format': '%Y-%m-%d %H:%M:%S',
729+
},
730+
'env': self.get_env(),
731+
}
732+
)
733+
734+
watchers.append(
735+
{
736+
'cmd': ' '.join(self.cmd_start_daemon_worker),
737+
'name': self.daemon_name,
738+
'numprocesses': number_workers,
739+
'virtualenv': self.virtualenv,
740+
'copy_env': True,
741+
'stdout_stream': {
742+
'class': 'FileStream',
743+
'filename': self.daemon_log_file,
744+
'time_format': '%Y-%m-%d %H:%M:%S',
745+
},
746+
'stderr_stream': {
747+
'class': 'FileStream',
748+
'filename': self.daemon_log_file,
749+
'time_format': '%Y-%m-%d %H:%M:%S',
750+
},
751+
'env': self.get_env(),
752+
}
753+
)
754+
695755
arbiter_config = {
696756
'controller': self.get_controller_endpoint(),
697757
'pubsub_endpoint': self.get_pubsub_endpoint(),
@@ -701,26 +761,7 @@ def _start_daemon(self, number_workers: int = 1, foreground: bool = False) -> No
701761
'debug': False,
702762
'statsd': True,
703763
'pidfile': self.circus_pid_file,
704-
'watchers': [
705-
{
706-
'cmd': ' '.join(self.cmd_start_daemon_worker),
707-
'name': self.daemon_name,
708-
'numprocesses': number_workers,
709-
'virtualenv': self.virtualenv,
710-
'copy_env': True,
711-
'stdout_stream': {
712-
'class': 'FileStream',
713-
'filename': self.daemon_log_file,
714-
'time_format': '%Y-%m-%d %H:%M:%S',
715-
},
716-
'stderr_stream': {
717-
'class': 'FileStream',
718-
'filename': self.daemon_log_file,
719-
'time_format': '%Y-%m-%d %H:%M:%S',
720-
},
721-
'env': self.get_env(),
722-
}
723-
],
764+
'watchers': watchers,
724765
}
725766

726767
if not foreground:

src/aiida/engine/processes/control.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -237,6 +237,13 @@ def _perform_actions(
237237

238238
_resolve_futures(futures, infinitive, present, timeout)
239239

240+
# End the current read transaction so that subsequent attribute reads see
241+
# the state committed by the daemon. Without this, SQLite's transaction
242+
# snapshot isolation can cause stale reads after the RPC round-trip.
243+
storage = get_manager().get_profile_storage()
244+
if hasattr(storage, 'get_session'):
245+
storage.get_session().commit()
246+
240247

241248
def _resolve_futures(
242249
futures: dict[concurrent.futures.Future, ProcessNode],

0 commit comments

Comments
 (0)