Skip to content

Commit

Permalink
Merge pull request #156 from mskcc/develop
Browse files Browse the repository at this point in the history
Merging develop to master, queued for release
  • Loading branch information
allanbolipata authored Mar 10, 2021
2 parents 94513f1 + 6ab6104 commit 424ba66
Show file tree
Hide file tree
Showing 12 changed files with 147 additions and 39 deletions.
8 changes: 5 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,10 @@ install: conda toil
anaconda::postgresql=11.2 \
conda-forge::ncurses \
rabbitmq-server=3.7.16 && \
pip install -r requirements.txt
pip install -r requirements-toil.txt && \
pip3 install -r requirements.txt
pip3 install -r requirements-toil.txt && \
cd toil && \
pip install -e .[cwl]
pip3 install -e .[cwl]

# Ridgeback environment variables for configuration
export RIDGEBACK_PATH:=$(CURDIR)
Expand Down Expand Up @@ -301,6 +301,8 @@ bash:
jobs:
curl "http://$(DJANGO_RIDGEBACK_IP):$(DJANGO_RIDGEBACK_PORT)/v0/jobs/"

test:
python3 manage.py test --verbosity=2
# submit a sample job to Ridgeback
demo-submit:
curl -H "Content-Type: application/json" \
Expand Down
3 changes: 1 addition & 2 deletions requirements-toil.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
mock==1.0.1
mock==4.0.2
pytest==4.3.1
pytest-cov==2.6.1
stubserver==1.0.1
pytest-timeout==1.3.3
cwltest
1 change: 0 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
Django==2.2.13
psycopg2==2.8.6
djangorestframework==3.9.4
markdown==3.1.1
django-filter==2.1.0
Expand Down
2 changes: 1 addition & 1 deletion ridgeback/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__="1.8.0"
__version__="1.9.0"
1 change: 1 addition & 0 deletions ridgeback/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
# Build paths inside the project like this: os.path.join(BASE_DIR, ...)
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))

MAX_RUNNING_JOBS = int(os.environ.get('MAX_RUNNING_JOBS', 100))

# Quick-start development settings - unsuitable for production
# See https://docs.djangoproject.com/en/2.2/howto/deployment/checklist/
Expand Down
5 changes: 1 addition & 4 deletions submitter/jobsubmitter.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,10 +118,7 @@ def _prepare_directories(self):

def _job_args(self):
if "access" in self.app.github.lower():
if self.app.entrypoint == "workflows/ACCESS_pipeline.cwl":
return ["-W", "7200", "-M", "10"]
else:
return ["-W", "360", "-M", "5"]
return ["-W", "7200", "-M", "10"]
elif settings.LSF_WALLTIME:
return ['-W', settings.LSF_WALLTIME]
return []
Expand Down
8 changes: 2 additions & 6 deletions tests/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,8 @@ def test_404_read(self):
response = self.client.get(url)
self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND)

@patch('toil_orchestrator.tasks.submit_jobs_to_lsf.delay')
def test_create(self, submit_jobs_mock):
def test_create(self):
url = self.api_root + 'jobs/'
submit_jobs_mock.return_value = None
data = {
'app': self.example_job.app,
'root_dir': self.example_job.root_dir,
Expand All @@ -66,10 +64,8 @@ def test_delete_authorized(self):
response = self.client.delete(url)
self.assertEqual(response.status_code, status.HTTP_204_NO_CONTENT)

@patch('toil_orchestrator.tasks.submit_jobs_to_lsf.delay')
def test_resume(self, submit_jobs_mock):
def test_resume(self):
url = '{}jobs/{}/resume/'.format(self.api_root, self.example_job.id)
submit_jobs_mock.return_value = None
data = {
'root_dir': self.example_job.root_dir
}
Expand Down
27 changes: 24 additions & 3 deletions tests/test_tasks.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
from mock import patch
from django.test import TestCase
from toil_orchestrator.models import Job, Status
from toil_orchestrator.tasks import submit_jobs_to_lsf, check_status_of_jobs, on_failure_to_submit, get_message, cleanup_completed_jobs, cleanup_failed_jobs
from toil_orchestrator.tasks import submit_job_to_lsf, submit_pending_jobs, check_status_of_jobs, on_failure_to_submit, get_message, cleanup_completed_jobs, cleanup_failed_jobs
from datetime import datetime, timedelta
from mock import patch, call


MAX_RUNNING_JOBS = 3
@patch('toil_orchestrator.tasks.MAX_RUNNING_JOBS', MAX_RUNNING_JOBS)
class TestTasks(TestCase):
fixtures = [
"toil_orchestrator.job.json"
Expand All @@ -24,14 +26,29 @@ def test_failure_to_submit(self):
self.assertEqual(info_message, 'Failed to submit job')
self.assertNotEqual(log_path, None)

@patch('submitter.jobsubmitter.JobSubmitter.__init__')
@patch('toil_orchestrator.tasks.save_job_info')
@patch('submitter.jobsubmitter.JobSubmitter.submit')
def test_submit_polling(self, job_submitter, save_job_info, init):
init.return_value = None
job_submitter.return_value = self.current_job.external_id, self.current_job.job_store_location, self.current_job.working_dir, self.current_job.output_directory
save_job_info.return_value = None
created_jobs = len(Job.objects.filter(status=Status.CREATED))
running_jobs = len(Job.objects.filter(status__in=(Status.RUNNING, Status.PENDING)))
submit_pending_jobs()
self.assertEqual(save_job_info.call_count, created_jobs)
save_job_info.reset_mock()
submit_pending_jobs()
self.assertEqual(save_job_info.call_count, 0)

@patch('submitter.jobsubmitter.JobSubmitter.__init__')
@patch('submitter.jobsubmitter.JobSubmitter.submit')
@patch('toil_orchestrator.tasks.save_job_info')
def test_submit(self, save_job_info, submit, init):
init.return_value = None
save_job_info.return_value = None
submit.return_value = self.current_job.external_id, self.current_job.job_store_location, self.current_job.working_dir, self.current_job.output_directory
submit_jobs_to_lsf(self.current_job.id)
submit_job_to_lsf(self.current_job)
self.current_job.refresh_from_db()
self.assertEqual(self.current_job.status, Status.PENDING)
self.assertEqual(self.current_job.finished, None)
Expand Down Expand Up @@ -94,7 +111,11 @@ def test_running(self, status, init, get_job_info_path):
self.assertNotEqual(self.current_job.started, None)
self.assertEqual(self.current_job.finished, None)

def test_fail_not_submitted(self):
@patch('submitter.jobsubmitter.JobSubmitter.__init__')
@patch('submitter.jobsubmitter.JobSubmitter.status')
def test_fail_not_submitted(self, status, init):
init.return_value = None
status.return_value = Status.PENDING, None
self.current_job.status = Status.PENDING
self.current_job.external_id = None
self.current_job.save()
Expand Down
8 changes: 6 additions & 2 deletions toil_orchestrator/celery.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@
app.autodiscover_tasks()


app.conf.task_routes = {'toil_orchestrator.tasks.submit_jobs_to_lsf': {'queue': settings.RIDGEBACK_DEFAULT_QUEUE},
'toil_orchestrator.tasks.cleanup_folders': {'queue': settings.RIDGEBACK_DEFAULT_QUEUE},
app.conf.task_routes = {'toil_orchestrator.tasks.cleanup_folders': {'queue': settings.RIDGEBACK_DEFAULT_QUEUE},
'toil_orchestrator.tasks.abort_job': {'queue': settings.RIDGEBACK_DEFAULT_QUEUE}}

app.conf.beat_schedule = {
Expand All @@ -29,6 +28,11 @@
"schedule": 60.0,
"options": {"queue": settings.RIDGEBACK_DEFAULT_QUEUE}
},
"submit_pending_jobs": {
"task": "toil_orchestrator.tasks.submit_pending_jobs",
"schedule": 60.0 * 5,
"options": {"queue": settings.RIDGEBACK_DEFAULT_QUEUE}
},
"check_status_of_command_line_jobs": {
"task": "toil_orchestrator.tasks.check_status_of_command_line_jobs",
"schedule": 10.0,
Expand Down
87 changes: 86 additions & 1 deletion toil_orchestrator/fixtures/toil_orchestrator.job.json
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,26 @@
"output_directory": "/sample/output/dir"
}
},
{
"model": "toil_orchestrator.basemodel",
"pk": "1cdbb924-9078-400f-b4b9-4bef1ea5ce1b",
"fields": {
"created_date": "2020-06-23T18:55:33.363Z",
"modified_date": "2020-06-23T18:55:33.363Z",
"output_directory": "/sample/output/dir"
}
},

{
"model": "toil_orchestrator.basemodel",
"pk": "1cdbb924-9078-400f-b4b9-4bef1ea5ce1c",
"fields": {
"created_date": "2020-06-23T18:55:33.363Z",
"modified_date": "2020-06-23T18:55:33.363Z",
"output_directory": "/sample/output/dir"
}
},

{
"model": "toil_orchestrator.basemodel",
"pk": "2d7aaabd-9fa1-4a60-9eda-67681fe7da49",
Expand Down Expand Up @@ -80,6 +100,71 @@
"output_directory": "/sample/output/dir"
}
},
{
"model": "toil_orchestrator.job",
"pk": "1cdbb924-9078-400f-b4b9-4bef1ea5ce1c",
"fields": {
"app": {
"github": {
"entrypoint": "tempo.cwl",
"repository": "https://github.com/mskcc-cwl/tempo"
}
},
"external_id": "1234",
"root_dir": "/sample/root/dir",
"job_store_location": "/sample/job_store/dir",
"resume_job_store_location": null,
"working_dir": "/sample/work/dir",
"status": 2,
"message": {
"log": "",
"info": "",
"failed_jobs": {},
"unknown_jobs": {}
},
"inputs": null,
"outputs": null,
"job_store_clean_up": null,
"working_dir_clean_up": null,
"started": "2020-06-23T18:56:18.067Z",
"submitted": "2020-06-23T18:56:18.067Z",
"finished": null,
"track_cache": null
}
},

{
"model": "toil_orchestrator.job",
"pk": "1cdbb924-9078-400f-b4b9-4bef1ea5ce1b",
"fields": {
"app": {
"github": {
"entrypoint": "tempo.cwl",
"repository": "https://github.com/mskcc-cwl/tempo"
}
},
"external_id": "1234",
"root_dir": "/sample/root/dir",
"job_store_location": "/sample/job_store/dir",
"resume_job_store_location": null,
"working_dir": "/sample/work/dir",
"status": 1,
"message": {
"log": "",
"info": "",
"failed_jobs": {},
"unknown_jobs": {}
},
"inputs": null,
"outputs": null,
"job_store_clean_up": null,
"working_dir_clean_up": null,
"started": "2020-06-23T18:56:18.067Z",
"submitted": "2020-06-23T18:56:18.067Z",
"finished": null,
"track_cache": null
}
},
{
"model": "toil_orchestrator.job",
"pk": "1cdbb924-9078-400f-b4b9-4bef1ea5ce1a",
Expand Down Expand Up @@ -224,4 +309,4 @@
"details": null
}
}
]
]
33 changes: 19 additions & 14 deletions toil_orchestrator/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from .toil_track_utils import ToilTrack
from .models import Job, Status, CommandLineToolJob
import shutil
from ridgeback.settings import MAX_RUNNING_JOBS


logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -77,21 +78,25 @@ def on_failure_to_submit(self, exc, task_id, args, kwargs, einfo):
logger.error('Job Saved')


# Retry is 6 to 48 minutes with added randomness from jittering
@shared_task(bind=True,
autoretry_for=(Exception,),
retry_jitter=True,
retry_backoff=360,
retry_kwargs={"max_retries": 4},
on_failure=on_failure_to_submit)
def submit_jobs_to_lsf(self, job_id):
logger.info("Submitting jobs to lsf")
job = Job.objects.get(id=job_id)
logger.info("Submitting job %s to lsf" % job.id)
submitter = JobSubmitter(job_id, job.app, job.inputs, job.root_dir, job.resume_job_store_location)
@shared_task
def submit_pending_jobs():
    """Submit the oldest CREATED jobs to LSF, up to the free capacity.

    Free capacity is ``MAX_RUNNING_JOBS`` minus the number of jobs whose
    status is RUNNING or PENDING. When there is no free capacity, nothing is
    submitted. Otherwise CREATED jobs are taken in ascending ``created_date``
    order, limited to the free capacity, and each one is handed to
    ``submit_job_to_lsf``.
    """
    # Ask the database for the count instead of loading every matching row
    # into memory only to take len() of the result.
    jobs_running = Job.objects.filter(status__in=(Status.RUNNING, Status.PENDING)).count()
    jobs_to_submit = MAX_RUNNING_JOBS - jobs_running
    if jobs_to_submit <= 0:
        return

    # Oldest first, so jobs are submitted in the order they were created.
    jobs = Job.objects.filter(status=Status.CREATED).order_by("created_date")[:jobs_to_submit]

    for job in jobs:
        submit_job_to_lsf(job)


def submit_job_to_lsf(job):
logger.info("Submitting job %s to lsf" % str(job.id))
submitter = JobSubmitter(job.id, job.app, job.inputs, job.root_dir, job.resume_job_store_location)
external_job_id, job_store_dir, job_work_dir, job_output_dir = submitter.submit()
logger.info("Job %s submitted to lsf with id: %s" % (job_id, external_job_id))
save_job_info(job_id, external_job_id, job_store_dir, job_work_dir, job_output_dir)
logger.info("Job %s submitted to lsf with id: %s" % (job.id, external_job_id))
save_job_info(job.id, external_job_id, job_store_dir, job_work_dir, job_output_dir)
job.external_id = external_job_id
job.job_store_location = job_store_dir
job.working_dir = job_work_dir
Expand Down
3 changes: 1 addition & 2 deletions toil_orchestrator/views/job_view.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from toil_orchestrator.models import Job, Status
from toil_orchestrator.serializers import JobSerializer, JobSubmitSerializer, JobResumeSerializer, JobIdsSerializer, JobStatusSerializer
from toil_orchestrator.tasks import submit_jobs_to_lsf, abort_job
from toil_orchestrator.tasks import abort_job
from rest_framework import mixins
from rest_framework import status
from rest_framework.viewsets import GenericViewSet
Expand All @@ -24,7 +24,6 @@ def validate_and_save(self, data):
serializer = JobSerializer(data=data)
if serializer.is_valid():
response = serializer.save()
submit_jobs_to_lsf.delay(str(response.id))
response = JobSerializer(response)
return Response(response.data, status=status.HTTP_201_CREATED)
else:
Expand Down

0 comments on commit 424ba66

Please sign in to comment.