Skip to content

Commit 1d2afbe

Browse files
committed
Run ZMQ broker as a circus watcher instead of standalone subprocess
- Add `verdi daemon broker` hidden command as circus watcher entry point
- Add broker watcher to circus arbiter config for `core.zmq` profiles
- Remove lifecycle methods (`start`/`stop`/`is_running`) from the `Broker` base class and `ZmqBroker` — the broker plugin is now a pure client
- Simplify `ZmqBrokerManagementClient` to primarily status queries; keep start/stop for test fixtures (production uses circus)
- Test fixture uses `management_client` directly for broker lifecycle
1 parent 350f221 commit 1d2afbe

9 files changed

Lines changed: 189 additions & 316 deletions

File tree

src/aiida/brokers/broker.py

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -30,13 +30,3 @@ def iterate_tasks(self):
3030
@abc.abstractmethod
3131
def close(self):
3232
"""Close the broker."""
33-
34-
def start(self) -> None:
35-
"""Start the broker service. No-op for externally managed brokers."""
36-
37-
def stop(self) -> None:
38-
"""Stop the broker service. No-op for externally managed brokers."""
39-
40-
def is_running(self) -> bool:
41-
"""Return whether the broker is running. True by default (external brokers assumed running)."""
42-
return True

src/aiida/brokers/zmq/broker.py

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -100,18 +100,6 @@ def management_client(self) -> ZmqBrokerManagementClient:
100100
"""Return the management client for broker service lifecycle."""
101101
return self._management_client
102102

103-
def start(self) -> None:
104-
"""Start the ZMQ broker service."""
105-
self._management_client.start()
106-
107-
def stop(self) -> None:
108-
"""Stop the ZMQ broker service."""
109-
self._management_client.stop()
110-
111-
def is_running(self) -> bool:
112-
"""Return whether the ZMQ broker service is running."""
113-
return self._management_client.is_running()
114-
115103
def get_communicator(self) -> ZmqCommunicator:
116104
"""Return a ZMQ communicator instance.
117105

src/aiida/brokers/zmq/client.py

Lines changed: 9 additions & 190 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,15 @@
1-
"""ZMQ Broker Management Client - lifecycle management for ZmqBrokerService.
1+
"""ZMQ Broker Management Client - read-only query interface for ZmqBrokerService.
22
3-
This module provides a management interface to start/stop the broker service
4-
and query its status via PID/status files. Analogous to RabbitmqManagementClient.
3+
This module provides a read-only interface to query broker service status
4+
via PID/status files. The broker lifecycle is managed by circus (production)
5+
or test helpers (testing).
56
"""
67

78
from __future__ import annotations
89

910
import json
1011
import os
1112
import shutil
12-
import signal
13-
import subprocess
14-
import sys
15-
import time
1613
from pathlib import Path
1714
from typing import Any
1815

@@ -25,19 +22,17 @@
2522

2623

2724
class ZmqBrokerManagementClient:
28-
"""Management client for ZmqBrokerService.
25+
"""Read-only management client for ZmqBrokerService.
2926
30-
Allows external code to:
31-
- Start/stop the broker service
32-
- Check if service is running
33-
- Get service status
27+
Provides:
28+
- Status queries (is_running, get_status, get_pid)
29+
- Endpoint discovery (router_endpoint, pub_endpoint)
3430
3531
Interacts with the service via PID/status files, not direct IPC.
36-
Analogous to RabbitmqManagementClient for the RabbitMQ broker.
3732
"""
3833

3934
def __init__(self, base_path: Path | str):
40-
"""Initialize the controller.
35+
"""Initialize the client.
4136
4237
:param base_path: Base path for broker data (same as ZmqBrokerService)
4338
"""
@@ -64,9 +59,6 @@ def status_file(self) -> Path:
6459
def _get_sockets_path(self) -> Path | None:
6560
"""Read the socket directory path from file.
6661
67-
The socket directory is created in a temp location by ZmqBrokerService
68-
to avoid Unix domain socket path length limits.
69-
7062
:return: Path to socket directory, or None if not available
7163
"""
7264
if not self._sockets_file.exists():
@@ -121,16 +113,12 @@ def _validate_pid(self, pid: int) -> bool:
121113
if HAS_PSUTIL:
122114
try:
123115
proc = psutil.Process(pid)
124-
# Check if process is running and is a Python process
125116
if proc.is_running() and proc.status() != psutil.STATUS_ZOMBIE:
126-
# Verify it's our broker by checking command line
127117
cmdline = proc.cmdline()
128118
return any('aiida.brokers.zmq' in arg for arg in cmdline)
129119
except (psutil.NoSuchProcess, psutil.AccessDenied):
130120
return False
131121
else:
132-
# Fallback: check if process exists and verify it's our broker
133-
# via /proc on Linux, or accept the PID-reuse risk on other platforms
134122
try:
135123
os.kill(pid, 0)
136124
except OSError:
@@ -152,10 +140,6 @@ def _validate_pid(self, pid: int) -> bool:
152140
def is_running(self) -> bool:
153141
"""Check if broker service is running.
154142
155-
Validates that:
156-
1. PID file exists
157-
2. PID in file corresponds to a running broker process
158-
159143
:return: True if service is running
160144
"""
161145
pid = self.get_pid()
@@ -177,169 +161,13 @@ def get_status(self) -> dict[str, Any] | None:
177161
except (json.JSONDecodeError, OSError):
178162
return None
179163

180-
def start(
181-
self,
182-
foreground: bool = False,
183-
wait: bool = True,
184-
timeout: float = 10.0,
185-
) -> bool:
186-
"""Start the broker service.
187-
188-
:param foreground: If True, run in foreground (blocking); else daemonize
189-
:param wait: If True and not foreground, wait for service to start
190-
:param timeout: Timeout in seconds when waiting for service to start
191-
:return: True if service started successfully
192-
"""
193-
if self.is_running():
194-
return True
195-
196-
# Ensure base path exists
197-
self._base_path.mkdir(parents=True, exist_ok=True)
198-
199-
# Build command
200-
cmd = [
201-
sys.executable,
202-
'-m',
203-
'aiida.brokers.zmq.service',
204-
'--base-path',
205-
str(self._base_path),
206-
]
207-
208-
if foreground:
209-
# Run in foreground (blocking)
210-
subprocess.run(cmd, check=True)
211-
return True
212-
else:
213-
# Run as daemon (detached process)
214-
# Use subprocess with appropriate flags for daemon behavior
215-
if sys.platform == 'win32':
216-
# Windows: use CREATE_NEW_PROCESS_GROUP
217-
subprocess.Popen(
218-
cmd,
219-
creationflags=subprocess.CREATE_NEW_PROCESS_GROUP | subprocess.DETACHED_PROCESS,
220-
stdout=subprocess.DEVNULL,
221-
stderr=subprocess.DEVNULL,
222-
stdin=subprocess.DEVNULL,
223-
)
224-
else:
225-
# Unix: use start_new_session
226-
subprocess.Popen(
227-
cmd,
228-
start_new_session=True,
229-
stdout=subprocess.DEVNULL,
230-
stderr=subprocess.DEVNULL,
231-
stdin=subprocess.DEVNULL,
232-
)
233-
234-
if wait:
235-
return self._wait_for_start(timeout)
236-
237-
return True
238-
239-
def _wait_for_start(self, timeout: float) -> bool:
240-
"""Wait for service to start.
241-
242-
:param timeout: Timeout in seconds
243-
:return: True if service started within timeout
244-
"""
245-
start_time = time.time()
246-
while time.time() - start_time < timeout:
247-
if self.is_running():
248-
return True
249-
time.sleep(0.1)
250-
return False
251-
252-
def stop(self, timeout: float = 5.0) -> bool:
253-
"""Stop the broker service.
254-
255-
Uses SIGINT for cross-platform graceful shutdown.
256-
Falls back to hard kill if timeout expires.
257-
258-
:param timeout: Seconds to wait for graceful shutdown
259-
:return: True if stopped successfully
260-
"""
261-
pid = self.get_pid()
262-
if pid is None:
263-
return True
264-
265-
if not self._validate_pid(pid):
266-
# PID file exists but process is not running, clean up
267-
self._cleanup_stale_files()
268-
return True
269-
270-
# Send SIGINT for graceful shutdown (works on all platforms)
271-
try:
272-
os.kill(pid, signal.SIGINT)
273-
except OSError:
274-
# Process already gone
275-
self._cleanup_stale_files()
276-
return True
277-
278-
# Wait for graceful shutdown
279-
if self._wait_for_stop(pid, timeout):
280-
return True
281-
282-
# Graceful shutdown failed, try hard kill
283-
return self._force_kill(pid)
284-
285-
def _wait_for_stop(self, pid: int, timeout: float) -> bool:
286-
"""Wait for process to stop.
287-
288-
:param pid: Process ID
289-
:param timeout: Timeout in seconds
290-
:return: True if process stopped within timeout
291-
"""
292-
start_time = time.time()
293-
while time.time() - start_time < timeout:
294-
if not self._validate_pid(pid):
295-
self._cleanup_stale_files()
296-
return True
297-
time.sleep(0.1)
298-
return False
299-
300-
def _force_kill(self, pid: int) -> bool:
301-
"""Force kill a process.
302-
303-
:param pid: Process ID
304-
:return: True if killed successfully
305-
"""
306-
if HAS_PSUTIL:
307-
try:
308-
proc = psutil.Process(pid)
309-
proc.terminate() # Sends SIGTERM on Unix, TerminateProcess on Windows
310-
proc.wait(timeout=2.0)
311-
self._cleanup_stale_files()
312-
return True
313-
except (psutil.NoSuchProcess, psutil.TimeoutExpired):
314-
try:
315-
proc.kill() # SIGKILL on Unix, TerminateProcess on Windows
316-
self._cleanup_stale_files()
317-
return True
318-
except psutil.NoSuchProcess:
319-
self._cleanup_stale_files()
320-
return True
321-
except psutil.AccessDenied:
322-
return False
323-
else:
324-
# Without psutil, try SIGKILL on Unix (not available on Windows)
325-
if sys.platform != 'win32':
326-
try:
327-
os.kill(pid, signal.SIGKILL)
328-
self._cleanup_stale_files()
329-
return True
330-
except OSError:
331-
self._cleanup_stale_files()
332-
return True
333-
return False # type: ignore[unreachable] # Windows without psutil
334-
335164
def _cleanup_stale_files(self) -> None:
336165
"""Clean up stale PID, status, and socket files."""
337166
if self._pid_file.exists():
338167
self._pid_file.unlink(missing_ok=True)
339168
if self._status_file.exists():
340169
self._status_file.unlink(missing_ok=True)
341170

342-
# Clean up orphaned socket directory
343171
sockets_path = self._get_sockets_path()
344172
if sockets_path is not None and sockets_path.exists():
345173
try:
@@ -348,12 +176,3 @@ def _cleanup_stale_files(self) -> None:
348176
pass
349177
if self._sockets_file.exists():
350178
self._sockets_file.unlink(missing_ok=True)
351-
352-
def restart(self, timeout: float = 5.0) -> bool:
353-
"""Restart the broker service.
354-
355-
:param timeout: Timeout for stop operation
356-
:return: True if restarted successfully
357-
"""
358-
self.stop(timeout=timeout)
359-
return self.start(wait=True)

src/aiida/cmdline/commands/cmd_daemon.py

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,7 @@ def status(ctx, all_profiles, timeout):
152152
broker_line = ''
153153
broker = get_manager().get_broker()
154154
if broker is not None and hasattr(broker, 'management_client'):
155-
if broker.is_running():
155+
if broker.management_client.is_running():
156156
status_info = broker.management_client.get_status()
157157
if status_info:
158158
broker_pid = status_info.get('pid', '?')
@@ -295,3 +295,19 @@ def worker():
295295
from aiida.engine.daemon.worker import start_daemon_worker
296296

297297
start_daemon_worker(foreground=True)
298+
299+
300+
@verdi_daemon.command('broker', hidden=True)
301+
@decorators.with_dbenv()
302+
@decorators.requires_broker
303+
def broker():
304+
"""Run the ZMQ broker server in the current process.
305+
306+
.. note:: this should not be called directly from the commandline!
307+
"""
308+
from aiida.brokers.zmq.broker import get_broker_base_path
309+
from aiida.brokers.zmq.service import run_broker_service
310+
from aiida.manage.manager import get_manager
311+
312+
profile = get_manager().get_profile()
313+
run_broker_service(base_path=get_broker_base_path(profile))

src/aiida/cmdline/commands/cmd_status.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -157,7 +157,7 @@ def verdi_status(print_traceback, no_rmq):
157157
daemon_msg = f'Daemon is running with PID {daemon_status["pid"]}'
158158
# Append broker info for managed brokers (e.g., ZMQ)
159159
if hasattr(broker, 'management_client'):
160-
if broker.is_running():
160+
if broker.management_client.is_running():
161161
status_info = broker.management_client.get_status()
162162
if status_info:
163163
broker_pid = status_info.get('pid', '?')

0 commit comments

Comments
 (0)