Skip to content

Commit 139a004

Browse files
committed
add number_workers back for scheduler
1 parent 60b9335 commit 139a004

File tree

3 files changed

+25
-11
lines changed

3 files changed

+25
-11
lines changed

aiida_workgraph/cli/cmd_scheduler.py

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
from pathlib import Path
44
from aiida.cmdline.utils import decorators, echo
55
from aiida.cmdline.params import options
6+
from aiida.cmdline.commands.cmd_daemon import validate_daemon_workers
67
from aiida_workgraph.engine.scheduler.client import get_scheduler_client
78
import sys
89

@@ -40,17 +41,18 @@ def worker():
4041

4142
@scheduler.command()
4243
@click.option("--foreground", is_flag=True, help="Run in foreground.")
44+
@click.argument("number", required=False, type=int, callback=validate_daemon_workers)
4345
@options.TIMEOUT(default=None, required=False, type=int)
4446
@decorators.with_dbenv()
4547
@decorators.requires_broker
4648
@decorators.check_circus_zmq_version
47-
def start(foreground, timeout):
49+
def start(foreground, number, timeout):
4850
"""Start the scheduler application."""
4951

5052
click.echo("Starting the scheduler process...")
5153

5254
client = get_scheduler_client()
53-
client.start_daemon(foreground=foreground)
55+
client.start_daemon(number_workers=number, foreground=foreground)
5456

5557

5658
@scheduler.command()
@@ -86,18 +88,19 @@ def stop(ctx, no_wait, all_profiles, timeout):
8688

8789
@scheduler.command(hidden=True)
8890
@click.option("--foreground", is_flag=True, help="Run in foreground.")
91+
@click.argument("number", required=False, type=int, callback=validate_daemon_workers)
8992
@decorators.with_dbenv()
9093
@decorators.requires_broker
9194
@decorators.check_circus_zmq_version
92-
def start_circus(foreground):
95+
def start_circus(foreground, number):
9396
"""This will actually launch the circus daemon, either daemonized in the background or in the foreground.
9497
9598
If run in the foreground all logs are redirected to stdout.
9699
97100
.. note:: this should not be called directly from the commandline!
98101
"""
99102

100-
get_scheduler_client()._start_daemon(foreground=foreground)
103+
get_scheduler_client()._start_daemon(number_workers=number, foreground=foreground)
101104

102105

103106
@scheduler.command()

aiida_workgraph/engine/launch.py

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
from aiida.common import InvalidOperation
77
from aiida.common.log import AIIDA_LOGGER
88
from aiida.manage import manager
9-
from aiida.orm import ProcessNode
9+
from aiida.orm import ProcessNode, load_node
1010

1111
from aiida.engine.processes.builder import ProcessBuilder
1212
from aiida.engine.processes.functions import get_stack_size
@@ -15,6 +15,7 @@
1515

1616
import signal
1717
import sys
18+
import os
1819
import inspect
1920
from typing import (
2021
Type,
@@ -271,7 +272,8 @@ def start_scheduler_worker(foreground: bool = False) -> None:
271272
for s in signals:
272273
# https://github.com/python/mypy/issues/12557
273274
runner.loop.add_signal_handler(s, lambda s=s: asyncio.create_task(shutdown_worker(runner))) # type: ignore[misc]
274-
275+
# get current process of this thread
276+
current_process = os.getpid()
275277
try:
276278
running_scheduler = get_scheduler()
277279
runner_loop = runner.loop
@@ -286,10 +288,13 @@ def start_scheduler_worker(foreground: bool = False) -> None:
286288
communicator=None, pid=running_scheduler, nowait=True
287289
)
288290
)
291+
process = load_node(running_scheduler)
292+
process.base.extras.set("daemon_pid", current_process)
293+
289294
except ValueError:
290295
process_inited = instantiate_process(runner, WorkGraphScheduler)
296+
process_inited.base.extras.set("daemon_pid", current_process)
291297
runner.loop.create_task(process_inited.step_until_terminated())
292-
293298
try:
294299
LOGGER.info("Starting a daemon worker")
295300
runner.start()

aiida_workgraph/engine/scheduler/client.py

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -55,8 +55,8 @@ def filepaths(self):
5555
},
5656
},
5757
"daemon": {
58-
"log": str(DAEMON_LOG_DIR / f"aiida-{self.profile.name}.log"),
59-
"pid": str(DAEMON_DIR / f"aiida-{self.profile.name}.pid"),
58+
"log": str(DAEMON_LOG_DIR / f"aiida-scheduler-{self.profile.name}.log"),
59+
"pid": str(DAEMON_DIR / f"aiida-scheduler-{self.profile.name}.pid"),
6060
},
6161
}
6262

@@ -102,6 +102,7 @@ def cmd_start_daemon(
102102
self.profile.name,
103103
"scheduler",
104104
"start-circus",
105+
str(number_workers),
105106
]
106107

107108
if foreground:
@@ -114,7 +115,7 @@ def cmd_start_daemon_worker(self) -> list[str]:
114115
"""Return the command to start a daemon worker process."""
115116
return [self._workgraph_bin, "-p", self.profile.name, "scheduler", "worker"]
116117

117-
def _start_daemon(self, foreground: bool = False) -> None:
118+
def _start_daemon(self, number_workers: int = 1, foreground: bool = False) -> None:
118119
"""Start the daemon.
119120
120121
.. warning:: This will daemonize the current process and put it in the background. It is most likely not what
@@ -130,6 +131,11 @@ def _start_daemon(self, foreground: bool = False) -> None:
130131
from circus.pidfile import Pidfile
131132
from circus.util import check_future_exception_and_log, configure_logger
132133

134+
if foreground and number_workers > 1:
135+
raise ValueError(
136+
"can only run a single worker when running in the foreground"
137+
)
138+
133139
loglevel = self.loglevel
134140
logoutput = "-"
135141

@@ -149,7 +155,7 @@ def _start_daemon(self, foreground: bool = False) -> None:
149155
{
150156
"cmd": " ".join(self.cmd_start_daemon_worker),
151157
"name": self.daemon_name,
152-
"numprocesses": 1,
158+
"numprocesses": number_workers,
153159
"virtualenv": self.virtualenv,
154160
"copy_env": True,
155161
"stdout_stream": {

0 commit comments

Comments
 (0)