From e22b8314fc0be3a6c3cb893be5060d41b12c31c9 Mon Sep 17 00:00:00 2001 From: Thomas Mendoza Date: Wed, 16 Oct 2024 15:49:28 -0700 Subject: [PATCH 1/4] Implement basic FluxSpawner --- batchspawner/batchspawner.py | 48 +++++++++++++++++++++++++++++ batchspawner/tests/test_spawners.py | 35 +++++++++++++++++++++ 2 files changed, 83 insertions(+) diff --git a/batchspawner/batchspawner.py b/batchspawner/batchspawner.py index 866bf51..f907230 100644 --- a/batchspawner/batchspawner.py +++ b/batchspawner/batchspawner.py @@ -16,6 +16,7 @@ * job names instead of PIDs """ import asyncio +import json import os import pwd import re @@ -24,6 +25,7 @@ from jinja2 import Template from jupyterhub.spawner import Spawner, set_user_setuid +from urllib.parse import urlparse from traitlets import Dict, Float, Integer, Unicode, default @@ -960,4 +962,50 @@ def state_gethost(self): return +class FluxSpawner(BatchSpawnerBase): + """A Spawner that uses Flux to launch notebooks.""" + + batch_script = Unicode( + """#!/bin/sh + +#flux: --nslots=1 +#flux: --job-name='spawner-jupyterhub' +#flux: --output={{homedir}}/{% raw %}.flux-{{id}}-{{name}}.log{% endraw %} +#flux: --error={{homedir}}/{% raw %}.flux-{{id}}-{{name}}.error.log{% endraw %} +{% if runtime %}#flux: --time-limit={{runtime}} +{% endif %}{% if queue %}#flux: --queue={{queue}} +{% endif %}{% if nprocs %}#flux: --cores-per-slot={{nprocs}} +{% endif %}{% if gres %}#flux: --gpus-per-slot={{gres}}{% endif %} + +set -eu + +{{prologue}} +{{cmd}} +{{epilogue}} +""" + ).tag(config=True) + batch_submit_cmd = Unicode("flux batch").tag(config=True) + batch_query_cmd = Unicode("flux jobs --json {job_id}").tag(config=True) + batch_cancel_cmd = Unicode("flux cancel {job_id}").tag(config=True) + + def state_ispending(self): + if not self.job_status: + return False + + status = json.loads(self.job_status) + return status["state"] in ("DEPEND", "PRIORITY", "SCHED") + + + def state_isrunning(self): + if not self.job_status: + return False + + status = json.loads(self.job_status) + return status["state"] in ("RUN", "CLEANUP") + + def state_gethost(self): + status = json.loads(self.job_status) + return urlparse(status["uri"]).netloc + + # vim: set ai expandtab softtabstop=4: diff --git a/batchspawner/tests/test_spawners.py b/batchspawner/tests/test_spawners.py index e5d43c0..03b03d6 100644 --- a/batchspawner/tests/test_spawners.py +++ b/batchspawner/tests/test_spawners.py @@ -625,6 +625,41 @@ async def test_lfs(db, event_loop): ) +async def test_flux(db, event_loop): + spawner_kwargs = { + "req_nprocs": "5", + "req_gres": "5", + "req_queue": "some_queue", + "req_prologue": "PROLOGUE", + "req_epilogue": "EPILOGUE", + } + batch_script_re_list = [ + re.compile( + r"^PROLOGUE.*^batchspawner-singleuser singleuser_command.*^EPILOGUE", + re.S | re.M, + ), + re.compile(r"#flux:\s+--queue=some_queue", re.M), + ] + script = [ + (re.compile(r"sudo.*flux batch"), str(testjob)), + (re.compile(r"sudo.*flux jobs --json"), '{"state": "SCHED"}'), + (re.compile(r"sudo.*flux jobs --json"), f'{{"state": "RUN", "uri": "ssh://{testhost}/foo/bar"}}'), + (re.compile(r"sudo.*flux jobs --json"), f'{{"state": "RUN", "uri": "ssh://{testhost}/foo/bar"}}'), + (re.compile(r"sudo.*flux jobs --json"), f'{{"state": "RUN", "uri": "ssh://{testhost}/foo/bar"}}'), + (re.compile(r"sudo.*flux cancel"), ""), + (re.compile(r"sudo.*flux jobs --json"), '{"state": "INACTIVE"}'), + ] + from .. import FluxSpawner + + await run_spawner_script( + db, + FluxSpawner, + script, + batch_script_re_list=batch_script_re_list, + spawner_kwargs=spawner_kwargs, + ) + + async def test_keepvars(db, event_loop): # req_keepvars spawner_kwargs = { From d5064be7670af3beb41751fc08c05bab8c4665d6 Mon Sep 17 00:00:00 2001 From: Thomas Mendoza Date: Thu, 14 Nov 2024 10:31:37 -0800 Subject: [PATCH 2/4] Set cwd and quote outfile templates --- batchspawner/batchspawner.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/batchspawner/batchspawner.py b/batchspawner/batchspawner.py index f907230..d6aeff2 100644 --- a/batchspawner/batchspawner.py +++ b/batchspawner/batchspawner.py @@ -970,8 +970,9 @@ class FluxSpawner(BatchSpawnerBase): #flux: --nslots=1 #flux: --job-name='spawner-jupyterhub' -#flux: --output={{homedir}}/{% raw %}.flux-{{id}}-{{name}}.log{% endraw %} -#flux: --error={{homedir}}/{% raw %}.flux-{{id}}-{{name}}.error.log{% endraw %} +#flux: --cwd={{homedir}} +#flux: --output='{{homedir}}/{% raw %}.flux-{{id}}-{{name}}.log{% endraw %}' +#flux: --error='{{homedir}}/{% raw %}.flux-{{id}}-{{name}}.error.log{% endraw %}' {% if runtime %}#flux: --time-limit={{runtime}} {% endif %}{% if queue %}#flux: --queue={{queue}} {% endif %}{% if nprocs %}#flux: --cores-per-slot={{nprocs}} From 8d5120d5da76c6cbc399a05e57f1f673aabffd79 Mon Sep 17 00:00:00 2001 From: Thomas Mendoza Date: Thu, 14 Nov 2024 11:45:58 -0800 Subject: [PATCH 3/4] Remain in pending state until uri is available --- batchspawner/batchspawner.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/batchspawner/batchspawner.py b/batchspawner/batchspawner.py index d6aeff2..0b981d2 100644 --- a/batchspawner/batchspawner.py +++ b/batchspawner/batchspawner.py @@ -994,7 +994,7 @@ def state_ispending(self): return False status = json.loads(self.job_status) - return status["state"] in ("DEPEND", "PRIORITY", "SCHED") + return status["state"] in ("DEPEND", "PRIORITY", "SCHED") or "uri" not in status def state_isrunning(self): @@ -1002,7 +1002,7 @@ def state_isrunning(self): return False status = json.loads(self.job_status) - return status["state"] in ("RUN", "CLEANUP") + return status["state"] in ("RUN", "CLEANUP") and "uri" in status def state_gethost(self): status = json.loads(self.job_status) From 3cac99d8bc9a8060e1d821e65eeaee8ddf39492a Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Thu, 14 Nov 2024 22:44:54 +0000 Subject: [PATCH 4/4] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- batchspawner/batchspawner.py | 3 +-- batchspawner/tests/test_spawners.py | 15 ++++++++++++--- 2 files changed, 13 insertions(+), 5 deletions(-) diff --git a/batchspawner/batchspawner.py b/batchspawner/batchspawner.py index 0b981d2..5d86c56 100644 --- a/batchspawner/batchspawner.py +++ b/batchspawner/batchspawner.py @@ -22,10 +22,10 @@ import re import xml.etree.ElementTree as ET from enum import Enum +from urllib.parse import urlparse from jinja2 import Template from jupyterhub.spawner import Spawner, set_user_setuid -from urllib.parse import urlparse from traitlets import Dict, Float, Integer, Unicode, default @@ -996,7 +996,6 @@ def state_ispending(self): status = json.loads(self.job_status) return status["state"] in ("DEPEND", "PRIORITY", "SCHED") or "uri" not in status - def state_isrunning(self): if not self.job_status: return False diff --git a/batchspawner/tests/test_spawners.py b/batchspawner/tests/test_spawners.py index 03b03d6..3899555 100644 --- a/batchspawner/tests/test_spawners.py +++ b/batchspawner/tests/test_spawners.py @@ -643,9 +643,18 @@ async def test_flux(db, event_loop): script = [ (re.compile(r"sudo.*flux batch"), str(testjob)), (re.compile(r"sudo.*flux jobs --json"), '{"state": "SCHED"}'), - (re.compile(r"sudo.*flux jobs --json"), f'{{"state": "RUN", "uri": "ssh://{testhost}/foo/bar"}}'), - (re.compile(r"sudo.*flux jobs --json"), f'{{"state": "RUN", "uri": "ssh://{testhost}/foo/bar"}}'), - (re.compile(r"sudo.*flux jobs --json"), f'{{"state": "RUN", "uri": "ssh://{testhost}/foo/bar"}}'), + ( + re.compile(r"sudo.*flux jobs --json"), + f'{{"state": "RUN", "uri": "ssh://{testhost}/foo/bar"}}', + ), + ( + re.compile(r"sudo.*flux jobs --json"), + f'{{"state": "RUN", "uri": "ssh://{testhost}/foo/bar"}}', + ), + ( + re.compile(r"sudo.*flux jobs --json"), + f'{{"state": "RUN", "uri": "ssh://{testhost}/foo/bar"}}', + ), (re.compile(r"sudo.*flux cancel"), ""), (re.compile(r"sudo.*flux jobs --json"), '{"state": "INACTIVE"}'), ]