Skip to content

Commit 608462b

Browse files
committed
Use PID file sentinel instead of cmdline matching for broker validation
Write 'aiida-zmq-broker <pid>' to the PID file so is_running() can validate ownership without fragile command-line string matching. Also: fix sender type in make_broadcast_message (str, not uuid.UUID), drop underscore prefix from start/stop_zmq_broker test helpers.
1 parent 0444960 commit 608462b

5 files changed

Lines changed: 42 additions & 43 deletions

File tree

src/aiida/brokers/zmq/broker.py

Lines changed: 17 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -87,36 +87,35 @@ def router_endpoint(self) -> str | None:
8787
return None
8888
return f'ipc://{sockets_path}/router.sock'
8989

90+
_PID_SENTINEL = 'aiida-zmq-broker'
91+
9092
def get_service_pid(self) -> int | None:
91-
"""Read the ZmqBrokerService PID from its PID file."""
93+
"""Read the ZmqBrokerService PID from its PID file.
94+
95+
The PID file contains ``aiida-zmq-broker <pid>`` as a sentinel so we
96+
can validate ownership without fragile command-line string matching.
97+
"""
9298
if not self._service_pid_file.exists():
9399
return None
94100
try:
95-
return int(self._service_pid_file.read_text().strip())
101+
content = self._service_pid_file.read_text().strip()
102+
if content.startswith(self._PID_SENTINEL):
103+
return int(content.split()[-1])
104+
# Fallback: bare PID (old format)
105+
return int(content)
96106
except (ValueError, OSError):
97107
return None
98108

99-
def _validate_service_pid(self, pid: int) -> bool:
100-
"""Check that a PID belongs to a running ZmqBrokerService process.
101-
102-
Matches both direct invocation (``python -m aiida.brokers.zmq.service``)
103-
and the circus-started path (``verdi … daemon broker``).
104-
"""
105-
try:
106-
proc = psutil.Process(pid)
107-
if proc.is_running() and proc.status() != psutil.STATUS_ZOMBIE:
108-
cmdline_str = ' '.join(proc.cmdline())
109-
return 'aiida.brokers.zmq' in cmdline_str or 'daemon broker' in cmdline_str
110-
except (psutil.NoSuchProcess, psutil.AccessDenied):
111-
return False
112-
return False
113-
114109
def is_running(self) -> bool:
115110
"""Check if the ZmqBrokerService process is running."""
116111
pid = self.get_service_pid()
117112
if pid is None:
118113
return False
119-
return self._validate_service_pid(pid)
114+
try:
115+
proc = psutil.Process(pid)
116+
return proc.is_running() and proc.status() != psutil.STATUS_ZOMBIE
117+
except (psutil.NoSuchProcess, psutil.AccessDenied):
118+
return False
120119

121120
def get_service_status(self) -> dict[str, t.Any] | None:
122121
"""Read the ZmqBrokerService status from its status file."""

src/aiida/brokers/zmq/protocol.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -162,7 +162,7 @@ def make_rpc_response(rpc_id: str, sender: str, result: Any = None, error: str |
162162

163163

164164
def make_broadcast_message(
165-
body: Any, sender: str | uuid.UUID, subject: str | None = None, correlation_id: str | None = None
165+
body: Any, sender: str, subject: str | None = None, correlation_id: str | None = None
166166
) -> dict:
167167
"""Create a broadcast message dictionary."""
168168
return {

src/aiida/brokers/zmq/service.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ def start(self) -> None:
7575
decoder=YAML_DECODER,
7676
)
7777

78-
self._pid_file.write_text(str(os.getpid()))
78+
self._pid_file.write_text(f'aiida-zmq-broker {os.getpid()}')
7979

8080
# SIGINT (ctrl-c / circus graceful) + SIGTERM (circus stop)
8181
signal.signal(signal.SIGINT, self._handle_shutdown)

tests/brokers/test_zmq.py

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
from aiida.brokers.zmq.queue import PersistentQueue
1717
from aiida.brokers.zmq.server import ZmqBrokerServer
1818
from aiida.brokers.zmq.service import ZmqBrokerService
19-
from tests.conftest import _start_zmq_broker, _stop_zmq_broker
19+
from tests.conftest import start_zmq_broker, stop_zmq_broker
2020

2121

2222
class TestZmqDefaults:
@@ -54,48 +54,48 @@ def test_endpoints_no_sockets_file(self, tmp_path):
5454
def test_start_stop_cycle(self, tmp_path):
5555
"""Test querying a running broker service."""
5656
broker = ZmqBroker.from_base_path(tmp_path)
57-
_start_zmq_broker(broker)
57+
start_zmq_broker(broker)
5858

5959
try:
6060
assert broker.is_running()
6161
assert broker.get_service_pid() is not None
6262
assert broker.router_endpoint is not None
6363
finally:
64-
_stop_zmq_broker(broker)
64+
stop_zmq_broker(broker)
6565

6666
assert not broker.is_running()
6767

6868
def test_start_already_running(self, tmp_path):
6969
"""Test starting when already running is idempotent."""
7070
broker = ZmqBroker.from_base_path(tmp_path)
71-
_start_zmq_broker(broker)
71+
start_zmq_broker(broker)
7272

7373
try:
74-
_start_zmq_broker(broker) # idempotent
74+
start_zmq_broker(broker) # idempotent
7575
assert broker.is_running()
7676
finally:
77-
_stop_zmq_broker(broker)
77+
stop_zmq_broker(broker)
7878

7979
def test_stop_not_running(self, tmp_path):
8080
"""Test stopping when not running is a no-op."""
8181
broker = ZmqBroker.from_base_path(tmp_path)
82-
_stop_zmq_broker(broker) # Should not raise
82+
stop_zmq_broker(broker) # Should not raise
8383

8484
def test_restart(self, tmp_path):
8585
"""Test restarting the broker service."""
8686
broker = ZmqBroker.from_base_path(tmp_path)
87-
_start_zmq_broker(broker)
87+
start_zmq_broker(broker)
8888

8989
try:
9090
old_pid = broker.get_service_pid()
91-
_stop_zmq_broker(broker)
92-
_start_zmq_broker(broker)
91+
stop_zmq_broker(broker)
92+
start_zmq_broker(broker)
9393
assert broker.is_running()
9494

9595
new_pid = broker.get_service_pid()
9696
assert new_pid != old_pid
9797
finally:
98-
_stop_zmq_broker(broker)
98+
stop_zmq_broker(broker)
9999

100100

101101
class TestZmqBrokerServer:
@@ -254,7 +254,7 @@ class TestZmqCommunicator:
254254
def test_init(self, tmp_path):
255255
"""Test communicator initialization."""
256256
broker = ZmqBroker.from_base_path(tmp_path)
257-
_start_zmq_broker(broker)
257+
start_zmq_broker(broker)
258258

259259
try:
260260
communicator = ZmqCommunicator(
@@ -269,12 +269,12 @@ def test_init(self, tmp_path):
269269

270270
assert communicator.is_closed() is True
271271
finally:
272-
_stop_zmq_broker(broker)
272+
stop_zmq_broker(broker)
273273

274274
def test_context_manager(self, tmp_path):
275275
"""Test communicator as context manager."""
276276
broker = ZmqBroker.from_base_path(tmp_path)
277-
_start_zmq_broker(broker)
277+
start_zmq_broker(broker)
278278

279279
try:
280280
with ZmqCommunicator(
@@ -284,7 +284,7 @@ def test_context_manager(self, tmp_path):
284284

285285
assert communicator.is_closed() is True
286286
finally:
287-
_stop_zmq_broker(broker)
287+
stop_zmq_broker(broker)
288288

289289

290290
class TestZmqBrokerIntegration:
@@ -295,11 +295,11 @@ def zmq_broker(self, tmp_path):
295295
"""Create a ZMQ broker for testing."""
296296
broker_dir = tmp_path / 'broker' / 'test-uuid'
297297
broker = ZmqBroker.from_base_path(broker_dir)
298-
_start_zmq_broker(broker)
298+
start_zmq_broker(broker)
299299

300300
yield broker
301301

302-
_stop_zmq_broker(broker)
302+
stop_zmq_broker(broker)
303303

304304
def test_broker_lifecycle(self, zmq_broker):
305305
"""Test the broker lifecycle."""

tests/conftest.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ class TestBrokerBackend(Enum):
6666
NONE = 'none'
6767

6868

69-
def _start_zmq_broker(broker, timeout: float = 10.0):
69+
def start_zmq_broker(broker, timeout: float = 10.0):
7070
"""Start a ZMQ broker service subprocess for testing.
7171
7272
:param broker: A ``ZmqBroker`` instance (has ``base_path``, ``is_running()``, etc.)
@@ -93,13 +93,13 @@ def _start_zmq_broker(broker, timeout: float = 10.0):
9393
raise TimeoutError(f'ZMQ broker did not start within {timeout}s')
9494

9595

96-
def _stop_zmq_broker(broker, timeout: float = 5.0):
96+
def stop_zmq_broker(broker, timeout: float = 5.0):
9797
"""Stop a ZMQ broker service subprocess for testing.
9898
9999
:param broker: A ``ZmqBroker`` instance
100100
"""
101101
pid = broker.get_service_pid()
102-
if pid is None or not broker._validate_service_pid(pid):
102+
if pid is None or not broker.is_running():
103103
broker._cleanup_stale_service_files()
104104
return
105105

@@ -111,7 +111,7 @@ def _stop_zmq_broker(broker, timeout: float = 5.0):
111111

112112
start_time = time.time()
113113
while time.time() - start_time < timeout:
114-
if not broker._validate_service_pid(pid):
114+
if not broker.is_running():
115115
broker._cleanup_stale_service_files()
116116
return
117117
time.sleep(0.1)
@@ -283,11 +283,11 @@ def aiida_profile(pytestconfig, aiida_config, aiida_profile_factory, config_psql
283283
# Start ZMQ broker service if needed (tests don't use circus)
284284
broker_instance = get_manager().get_broker()
285285
if broker_instance is not None and hasattr(broker_instance, 'is_running'):
286-
_start_zmq_broker(broker_instance)
286+
start_zmq_broker(broker_instance)
287287
try:
288288
yield profile
289289
finally:
290-
_stop_zmq_broker(broker_instance)
290+
stop_zmq_broker(broker_instance)
291291
else:
292292
yield profile
293293

0 commit comments

Comments
 (0)