Skip to content

Commit 6d6770e

Browse files
Merge pull request #37 from CAAI/dev/v1.3
Fix queue ordering bug; change autodeletion to run on jobs that have been finished for at least 10 minutes
2 parents 5d6020a + 1ed8968 commit 6d6770e

File tree

5 files changed

+80
-18
lines changed

5 files changed

+80
-18
lines changed

nodes/manager/manager.py

+13-4
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
from dotenv import load_dotenv
1111
from rhnode.version import __version__
1212
import time
13-
from pydantic import BaseModel, validator
13+
from pydantic import BaseModel, validator, root_validator
1414

1515
# load env variables from .env file if it exists
1616
load_dotenv()
@@ -22,9 +22,18 @@ class Job(BaseModel):
2222
required_gpu_mem: int
2323
required_memory: int
2424
required_threads: int
25-
creation_time: float = time.time()
25+
creation_time: float = None
2626
gpu_device_id: int = None
2727

28+
class Config:
29+
validate_assignment = True
30+
31+
@root_validator
32+
def number_validator(cls, values):
33+
if values["creation_time"] is None:
34+
values["creation_time"] = time.time()
35+
return values
36+
2837
@validator("priority")
2938
def check_priority(cls, v):
3039
if v < 1 or v > 5:
@@ -39,8 +48,8 @@ def __init__(self, job):
3948
def __repr__(self):
4049
return f"Queue item: {self.job}"
4150

42-
def __lt__(self, other: Job):
43-
self_args = (-self.job.priority, other.job.creation_time)
51+
def __lt__(self, other):
52+
self_args = (-self.job.priority, self.job.creation_time)
4453
other_args = (-other.job.priority, other.job.creation_time)
4554

4655
for self_arg, other_arg in zip(self_args, other_args):

rhnode/rhnode.py

+21-14
Original file line numberDiff line numberDiff line change
@@ -76,24 +76,35 @@ def __init__(self):
7676
setup_frontend_routes(self)
7777

7878
def _delete_job(self, job_id):
79+
print("DELETING", job_id)
7980
job = self.jobs[job_id]
8081
job.delete_files()
8182
del self.jobs[job_id]
8283

83-
def _get_expired_job_ids(self, max_age_hours=8):
84-
"""Get a list of job IDs that have been in the queue for longer than max_age_hours"""
84+
def _get_expired_job_ids(self, max_finished_mins=10):
85+
"""Get a list of job IDs that have been finished for longer than max_finished_mins"""
8586
expired_job_ids = []
87+
current_time = time.time()
8688
for job_id, job in self.jobs.items():
87-
if (time.time() - job.time_created) / 3600 > max_age_hours:
88-
if job.status in [
89-
JobStatus.Finished,
90-
JobStatus.Error,
91-
JobStatus.Cancelled,
92-
]:
93-
expired_job_ids.append(job_id)
89+
if job.time_finished is not None:
90+
if (current_time - job.time_finished) / 60 > max_finished_mins:
91+
if job.status in [
92+
JobStatus.Finished,
93+
JobStatus.Error,
94+
JobStatus.Cancelled,
95+
]:
96+
expired_job_ids.append(job_id)
9497

9598
return expired_job_ids
9699

100+
def _run_cleanup(self):
101+
print("Checking for expired jobs...")
102+
jobs = self._get_expired_job_ids()
103+
104+
for job_id in jobs:
105+
print("Deleting job", job_id)
106+
self._delete_job(job_id)
107+
97108
async def _delete_expired_jobs_loop(
98109
self, hour: int = 3, minute: int = 30, second: int = 0
99110
):
@@ -113,11 +124,7 @@ async def _delete_expired_jobs_loop(
113124
delay = (next_run - now).total_seconds()
114125
print("Next check in", delay, "seconds")
115126
await asyncio.sleep(delay)
116-
print("Checking for expired jobs...")
117-
jobs = self._get_expired_job_ids()
118-
for job_id in jobs:
119-
print("Deleting job", job_id)
120-
self._delete_job(job_id)
127+
self._run_cleanup()
121128

122129
def get_job_by_id(self, job_id: str):
123130
try:

rhnode/routing/backend.py

+8
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,14 @@ async def _parse_cli_args(cli: list):
116116
async def _get_job_input():
117117
return rhnode.help_cli_args()
118118

119+
@rhnode.get(rhnode._create_url("/clean_jobs"))
120+
async def _clean_jobs():
121+
if rhnode.rhnode_mode == "dev":
122+
rhnode._run_cleanup()
123+
return "ok"
124+
else:
125+
return "not in dev mode " + str(rhnode.rhnode_mode)
126+
119127
@rhnode.post(rhnode._create_url("/jobs/{job_id}/upload"))
120128
async def _upload(
121129
job_id: str,

tests/atest_multijob.py

+37
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
# %%
2+
3+
## Not to be run via pytest as of now
4+
import glob
5+
import os
6+
7+
from rhnode import RHJob, MultiJobRunner
8+
import os
9+
import json
10+
import datetime
11+
import nibabel as nib
12+
from pathlib import Path
13+
import re
14+
15+
ROOT = "/depict/data/quadra_fdg"
16+
17+
jobs = []
18+
19+
for i in range(500):
20+
inputs = {
21+
"scalar": 2,
22+
"in_file": "/homes/hinge/Projects/rh-node/tests/data/mr.nii.gz",
23+
"out_file": "out.nii.gz",
24+
}
25+
job = RHJob(
26+
node_name="add",
27+
manager_address="localhost:9050",
28+
inputs=inputs,
29+
check_cache=False,
30+
)
31+
32+
jobs.append(job)
33+
34+
job_runner = MultiJobRunner(jobs, queue_length=8)
35+
job_runner.start()
36+
37+
# %%

tests/docker-compose.yaml

+1
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ services:
5959
- "traefik.http.routers.add.rule=PathPrefix(`/add`)"
6060
environment:
6161
RH_EMAIL_ON_ERROR: [email protected]
62+
RH_MODE: dev
6263
TZ: "Europe/Copenhagen"
6364

6465
## Testnode: OutputDirectory

0 commit comments

Comments
 (0)