Add ZeroMQ-based message broker plugin#7284
Conversation
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## main #7284 +/- ##
==========================================
+ Coverage 79.92% 80.08% +0.17%
==========================================
Files 568 576 +8
Lines 44016 45280 +1264
==========================================
+ Hits 35175 36260 +1085
- Misses 8841 9020 +179 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
|
Notes for me: |
I merged client.py into broker.py for zmq so ZmqBroker now contains the management functionalities. I think doing this for rmq might be a bit more messy because we create multiple connections but maybe also possible. Maybe a distinction between |
| # Timeout (in seconds) for waiting on RPC Future results in the poll thread. | ||
| # None means no timeout, matching kiwipy RMQ behavior where _on_rpc awaits | ||
| # without a deadline. The runner event loop will eventually produce a result. | ||
| RPC_TIMEOUT: float | None = None |
There was a problem hiding this comment.
double check that the rmq.task_timeout will be used here when starting the server
There was a problem hiding this comment.
there is now zmq.task_timeout
| :return: The reconstructed UUID object | ||
| """ | ||
| mapping = loader.construct_mapping(node) | ||
| return uuid.UUID(int=mapping['int']) |
There was a problem hiding this comment.
So in RMQ we just secretly convert this to int somewhere in the code. I wanted to make serialization more explicit. Its a bit overkill for one conversion. Maybe there is a better solution.
ec6a524 to
3f98e3a
Compare
|
@agoscinski I'll report feedback here, let me know if you would prefer a different location (e.g. HackMD doc). I wanted to results = engine.run(builder)But ran into a Traceback---------------------------------------------------------------------------
ConnectionError Traceback (most recent call last)
Cell In[8], [line 1](vscode-notebook-cell:?execution_count=8&line=1)
----> [1](vscode-notebook-cell:?execution_count=8&line=1) results = engine.run(builder)
File ~/project/qe/git/aiida-core/src/aiida/engine/launch.py:46, in run(process, inputs, **kwargs)
44 runner = process.runner
45 else:
---> [46](https://file+.vscode-resource.vscode-cdn.net/Users/mbercx/project/qe/jupyter/dev/~/project/qe/git/aiida-core/src/aiida/engine/launch.py:46) runner = manager.get_manager().get_runner()
48 return runner.run(process, inputs, **kwargs)
File ~/project/qe/git/aiida-core/src/aiida/manage/manager.py:437, in Manager.get_runner(self, **kwargs)
431 """Return a runner that is based on the current profile settings and can be used globally by the code.
432
433 :return: the global runner
434
435 """
436 if self._runner is None:
--> [437](https://file+.vscode-resource.vscode-cdn.net/Users/mbercx/project/qe/jupyter/dev/~/project/qe/git/aiida-core/src/aiida/manage/manager.py:437) self._runner = self.create_runner(**kwargs)
439 return self._runner
File ~/project/qe/git/aiida-core/src/aiida/manage/manager.py:476, in Manager.create_runner(self, with_persistence, **kwargs)
473 if 'communicator' not in settings:
474 # Only call get_communicator if we have to as it will lazily create
475 try:
--> [476](https://file+.vscode-resource.vscode-cdn.net/Users/mbercx/project/qe/jupyter/dev/~/project/qe/git/aiida-core/src/aiida/manage/manager.py:476) settings['communicator'] = self.get_communicator()
477 except ConfigurationError:
478 # The currently loaded profile does not define a broker and so there is no communicator
479 pass
File ~/project/qe/git/aiida-core/src/aiida/manage/manager.py:394, in Manager.get_communicator(self)
389 assert self._profile is not None
390 raise ConfigurationError(
391 f'profile `{self._profile.name}` does not provide a communicator because it does not define a broker'
392 )
--> [394](https://file+.vscode-resource.vscode-cdn.net/Users/mbercx/project/qe/jupyter/dev/~/project/qe/git/aiida-core/src/aiida/manage/manager.py:394) return broker.get_communicator()
File ~/project/qe/git/aiida-core/src/aiida/brokers/zmq/broker.py:159, in ZmqBroker.get_communicator(self, wait_for_broker)
157 break
158 else:
--> [159](https://file+.vscode-resource.vscode-cdn.net/Users/mbercx/project/qe/jupyter/dev/~/project/qe/git/aiida-core/src/aiida/brokers/zmq/broker.py:159) raise ConnectionError(f'Broker did not become ready within {wait_for_broker}s: {self}')
161 self._communicator = ZmqCommunicator(
162 router_endpoint=router_endpoint,
163 )
164 self._communicator.start()
ConnectionError: Broker did not become ready within 30.0s: ZMQ Broker @ /Users/mbercx/project/qe/.aiida/broker/8c6a9c1f5a414410b8961f8f315ff1ba <not running>I checked And saw that the A few notes here:
|
|
Thanks for the feedback!
For new profiles created from this branch the broker should start with the daemon. If you don't restart the daemon after changing your aiida-core version, the broker is never started. I think we can assume that people restart the daemon when installing a new aiida-core version. There are a lot of things that break if you change aiida-core and don't restart the daemon since the workers still have the old code. We never mentioned that we support hot reloading for workers so I think that problem you describe is acceptable. Maybe we can improve the communication of this problem in a different PR: We store the aiida-core version in some worker PID or config file. Then when people to
Yes, so we could choose a different design where a broker is not needed. The client directly sends messages to the workers. Since the number of workers and clients are highly limited and do not need to scale well for large numbers, it might be the best design for people using AiiDA on their local machine and connecting to different HPCs. There is only one client and the number of workers is typically the number of processes, which even on a big workstation is limited to 256 workers. However, we already have this broker architecture due to RMQ in place, and changing that is quite a dramatic change that affects more places in the code base. Because I mimicked the RMQ broker pattern, I could implement it by just mimicking the broker communication pattern in the new broker, so it was easy to implement. If you remove the broker completely, then you need to consider things like that messages are not persisted anymore by the broker, so where do you move this responsibility? There are multiple options for this and there is definitely a solution that works for our use case, but they all require changes on the clients, which means we probably need to touch plumpy and kiwipy and discuss this with the team. So let's say we have done this, then we still need to keep the old logic since we need to support the RMQ broker for the use case of many-clients-to-many-workers (only used by a fraction of AiiDA users but still they exist). Then we have two different logics for each broker. It is possible, but it increases maintenance costs definitely. Maybe this is the way we want to go in the future, but maybe this approach which is much simpler is good enough. People anyway need to start the daemon, and if we hide the broker inside the daemon setup, maybe the complexity for the user is roughly the same as not having a broker.
Right the new broker depends on the aiida profile, similar as the daemon worker (I think that could be changed but requires more massive refactoring). I think I added a daemon start into the profile create so you don't need to anything. Do you think there is a benefit in not starting the daemon after the profile is created?
This is done by circus. It should restart it in the current branch. It takes some time like 5 seconds or so. I don't think I used a different timing than we use for the workers
But isn't this shown already in and We can maybe improve the UI of
|
Implement a ZMQ broker as an alternative to RabbitMQ for AiiDA's process control. The broker uses ZeroMQ sockets with a persistent file-based queue and requires no external services. New modules in `src/aiida/brokers/zmq/`: - `broker.py`: ZmqBroker class implementing the Broker interface - `communicator.py`: ZmqCommunicator for process control RPCs - `protocol.py`: Wire protocol for ZMQ messages - `queue.py`: Persistent task queue with file-based storage - `server.py`: ZMQ broker server handling task routing - `service.py`: Service wrapper for running the broker process - `defaults.py`: Default configuration Register `core.zmq` entry point and add `zmq` alias in Manager.
Introduce `requires_broker` marker for tests that need any message broker (RabbitMQ or ZMQ), distinct from `requires_rmq` which needs RabbitMQ specifically. Add `--broker-backend` pytest CLI option to select broker backend (rmq, zmq, none) for test runs. Add ZMQ broker start/stop helpers in conftest.py and update the `aiida_profile` fixture to auto-start the ZMQ broker when selected. Rename markers from `requires_rmq` to `requires_broker` across test files where tests work with any broker backend.
Wire the ZMQ broker into the daemon lifecycle: - Add `verdi daemon broker` hidden command for circus to manage - Add ZMQ broker as a circus watcher started before workers - Show ZMQ broker status in `verdi daemon status` and `verdi status` Fix SQLite stale-read issue by committing the session after RPC calls in `control.py`. Update CI to run test matrix with both `rmq` and `zmq` broker backends. Remove RabbitMQ service dependency from minimum-requirements and presto test jobs.
Change `verdi presto` to always configure a broker: it tries RabbitMQ first and falls back to ZMQ if unavailable. Add `--use-zmq` flag to skip RabbitMQ detection and use ZMQ directly. Previously, profiles created without RabbitMQ had no broker at all, limiting functionality. Now every `verdi presto` profile gets a working broker out of the box. Regenerate the command-line reference during the rebase so the tracked autodocs stay in sync with the updated CLI.
Add `--broker` option to `verdi profile setup` accepting 'rabbitmq', 'zmq', or 'none'. Deprecate the `--use-rabbitmq/--no-use-rabbitmq` flag with a warning pointing to the new option. Add daemon restart logic to `verdi profile configure-rabbitmq` so broker reconfiguration takes effect immediately.
`get_daemon_client(profile.name)` called `load_profile()` which switched the global manager profile, corrupting the state for all subsequent operations in the same process. Use `DaemonClient(profile)` directly and skip the daemon check entirely when no broker is configured yet.
Allow creating profiles without any message broker configured. This is useful for profiles used only for data exploration and querying, where the daemon and process submission are not needed.
…7284) Lower default timeout from 30s to 10s and log a warning after 5s so users get feedback when the broker is taking a long time to start.
…am#7284) Change `is_running()` method to a `@property` to match the API of `ZmqBrokerServer.is_running`, making state queries more Pythonic. Update all call sites to use the property syntax.
Add `--broker` option to `verdi profile setup` accepting 'rabbitmq', 'zmq', or 'none'. Deprecate the `--use-rabbitmq/--no-use-rabbitmq` flag with a warning pointing to the new option. Add daemon restart logic to `verdi profile configure-rabbitmq` so broker reconfiguration takes effect immediately.
`get_daemon_client(profile.name)` called `load_profile()` which switched the global manager profile, corrupting the state for all subsequent operations in the same process. Use `DaemonClient(profile)` directly and skip the daemon check entirely when no broker is configured yet.
Allow creating profiles without any message broker configured. This is useful for profiles used only for data exploration and querying, where the daemon and process submission are not needed.
Document the process/thread model, socket architecture, message flow, dead peer detection, persistent queue, service files, message types, the AMQP-to-ZMQ mapping, write format and all timeout constants with diagrams.
…eam#7284) Replace `rmq.task_timeout` with a broker-agnostic `broker.task_timeout` option used by both ZMQ and RabbitMQ backends. The old option is kept with a `deprecated_by` marker on the Field, enabling generic deprecation handling in `Manager.get_option` and `verdi config set`.
Implement a ZMQ broker as an alternative to RabbitMQ for AiiDA's process control. The broker uses ZeroMQ sockets with a persistent file-based queue and requires no external services. New modules in `src/aiida/brokers/zmq/`: - `broker.py`: ZmqBroker class implementing the Broker interface - `communicator.py`: ZmqCommunicator for process control RPCs - `protocol.py`: Wire protocol for ZMQ messages - `queue.py`: Persistent task queue with file-based storage - `server.py`: ZMQ broker server handling task routing - `service.py`: Service wrapper for running the broker process - `defaults.py`: Default configuration Register `core.zmq` entry point and add `zmq` alias in Manager.
Introduce `requires_broker` marker for tests that need any message broker (RabbitMQ or ZMQ), distinct from `requires_rmq` which needs RabbitMQ specifically. Add `--broker-backend` pytest CLI option to select broker backend (rmq, zmq, none) for test runs. Add ZMQ broker start/stop helpers in conftest.py and update the `aiida_profile` fixture to auto-start the ZMQ broker when selected. Rename markers from `requires_rmq` to `requires_broker` across test files where tests work with any broker backend.
Wire the ZMQ broker into the daemon lifecycle: - Add `verdi daemon broker` hidden command for circus to manage - Add ZMQ broker as a circus watcher started before workers - Show ZMQ broker status in `verdi daemon status` and `verdi status` Fix SQLite stale-read issue by committing the session after RPC calls in `control.py`. Update CI to run test matrix with both `rmq` and `zmq` broker backends. Remove RabbitMQ service dependency from minimum-requirements and presto test jobs.
Change `verdi presto` to always configure a broker: it tries RabbitMQ first and falls back to ZMQ if unavailable. Add `--use-zmq` flag to skip RabbitMQ detection and use ZMQ directly. Previously, profiles created without RabbitMQ had no broker at all, limiting functionality. Now every `verdi presto` profile gets a working broker out of the box. Regenerate the command-line reference during the rebase so the tracked autodocs stay in sync with the updated CLI.
Add `--broker` option to `verdi profile setup` accepting 'rabbitmq', 'zmq', or 'none'. Deprecate the `--use-rabbitmq/--no-use-rabbitmq` flag with a warning pointing to the new option. Add daemon restart logic to `verdi profile configure-rabbitmq` so broker reconfiguration takes effect immediately.
`get_daemon_client(profile.name)` called `load_profile()` which switched the global manager profile, corrupting the state for all subsequent operations in the same process. Use `DaemonClient(profile)` directly and skip the daemon check entirely when no broker is configured yet.
Allow creating profiles without any message broker configured. This is useful for profiles used only for data exploration and querying, where the daemon and process submission are not needed.
Document the process/thread model, socket architecture, message flow, dead peer detection, persistent queue, service files, message types, the AMQP-to-ZMQ mapping, write format and all timeout constants with diagrams.
…eam#7284) Replace `rmq.task_timeout` with a broker-agnostic `broker.task_timeout` option used by both ZMQ and RabbitMQ backends. The old option is kept with a `deprecated_by` marker on the Field, enabling generic deprecation handling in `Manager.get_option` and `verdi config set`.
Implement a ZMQ broker as an alternative to RabbitMQ for AiiDA's process control. The broker uses ZeroMQ sockets with a persistent file-based queue and requires no external services. New modules in `src/aiida/brokers/zmq/`: - `broker.py`: ZmqBroker class implementing the Broker interface - `communicator.py`: ZmqCommunicator for process control RPCs - `protocol.py`: Wire protocol for ZMQ messages - `queue.py`: Persistent task queue with file-based storage - `server.py`: ZMQ broker server handling task routing - `service.py`: Service wrapper for running the broker process - `defaults.py`: Default configuration Register `core.zmq` entry point and add `zmq` alias in Manager.
Introduce `requires_broker` marker for tests that need any message broker (RabbitMQ or ZMQ), distinct from `requires_rmq` which needs RabbitMQ specifically. Add `--broker-backend` pytest CLI option to select broker backend (rmq, zmq, none) for test runs. Add ZMQ broker start/stop helpers in conftest.py and update the `aiida_profile` fixture to auto-start the ZMQ broker when selected. Rename markers from `requires_rmq` to `requires_broker` across test files where tests work with any broker backend.
Wire the ZMQ broker into the daemon lifecycle: - Add `verdi daemon broker` hidden command for circus to manage - Add ZMQ broker as a circus watcher started before workers - Show ZMQ broker status in `verdi daemon status` and `verdi status` Fix SQLite stale-read issue by committing the session after RPC calls in `control.py`. Update CI to run test matrix with both `rmq` and `zmq` broker backends. Remove RabbitMQ service dependency from minimum-requirements and presto test jobs.
Change `verdi presto` to always configure a broker: it tries RabbitMQ first and falls back to ZMQ if unavailable. Add `--use-zmq` flag to skip RabbitMQ detection and use ZMQ directly. Previously, profiles created without RabbitMQ had no broker at all, limiting functionality. Now every `verdi presto` profile gets a working broker out of the box. Regenerate the command-line reference during the rebase so the tracked autodocs stay in sync with the updated CLI.
Add `--broker` option to `verdi profile setup` accepting 'rabbitmq', 'zmq', or 'none'. Deprecate the `--use-rabbitmq/--no-use-rabbitmq` flag with a warning pointing to the new option. Add daemon restart logic to `verdi profile configure-rabbitmq` so broker reconfiguration takes effect immediately.
`get_daemon_client(profile.name)` called `load_profile()` which switched the global manager profile, corrupting the state for all subsequent operations in the same process. Use `DaemonClient(profile)` directly and skip the daemon check entirely when no broker is configured yet.
Allow creating profiles without any message broker configured. This is useful for profiles used only for data exploration and querying, where the daemon and process submission are not needed.
Document the process/thread model, socket architecture, message flow, dead peer detection, persistent queue, service files, message types, the AMQP-to-ZMQ mapping, write format and all timeout constants with diagrams.
…eam#7284) Replace `rmq.task_timeout` with a broker-agnostic `broker.task_timeout` option used by both ZMQ and RabbitMQ backends. The old option is kept with a `deprecated_by` marker on the Field, enabling generic deprecation handling in `Manager.get_option` and `verdi config set`.
|
Tests that fail are flaky and their fixes are these PRs or opened issues
It is nevertheless suspicious that all flaky test failure show in the ZMQ CI. I will merge and see if this behavior continuous to be seen in subsequent PRs. Maybe this is because ZMQ is spawned with the daemon and thus more resource hungry, reaching easier such edge cases. |
Please read
docs/source/internals/broker.rstin this PR for design doc.