Skip to content

Commit d3e30de

Browse files
committed
Add SchedulerClient, use circus to start scheduler
1 parent 8669675 commit d3e30de

File tree

4 files changed

+342
-87
lines changed

4 files changed

+342
-87
lines changed

aiida_workgraph/cli/cmd_scheduler.py

Lines changed: 130 additions & 87 deletions
Original file line number | Diff line number | Diff line change
@@ -1,11 +1,10 @@
11
from aiida_workgraph.cli.cmd_workgraph import workgraph
2-
from aiida import orm
32
import click
4-
import os
53
from pathlib import Path
6-
from aiida.cmdline.utils import echo
7-
from .cmd_graph import REPAIR_INSTRUCTIONS
8-
4+
from aiida.cmdline.utils import decorators, echo
5+
from aiida.cmdline.params import options
6+
from aiida_workgraph.engine.scheduler.client import get_scheduler_client
7+
import sys
98

109
REACT_PORT = "3000"
1110

@@ -30,7 +29,7 @@ def scheduler():
3029

3130

3231
@scheduler.command()
33-
def start_worker():
32+
def worker():
3433
"""Start the scheduler application."""
3534
from aiida_workgraph.engine.launch import start_scheduler_worker
3635

@@ -40,96 +39,140 @@ def start_worker():
4039

4140

4241
@scheduler.command()
@click.option("--foreground", is_flag=True, help="Run in foreground.")
@options.TIMEOUT(default=None, required=False, type=int)
@decorators.with_dbenv()
@decorators.requires_broker
@decorators.check_circus_zmq_version
def start(foreground, timeout):
    """Start the scheduler application."""
    # Developer notes (kept out of the docstring because click renders the
    # docstring verbatim as the command's ``--help`` text):
    #   foreground -- forwarded to the scheduler client's ``start_daemon``.
    #   timeout    -- value of the ``--timeout`` option.
    # NOTE(review): ``timeout`` is accepted on the command line but is not
    # forwarded to the client; confirm whether the scheduler client's
    # ``start_daemon`` accepts a timeout before wiring it through.
    click.echo("Starting the scheduler process...")

    try:
        client = get_scheduler_client()
        client.start_daemon(foreground=foreground)
    except Exception as exception:
        # This is the CLI boundary, so a broad catch is deliberate. Report the
        # failure as an error and terminate with a non-zero exit status so that
        # shells and scripts can detect that the scheduler did not start.
        echo.echo_critical(f"Failed to start the scheduler process: {exception}")
7757

7858

7959
@scheduler.command()
@click.option("--no-wait", is_flag=True, help="Do not wait for confirmation.")
@click.option("--all", "all_profiles", is_flag=True, help="Stop all daemons.")
@options.TIMEOUT(default=None, required=False, type=int)
@decorators.requires_broker
@click.pass_context
def stop(ctx, no_wait, all_profiles, timeout):
    """Stop the daemon.

    Returns exit code 0 if the daemon was shut down successfully (or was not running), non-zero if there was an error.
    """
    # Developer notes (kept out of the docstring because click renders the
    # docstring verbatim as the command's ``--help`` text):
    #   ctx          -- click context; ``ctx.obj`` provides ``profile`` and ``config``.
    #   no_wait      -- when set, ``stop_daemon`` is called with ``wait=False``.
    #   all_profiles -- when set, every non-test profile in the config is processed.
    #   timeout      -- forwarded unchanged to ``stop_daemon``.
    if all_profiles:
        profiles = [
            profile
            for profile in ctx.obj.config.profiles
            if not profile.is_test_profile
        ]
    else:
        profiles = [ctx.obj.profile]

    # Remember failures so that every profile is still attempted, while the
    # command as a whole honours the documented non-zero exit code on error.
    failed = False

    for profile in profiles:
        echo.echo("Profile: ", fg=echo.COLORS["report"], bold=True, nl=False)
        echo.echo(f"{profile.name}", bold=True)
        echo.echo("Stopping the daemon... ", nl=False)
        try:
            # Bind the client to the profile being iterated; without the name
            # every iteration would address the same (current) profile.
            client = get_scheduler_client(profile.name)
            client.stop_daemon(wait=not no_wait, timeout=timeout)
        except Exception as exception:
            echo.echo_error(f"Failed to stop the daemon: {exception}")
            failed = True

    if failed:
        sys.exit(1)
88+
89+
90+
@scheduler.command(hidden=True)
@click.option("--foreground", is_flag=True, help="Run in foreground.")
@decorators.with_dbenv()
@decorators.requires_broker
@decorators.check_circus_zmq_version
def start_circus(foreground):
    """This will actually launch the circus daemon, either daemonized in the background or in the foreground.

    If run in the foreground all logs are redirected to stdout.

    .. note:: this should not be called directly from the commandline!
    """
    # Hidden entry point: obtain the scheduler client for the current profile
    # and hand over to its private launcher, passing the flag through as-is.
    scheduler_client = get_scheduler_client()
    scheduler_client._start_daemon(foreground=foreground)
109104

110105

111106
@scheduler.command()
@click.option("--all", "all_profiles", is_flag=True, help="Show status of all daemons.")
@options.TIMEOUT(default=None, required=False, type=int)
@click.pass_context
@decorators.requires_loaded_profile()
@decorators.requires_broker
def status(ctx, all_profiles, timeout):
    """Print the status of the current daemon or all daemons.

    Returns exit code 0 if all requested daemons are running, else exit code 3.
    """
    from tabulate import tabulate

    from aiida.cmdline.utils.common import format_local_time
    from aiida.engine.daemon.client import DaemonException

    # Select either every non-test profile from the config or just the
    # profile attached to the click context.
    if all_profiles is True:
        selected = [
            candidate
            for candidate in ctx.obj.config.profiles
            if not candidate.is_test_profile
        ]
    else:
        selected = [ctx.obj.profile]

    # Only failures are recorded here; an empty list means nothing failed.
    daemons_running = []

    for profile in selected:
        client = get_scheduler_client(profile.name)
        echo.echo("Profile: ", fg=echo.COLORS["report"], bold=True, nl=False)
        echo.echo(f"{profile.name}", bold=True)

        try:
            client.get_status(timeout=timeout)
        except DaemonException as exception:
            # Report the problem for this profile and move on to the next one.
            echo.echo_error(str(exception))
            daemons_running.append(False)
            continue

        worker_response = client.get_worker_info()
        daemon_response = client.get_daemon_info()

        # One table row per worker; entries whose info is not a dict get
        # placeholder columns.
        workers = [
            [pid, info["mem"], info["cpu"], format_local_time(info["create_time"])]
            if isinstance(info, dict)
            else [pid, "-", "-", "-"]
            for pid, info in worker_response["info"].items()
        ]

        if not workers:
            workers_info = (
                "--> No workers are running. Use `verdi daemon incr` to start some!\n"
            )
        else:
            workers_info = tabulate(
                workers, headers=["PID", "MEM %", "CPU %", "started"], tablefmt="simple"
            )

        daemon_info = daemon_response["info"]
        start_time = format_local_time(daemon_info["create_time"])
        echo.echo(
            f'Daemon is running as PID {daemon_info["pid"]} since {start_time}\n'
            f"Active workers [{len(workers)}]:\n{workers_info}\n"
            "Use `verdi daemon [incr | decr] [num]` to increase / decrease the number of workers"
        )

    # Exit code 3 signals that at least one requested daemon was not reachable.
    if any(not running for running in daemons_running):
        sys.exit(3)
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
from .scheduler import WorkGraphScheduler
2+
3+
__all__ = ("WorkGraphScheduler",)

0 commit comments

Comments
 (0)