diff --git a/submitter/jobsubmitter.py b/submitter/jobsubmitter.py
index 5e77264e..f0d751fe 100644
--- a/submitter/jobsubmitter.py
+++ b/submitter/jobsubmitter.py
@@ -39,7 +39,12 @@ def __init__(self, github, entrypoint, version='master'):
         self.version = version

     def resolve(self, location):
-        git.Git(location).clone(self.github, '--branch', self.version, '--recurse-submodules')
+        try:
+            git.Git(location).clone(self.github, '--branch', self.version, '--recurse-submodules')
+        except git.exc.GitCommandError as err:
+            if "already exists" not in err.stderr:
+                raise(err)
+
         dirname = self._extract_dirname_from_github_link()
         return os.path.join(location, dirname, self.entrypoint)

diff --git a/tests/test_tasks.py b/tests/test_tasks.py
index a77d2257..6520cd46 100644
--- a/tests/test_tasks.py
+++ b/tests/test_tasks.py
@@ -1,3 +1,4 @@
+from unittest import skip
 from mock import patch
 from django.test import TestCase
 from toil_orchestrator.models import Job, Status
@@ -27,19 +28,19 @@ def test_failure_to_submit(self):
         self.assertNotEqual(log_path, None)

     @patch('submitter.jobsubmitter.JobSubmitter.__init__')
-    @patch('toil_orchestrator.tasks.save_job_info')
+    @patch('toil_orchestrator.tasks.submit_job_to_lsf')
     @patch('submitter.jobsubmitter.JobSubmitter.submit')
-    def test_submit_polling(self, job_submitter, save_job_info, init):
+    def test_submit_polling(self, job_submitter, submit_job_to_lsf, init):
         init.return_value = None
         job_submitter.return_value = self.current_job.external_id, self.current_job.job_store_location, self.current_job.working_dir, self.current_job.output_directory
-        save_job_info.return_value = None
+        submit_job_to_lsf.return_value = None
         created_jobs = len(Job.objects.filter(status=Status.CREATED))
         running_jobs = len(Job.objects.filter(status__in=(Status.RUNNING, Status.PENDING)))
         submit_pending_jobs()
-        self.assertEqual(save_job_info.call_count, created_jobs)
-        save_job_info.reset_mock()
+        self.assertEqual(submit_job_to_lsf.delay.call_count, created_jobs)
+        submit_job_to_lsf.reset_mock()
         submit_pending_jobs()
-        self.assertEqual(save_job_info.call_count, 0)
+        self.assertEqual(submit_job_to_lsf.delay.call_count, 0)

     @patch('submitter.jobsubmitter.JobSubmitter.__init__')
     @patch('submitter.jobsubmitter.JobSubmitter.submit')
@@ -47,11 +48,11 @@ def test_submit_polling(self, job_submitter, save_job_info, init):
     def test_submit(self, save_job_info, submit, init):
         init.return_value = None
         save_job_info.return_value = None
-        submit.return_value = self.current_job.external_id, self.current_job.job_store_location, self.current_job.working_dir, self.current_job.output_directory
+        submit.return_value = self.current_job.external_id, "/new/job_store_location", self.current_job.working_dir, self.current_job.output_directory
         submit_job_to_lsf(self.current_job)
         self.current_job.refresh_from_db()
-        self.assertEqual(self.current_job.status, Status.PENDING)
         self.assertEqual(self.current_job.finished, None)
+        self.assertEqual(self.current_job.job_store_location, "/new/job_store_location")

     @patch('toil_orchestrator.tasks.get_job_info_path')
     @patch('submitter.jobsubmitter.JobSubmitter.__init__')
@@ -113,6 +114,7 @@ def test_running(self, status, init, get_job_info_path):

     @patch('submitter.jobsubmitter.JobSubmitter.__init__')
     @patch('submitter.jobsubmitter.JobSubmitter.status')
+    @skip("We are no longer failing tests on pending status, and instead letting the task fail it")
     def test_fail_not_submitted(self, status, init):
         init.return_value = None
         status.return_value = Status.PENDING, None
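
Note on the resolve() change above: swallowing the "already exists" GitCommandError makes the clone idempotent, which matters now that submission runs inside a retrying Celery task, where a retry can hit a location that already contains the checkout. A minimal standalone sketch of the pattern, assuming GitPython (the repository URL and scratch directory are illustrative):

    import tempfile

    import git

    location = tempfile.mkdtemp()
    repo = "https://github.com/example/demo.git"  # illustrative URL

    for attempt in range(2):  # the second pass simulates a task retry
        try:
            git.Git(location).clone(repo, '--branch', 'master', '--recurse-submodules')
        except git.exc.GitCommandError as err:
            # re-raise genuine failures; "already exists" in stderr means an
            # earlier attempt already cloned into this location
            if "already exists" not in err.stderr:
                raise
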
diff --git a/toil_orchestrator/admin.py b/toil_orchestrator/admin.py
index 6f05d518..0ed0bead 100644
--- a/toil_orchestrator/admin.py
+++ b/toil_orchestrator/admin.py
@@ -1,44 +1,62 @@
 from django.contrib import admin
-from .models import Job, CommandLineToolJob
+from .models import Job, CommandLineToolJob, Status
 from toil_orchestrator.tasks import cleanup_folders
 from django.contrib import messages


+class StatusFilter(admin.SimpleListFilter):
+    title = 'Status'
+    parameter_name = 'status'
+
+    def lookups(self, request, model_admin):
+        filters = {k: v for (k, v) in request.GET.items() if "range" not in k and "status" not in k
+                   and "q" not in k and "p" not in k}
+
+        qs = model_admin.get_queryset(request).filter(**filters)
+        return [(status.value, "%s (%s)" % (status.name, qs.filter(status=status.value).count())) for status in Status]
+
+    def queryset(self, request, queryset):
+        if self.value():
+            return queryset.filter(status=self.value())
+        return queryset
+
+
 @admin.register(Job)
 class JobAdmin(admin.ModelAdmin):
-    actions = ['cleanup_files']
-    report_message = """
-    Cleaning up {cleaning} job(s) [{partial_cleaning} partial]
-    Already cleaned up {cleaned_up}
-    """
+    list_display = ("id", "status", "created_date", "modified_date", "external_id")
+    ordering = ('-created_date',)
+    list_filter = (StatusFilter,)

-    def cleanup_files(self, request, queryset):
-        cleaned_up_projects = 0
-        partially_cleaned_up_projects = 0
-        already_cleaned_up_projects = 0
-        for job in queryset:
+    actions = ['cleanup_files']

-            if all([job.job_store_clean_up, job.working_dir_clean_up]):
-                cleaned_up_projects = cleaned_up_projects + 1
-            elif any([job.job_store_clean_up, job.working_dir_clean_up]):
-                cleaned_up_projects = cleaned_up_projects + 1
-                partially_cleaned_up_projects = partially_cleaned_up_projects + 1
-            else:
-                already_cleaned_up_projects = already_cleaned_up_projects + 1
+    def cleanup_files(self, request, queryset):
+        cleaned_up_projects = 0
+        partially_cleaned_up_projects = 0
+        already_cleaned_up_projects = 0
+        report_message = """
+Cleaning up {cleaning} job(s) [{partial_cleaning} partial]
+Already cleaned up {cleaned_up}
+        """
+        for job in queryset:

-            cleanup_folders.delay(str(job.id))
+            if all([job.job_store_clean_up, job.working_dir_clean_up]):
+                cleaned_up_projects = cleaned_up_projects + 1
+            elif any([job.job_store_clean_up, job.working_dir_clean_up]):
+                cleaned_up_projects = cleaned_up_projects + 1
+                partially_cleaned_up_projects = partially_cleaned_up_projects + 1
+            else:
+                already_cleaned_up_projects = already_cleaned_up_projects + 1

-        message = self.report_message.format(cleaning=cleaned_up_projects,
-                                             partial_cleaning=partially_cleaned_up_projects,
-                                             cleaned_up=already_cleaned_up_projects)
+            cleanup_folders.delay(str(job.id))

-        self.message_user(request, message, level=messages.WARNING)
+        message = report_message.format(cleaning=cleaned_up_projects,
+                                        partial_cleaning=partially_cleaned_up_projects,
+                                        cleaned_up=already_cleaned_up_projects)

-    cleanup_files.short_description = "Cleanup up the TOIL jobstore and workdir"
-    list_display = ("id", "status", "created_date", "modified_date", "external_id")
-    ordering = ('-created_date',)
+        self.message_user(request, message, level=messages.WARNING)

+    cleanup_files.short_description = "Clean up the TOIL jobstore and workdir"

 @admin.register(CommandLineToolJob)
 class CommandLineToolJobAdmin(admin.ModelAdmin):
-    list_display = ("id", "job_name", "status", "created_date", "modified_date", "started", "submitted", "finished")
+    list_display = ("id", "job_name",
+                    "status", "created_date", "modified_date", "started", "submitted", "finished")
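
Note on StatusFilter above: lookups() computes a live count per status against a queryset that keeps the other active changelist parameters, dropping its own status key plus Django's search ("q"), pagination ("p") and date-range keys from request.GET before filtering. Assuming the Status enum from the models, the returned choices have this shape (values and counts are illustrative):

    # (value, label) pairs rendered in the admin sidebar
    [(0, 'CREATED (3)'),
     (1, 'PENDING (7)'),
     (2, 'RUNNING (12)')]
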
"status", "created_date", "modified_date", "started", "submitted", "finished") diff --git a/toil_orchestrator/celery.py b/toil_orchestrator/celery.py index c8e808c6..b44e78de 100644 --- a/toil_orchestrator/celery.py +++ b/toil_orchestrator/celery.py @@ -19,7 +19,8 @@ app.autodiscover_tasks() -app.conf.task_routes = {'toil_orchestrator.tasks.cleanup_folders': {'queue': settings.RIDGEBACK_DEFAULT_QUEUE}, +app.conf.task_routes = {'toil_orchestrator.tasks.submit_job_to_lsf': {'queue': settings.RIDGEBACK_DEFAULT_QUEUE}, + 'toil_orchestrator.tasks.cleanup_folders': {'queue': settings.RIDGEBACK_DEFAULT_QUEUE}, 'toil_orchestrator.tasks.abort_job': {'queue': settings.RIDGEBACK_DEFAULT_QUEUE}} app.conf.beat_schedule = { diff --git a/toil_orchestrator/tasks.py b/toil_orchestrator/tasks.py index 5b15e843..af458ce5 100644 --- a/toil_orchestrator/tasks.py +++ b/toil_orchestrator/tasks.py @@ -75,7 +75,6 @@ def on_failure_to_submit(self, exc, task_id, args, kwargs, einfo): update_message_by_key(job,'info','Failed to submit job') job.finished = now() job.save() - logger.error('Job Saved') @shared_task @@ -85,14 +84,22 @@ def submit_pending_jobs(): if jobs_to_submit <= 0: return - jobs = Job.objects.filter(status=Status.CREATED).order_by("created_date")[:jobs_to_submit] + job_ids = Job.objects.filter(status=Status.CREATED).order_by("created_date").values_list('pk', flat=True)[:jobs_to_submit] - for job in jobs: - submit_job_to_lsf(job) + Job.objects.filter(pk__in=list(job_ids)).update(status=Status.PENDING) + for job_id in job_ids: + submit_job_to_lsf.delay(job_id) -def submit_job_to_lsf(job): - logger.info("Submitting job %s to lsf" % str(job.id)) +@shared_task(bind=True, + autoretry_for=(Exception,), + retry_jitter=True, + retry_backoff=360, + retry_kwargs={"max_retries": 4}, + on_failure=on_failure_to_submit) +def submit_job_to_lsf(self, job_id): + logger.info("Submitting job %s to lsf" % str(job_id)) + job = Job.objects.get(pk=job_id) submitter = JobSubmitter(str(job.id), job.app, job.inputs, job.root_dir, job.resume_job_store_location) external_job_id, job_store_dir, job_work_dir, job_output_dir = submitter.submit() logger.info("Job %s submitted to lsf with id: %s" % (str(job.id), external_job_id)) @@ -101,14 +108,13 @@ def submit_job_to_lsf(job): job.job_store_location = job_store_dir job.working_dir = job_work_dir job.output_directory = job_output_dir - job.status = Status.PENDING log_path = os.path.join(job_work_dir, 'lsf.log') update_message_by_key(job, 'log', log_path) job.save(update_fields=['external_id', 'job_store_location', 'working_dir', 'output_directory', - 'status']) + ]) @shared_task(bind=True, max_retries=10, retry_jitter=True, retry_backoff=60) @@ -116,19 +122,6 @@ def abort_job(self, job_id): logger.info("Abort job %s" % job_id) job = Job.objects.get(id=job_id) try: - logger.info("Submitting job %s to lsf" % job.id) - submitter = JobSubmitter(job_id, job.app, job.inputs, job.root_dir, job.resume_job_store_location) - external_job_id, job_store_dir, job_work_dir, job_output_dir = submitter.submit() - logger.info("Job %s submitted to lsf with id: %s" % (job_id, external_job_id)) - save_job_info(job_id, external_job_id, job_store_dir, job_work_dir, job_output_dir) - job.external_id = external_job_id - job.job_store_location = job_store_dir - job.working_dir = job_work_dir - job.output_directory = job_output_dir - job.status = Status.PENDING - log_path = os.path.join(job_work_dir, 'lsf.log') - update_message_by_key(job, 'log', log_path) - job.save() if job.status in (Status.PENDING, 
@@ -116,19 +122,6 @@ def abort_job(self, job_id):
     logger.info("Abort job %s" % job_id)
     job = Job.objects.get(id=job_id)
     try:
-        logger.info("Submitting job %s to lsf" % job.id)
-        submitter = JobSubmitter(job_id, job.app, job.inputs, job.root_dir, job.resume_job_store_location)
-        external_job_id, job_store_dir, job_work_dir, job_output_dir = submitter.submit()
-        logger.info("Job %s submitted to lsf with id: %s" % (job_id, external_job_id))
-        save_job_info(job_id, external_job_id, job_store_dir, job_work_dir, job_output_dir)
-        job.external_id = external_job_id
-        job.job_store_location = job_store_dir
-        job.working_dir = job_work_dir
-        job.output_directory = job_output_dir
-        job.status = Status.PENDING
-        log_path = os.path.join(job_work_dir, 'lsf.log')
-        update_message_by_key(job, 'log', log_path)
-        job.save()
         if job.status in (Status.PENDING, Status.RUNNING,):
             submitter = JobSubmitter(job_id, job.app, job.inputs, job.root_dir, job.resume_job_store_location)
             job_killed = submitter.abort(job.external_id)
@@ -139,11 +132,15 @@
             else:
                 logger.info("Failed to abort job %s" % job_id)
                 raise Exception("Failed to abort job %s" % job_id)
-        elif job.status in (Status.CREATED, Status.UNKNOWN,):
-            logger.info("Job aborting %s but still not submitted" % job_id)
+        elif job.status == Status.CREATED:
+            job.status = Status.ABORTED
+            job.save()
+            return
+        elif job.status == Status.UNKNOWN:
+            logger.info("Job aborting %s is unknown" % job_id)
             raise Exception("Job aborting %s but still not submitted" % job_id)
         else:
-            logger.info("Job %s already in final state %s")
+            logger.info("Job %s already in final state %s", job_id, job.status)
             return
     except Exception as e:
         logger.info("Error happened %s. Retrying..." % str(e))
@@ -214,6 +211,8 @@ def check_status_of_jobs(self):
             except Exception as e:
                 error_message = "Failed to update job %s from file: %s\n%s" % (job.id, job_info_path,str(e))
                 logger.info(error_message)
+        elif not job.external_id and job.status == Status.PENDING:
+            continue
         elif job.external_id:
             submiter = JobSubmitter(str(job.id), job.app, job.inputs, job.root_dir, job.resume_job_store_location)
             lsf_status_info = submiter.status(job.external_id)
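
Note on the abort and polling changes above: abort_job no longer submits a job just to kill it, and a CREATED job is flipped straight to ABORTED. Because jobs are now marked PENDING before any worker has run submit_job_to_lsf, a PENDING job without an external_id is simply still queued, so check_status_of_jobs skips it rather than querying LSF for a job that was never submitted. A condensed sketch of the branch order the poller now follows (helper names are illustrative):

    def poll(job):
        # condensed from check_status_of_jobs' branching
        if job_info_file_exists(job):  # illustrative stand-in for the first branch
            return "update status from the toil job info file"
        if not job.external_id and job.status == Status.PENDING:
            return "queued but not yet submitted, skip this cycle"
        if job.external_id:
            return "submitted, query LSF via JobSubmitter.status()"
        return "fall through to the remaining branches"
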