Skip to content

Commit

Permalink
Release 1.9.2 (#167)
Browse files Browse the repository at this point in the history
* Fix issue with git clone preventing jobs from running
  • Loading branch information
aef- authored Mar 25, 2021
1 parent 8b48917 commit 2a2885c
Show file tree
Hide file tree
Showing 5 changed files with 86 additions and 61 deletions.
7 changes: 6 additions & 1 deletion submitter/jobsubmitter.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,12 @@ def __init__(self, github, entrypoint, version='master'):
self.version = version

def resolve(self, location):
    """Clone the pipeline repository into *location* and return the entrypoint path.

    The repository at ``self.github`` is checked out at ``self.version`` with
    submodules. If a previous attempt (e.g. a retried job) already cloned it,
    git fails with an "already exists" error, which is deliberately ignored so
    the job can proceed; any other git failure is re-raised.

    :param location: directory into which the repository is cloned
    :return: os.path.join(location, <repo dirname>, self.entrypoint)
    :raises git.exc.GitCommandError: for any clone failure other than a
        pre-existing checkout
    """
    try:
        git.Git(location).clone(self.github, '--branch', self.version, '--recurse-submodules')
    except git.exc.GitCommandError as err:
        # A pre-existing checkout is fine (task retry); anything else is fatal.
        # Bare `raise` preserves the original traceback.
        if "already exists" not in err.stderr:
            raise

    dirname = self._extract_dirname_from_github_link()
    return os.path.join(location, dirname, self.entrypoint)

Expand Down
18 changes: 10 additions & 8 deletions tests/test_tasks.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from unittest import skip
from mock import patch
from django.test import TestCase
from toil_orchestrator.models import Job, Status
Expand Down Expand Up @@ -27,31 +28,31 @@ def test_failure_to_submit(self):
self.assertNotEqual(log_path, None)

@patch('submitter.jobsubmitter.JobSubmitter.__init__')
@patch('toil_orchestrator.tasks.submit_job_to_lsf')
@patch('submitter.jobsubmitter.JobSubmitter.submit')
def test_submit_polling(self, job_submitter, submit_job_to_lsf, init):
    """submit_pending_jobs queues one async LSF submission per CREATED job.

    A second pass queues nothing, since the first pass moved those jobs out
    of the CREATED state.
    """
    init.return_value = None
    job_submitter.return_value = self.current_job.external_id, self.current_job.job_store_location, self.current_job.working_dir, self.current_job.output_directory
    submit_job_to_lsf.return_value = None
    created_jobs = len(Job.objects.filter(status=Status.CREATED))
    submit_pending_jobs()
    # One .delay() dispatch per job that was in CREATED state.
    self.assertEqual(submit_job_to_lsf.delay.call_count, created_jobs)
    submit_job_to_lsf.reset_mock()
    submit_pending_jobs()
    # Nothing left in CREATED, so nothing new is dispatched.
    self.assertEqual(submit_job_to_lsf.delay.call_count, 0)

@patch('submitter.jobsubmitter.JobSubmitter.__init__')
@patch('submitter.jobsubmitter.JobSubmitter.submit')
@patch('toil_orchestrator.tasks.save_job_info')
def test_submit(self, save_job_info, submit, init):
    """A successful submission moves the job to PENDING and persists the
    job-store location returned by the submitter."""
    init.return_value = None
    save_job_info.return_value = None
    # Submitter reports a new job_store_location; it must end up on the model.
    submit.return_value = self.current_job.external_id, "/new/job_store_location", self.current_job.working_dir, self.current_job.output_directory
    submit_job_to_lsf(self.current_job)
    self.current_job.refresh_from_db()
    self.assertEqual(self.current_job.status, Status.PENDING)
    self.assertEqual(self.current_job.finished, None)
    self.assertEqual(self.current_job.job_store_location, "/new/job_store_location")

@patch('toil_orchestrator.tasks.get_job_info_path')
@patch('submitter.jobsubmitter.JobSubmitter.__init__')
Expand Down Expand Up @@ -113,6 +114,7 @@ def test_running(self, status, init, get_job_info_path):

@patch('submitter.jobsubmitter.JobSubmitter.__init__')
@patch('submitter.jobsubmitter.JobSubmitter.status')
@skip("We are no longer failing tests on pending status, and instead letting the task fail it")
def test_fail_not_submitted(self, status, init):
init.return_value = None
status.return_value = Status.PENDING, None
Expand Down
72 changes: 45 additions & 27 deletions toil_orchestrator/admin.py
Original file line number Diff line number Diff line change
@@ -1,44 +1,62 @@
from django.contrib import admin
from .models import Job, CommandLineToolJob
from .models import Job, CommandLineToolJob, Status
from toil_orchestrator.tasks import cleanup_folders
from django.contrib import messages


class StatusFilter(admin.SimpleListFilter):
    # Admin sidebar filter that lists each Status with a live count of
    # matching jobs under the currently active filters.
    title = 'Status'
    parameter_name = 'status'

    def lookups(self, request, model_admin):
        # Re-apply the other active query-string filters so each per-status
        # count reflects the currently filtered queryset, excluding the
        # date-range, status, search ("q") and pagination ("p") parameters.
        # NOTE(review): substring test also drops any other key containing
        # "p" or "q" — confirm that is intended.
        filters = {k:v for (k, v) in request.GET.items() if "range" not in k and "status" not in k
                   and "q" not in k and "p" not in k}

        qs = model_admin.get_queryset(request).filter(**filters)
        return [(status.value, "%s (%s)" % (status.name, qs.filter(status=status.value).count())) for status in Status]

    def queryset(self, request, queryset):
        # Filter by the chosen status; no selection leaves the queryset as-is.
        if self.value():
            return queryset.filter(status=self.value())
        return queryset


@admin.register(Job)
class JobAdmin(admin.ModelAdmin):
    list_display = ("id", "status", "created_date", "modified_date", "external_id")
    ordering = ('-created_date',)
    list_filter = (StatusFilter,)
    actions = ['cleanup_files']

    def cleanup_files(self, request, queryset):
        """Admin action: queue TOIL jobstore/workdir cleanup for the selected jobs.

        Dispatches an async ``cleanup_folders`` task for every selected job and
        reports how many were queued for full or partial cleanup versus those
        with nothing left to clean.
        """
        cleaned_up_projects = 0
        partially_cleaned_up_projects = 0
        already_cleaned_up_projects = 0
        report_message = """
        Cleaning up {cleaning} job(s) [{partial_cleaning} partial]
        Already cleaned up {cleaned_up}
        """
        for job in queryset:
            # NOTE(review): job_store_clean_up / working_dir_clean_up read as
            # per-area cleanup flags — confirm their polarity on the Job model.
            if all([job.job_store_clean_up, job.working_dir_clean_up]):
                cleaned_up_projects = cleaned_up_projects + 1
            elif any([job.job_store_clean_up, job.working_dir_clean_up]):
                cleaned_up_projects = cleaned_up_projects + 1
                partially_cleaned_up_projects = partially_cleaned_up_projects + 1
            else:
                already_cleaned_up_projects = already_cleaned_up_projects + 1

            # The task is queued unconditionally; it decides what (if anything)
            # still needs removing for this job.
            cleanup_folders.delay(str(job.id))

        message = report_message.format(cleaning=cleaned_up_projects,
                                        partial_cleaning=partially_cleaned_up_projects,
                                        cleaned_up=already_cleaned_up_projects)

        self.message_user(request, message, level=messages.WARNING)

    cleanup_files.short_description = "Cleanup up the TOIL jobstore and workdir"

@admin.register(CommandLineToolJob)
class CommandLineToolJobAdmin(admin.ModelAdmin):
    # Read-only listing of command-line tool jobs with lifecycle timestamps.
    # (The scraped diff showed this line twice — pre- and post-commit copies;
    # only one is kept.)
    list_display = ("id", "job_name", "status", "created_date", "modified_date", "started", "submitted", "finished")
3 changes: 2 additions & 1 deletion toil_orchestrator/celery.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@
app.autodiscover_tasks()


# Route every orchestrator task (submission, cleanup, abort) to the dedicated
# Ridgeback queue. The pre-commit variant of this dict (without the
# submit_job_to_lsf route) was interleaved in the scraped diff and is dropped.
app.conf.task_routes = {'toil_orchestrator.tasks.submit_job_to_lsf': {'queue': settings.RIDGEBACK_DEFAULT_QUEUE},
                        'toil_orchestrator.tasks.cleanup_folders': {'queue': settings.RIDGEBACK_DEFAULT_QUEUE},
                        'toil_orchestrator.tasks.abort_job': {'queue': settings.RIDGEBACK_DEFAULT_QUEUE}}

app.conf.beat_schedule = {
Expand Down
47 changes: 23 additions & 24 deletions toil_orchestrator/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@ def on_failure_to_submit(self, exc, task_id, args, kwargs, einfo):
update_message_by_key(job,'info','Failed to submit job')
job.finished = now()
job.save()
logger.error('Job Saved')


@shared_task
Expand All @@ -85,14 +84,22 @@ def submit_pending_jobs():
if jobs_to_submit <= 0:
return

jobs = Job.objects.filter(status=Status.CREATED).order_by("created_date")[:jobs_to_submit]
job_ids = Job.objects.filter(status=Status.CREATED).order_by("created_date").values_list('pk', flat=True)[:jobs_to_submit]

for job in jobs:
submit_job_to_lsf(job)
Job.objects.filter(pk__in=list(job_ids)).update(status=Status.PENDING)

for job_id in job_ids:
submit_job_to_lsf.delay(job_id)

def submit_job_to_lsf(job):
logger.info("Submitting job %s to lsf" % str(job.id))
@shared_task(bind=True,
autoretry_for=(Exception,),
retry_jitter=True,
retry_backoff=360,
retry_kwargs={"max_retries": 4},
on_failure=on_failure_to_submit)
def submit_job_to_lsf(self, job_id):
logger.info("Submitting job %s to lsf" % str(job_id))
job = Job.objects.get(pk=job_id)
submitter = JobSubmitter(str(job.id), job.app, job.inputs, job.root_dir, job.resume_job_store_location)
external_job_id, job_store_dir, job_work_dir, job_output_dir = submitter.submit()
logger.info("Job %s submitted to lsf with id: %s" % (str(job.id), external_job_id))
Expand All @@ -101,34 +108,20 @@ def submit_job_to_lsf(job):
job.job_store_location = job_store_dir
job.working_dir = job_work_dir
job.output_directory = job_output_dir
job.status = Status.PENDING
log_path = os.path.join(job_work_dir, 'lsf.log')
update_message_by_key(job, 'log', log_path)
job.save(update_fields=['external_id',
'job_store_location',
'working_dir',
'output_directory',
'status'])
])


@shared_task(bind=True, max_retries=10, retry_jitter=True, retry_backoff=60)
def abort_job(self, job_id):
logger.info("Abort job %s" % job_id)
job = Job.objects.get(id=job_id)
try:
logger.info("Submitting job %s to lsf" % job.id)
submitter = JobSubmitter(job_id, job.app, job.inputs, job.root_dir, job.resume_job_store_location)
external_job_id, job_store_dir, job_work_dir, job_output_dir = submitter.submit()
logger.info("Job %s submitted to lsf with id: %s" % (job_id, external_job_id))
save_job_info(job_id, external_job_id, job_store_dir, job_work_dir, job_output_dir)
job.external_id = external_job_id
job.job_store_location = job_store_dir
job.working_dir = job_work_dir
job.output_directory = job_output_dir
job.status = Status.PENDING
log_path = os.path.join(job_work_dir, 'lsf.log')
update_message_by_key(job, 'log', log_path)
job.save()
if job.status in (Status.PENDING, Status.RUNNING,):
submitter = JobSubmitter(job_id, job.app, job.inputs, job.root_dir, job.resume_job_store_location)
job_killed = submitter.abort(job.external_id)
Expand All @@ -139,11 +132,15 @@ def abort_job(self, job_id):
else:
logger.info("Failed to abort job %s" % job_id)
raise Exception("Failed to abort job %s" % job_id)
elif job.status in (Status.CREATED, Status.UNKNOWN,):
logger.info("Job aborting %s but still not submitted" % job_id)
elif job.status == Status.CREATED:
job.status = Status.ABORTED
job.save()
return
elif job.status == Status.UNKNOWN:
logger.info("Job aborting %s is unknown" % job_id)
raise Exception("Job aborting %s but still not submitted" % job_id)
else:
logger.info("Job %s already in final state %s")
logger.info("Job %s already in final state %s", job_id, job.status)
return
except Exception as e:
logger.info("Error happened %s. Retrying..." % str(e))
Expand Down Expand Up @@ -214,6 +211,8 @@ def check_status_of_jobs(self):
except Exception as e:
error_message = "Failed to update job %s from file: %s\n%s" % (job.id, job_info_path,str(e))
logger.info(error_message)
elif not job.external_id and job.status == Status.PENDING:
continue
elif job.external_id:
submiter = JobSubmitter(str(job.id), job.app, job.inputs, job.root_dir, job.resume_job_store_location)
lsf_status_info = submiter.status(job.external_id)
Expand Down

0 comments on commit 2a2885c

Please sign in to comment.