From 3f8142c6f83192ab2443eb0e9bd995bfa66781f3 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Thu, 6 Jan 2022 22:42:30 +0000 Subject: [PATCH 01/64] Bump celery from 4.4.7 to 5.2.2 Bumps [celery](https://github.com/celery/celery) from 4.4.7 to 5.2.2. - [Release notes](https://github.com/celery/celery/releases) - [Changelog](https://github.com/celery/celery/blob/master/Changelog.rst) - [Commits](https://github.com/celery/celery/compare/v4.4.7...v5.2.2) --- updated-dependencies: - dependency-name: celery dependency-type: direct:production ... Signed-off-by: dependabot[bot] --- requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/requirements.txt b/requirements.txt index 2183da1c..2c8efb0b 100644 --- a/requirements.txt +++ b/requirements.txt @@ -7,7 +7,7 @@ django-test-migrations==1.0.0 python-slugify==3.0.2 drf-yasg==1.17.1 GitPython==3.0.8 -celery==4.4.7 +celery==5.2.2 ruamel.yaml<=0.16.5 psycopg2-binary==2.8.6 mock==4.0.2 From b3eb544db67fc157076e0d7e683845b98b3ab635 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Fri, 6 Jan 2023 19:23:34 +0000 Subject: [PATCH 02/64] Bump gitpython from 3.0.8 to 3.1.30 Bumps [gitpython](https://github.com/gitpython-developers/GitPython) from 3.0.8 to 3.1.30. - [Release notes](https://github.com/gitpython-developers/GitPython/releases) - [Changelog](https://github.com/gitpython-developers/GitPython/blob/main/CHANGES) - [Commits](https://github.com/gitpython-developers/GitPython/compare/3.0.8...3.1.30) --- updated-dependencies: - dependency-name: gitpython dependency-type: direct:production ... Signed-off-by: dependabot[bot] --- requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/requirements.txt b/requirements.txt index 887a881f..45cdfc6f 100644 --- a/requirements.txt +++ b/requirements.txt @@ -6,7 +6,7 @@ django-cors-headers==3.0.2 django-test-migrations==1.2.0 python-slugify==3.0.2 drf-yasg==1.17.1 -GitPython==3.0.8 +GitPython==3.1.30 celery==4.4.7 ruamel.yaml<=0.16.5 psycopg2-binary==2.8.6 From a40bd82b3c1acd5656458e8300caf34959473dfb Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Fri, 6 Jan 2023 19:23:53 +0000 Subject: [PATCH 03/64] Bump django-filter from 2.1.0 to 2.4.0 Bumps [django-filter](https://github.com/carltongibson/django-filter) from 2.1.0 to 2.4.0. - [Release notes](https://github.com/carltongibson/django-filter/releases) - [Changelog](https://github.com/carltongibson/django-filter/blob/main/CHANGES.rst) - [Commits](https://github.com/carltongibson/django-filter/compare/2.1.0...2.4.0) --- updated-dependencies: - dependency-name: django-filter dependency-type: direct:production ... Signed-off-by: dependabot[bot] --- requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/requirements.txt b/requirements.txt index 887a881f..1eb34783 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,7 +1,7 @@ Django==2.2.24 djangorestframework==3.11.2 markdown==3.1.1 -django-filter==2.1.0 +django-filter==2.4.0 django-cors-headers==3.0.2 django-test-migrations==1.2.0 python-slugify==3.0.2 From 05b29c4bfc9d8155fd3d4d88c1fd913d8e12f4e9 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Fri, 6 Jan 2023 19:24:03 +0000 Subject: [PATCH 04/64] Bump django from 2.2.24 to 2.2.28 Bumps [django](https://github.com/django/django) from 2.2.24 to 2.2.28. - [Release notes](https://github.com/django/django/releases) - [Commits](https://github.com/django/django/compare/2.2.24...2.2.28) --- updated-dependencies: - dependency-name: django dependency-type: direct:production ... Signed-off-by: dependabot[bot] --- requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/requirements.txt b/requirements.txt index 887a881f..2299eaed 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,4 @@ -Django==2.2.24 +Django==2.2.28 djangorestframework==3.11.2 markdown==3.1.1 django-filter==2.1.0 From 3b13e00d8ff9a258e218f556312bbd4d4e306f40 Mon Sep 17 00:00:00 2001 From: D-Pankey <30415217+D-Pankey@users.noreply.github.com> Date: Tue, 7 Feb 2023 16:51:58 -0500 Subject: [PATCH 05/64] adding span instrumentation --- orchestrator/views/job_view.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/orchestrator/views/job_view.py b/orchestrator/views/job_view.py index ebb651c3..5d5b9525 100644 --- a/orchestrator/views/job_view.py +++ b/orchestrator/views/job_view.py @@ -14,6 +14,7 @@ from drf_yasg.utils import swagger_auto_schema from orchestrator.tasks import command_processor from orchestrator.commands.command import Command, CommandType +from ddtrace import tracer class JobViewSet( @@ -28,10 +29,13 @@ class JobViewSet( def get_serializer_class(self): return JobSerializer - + @tracer.wrap() def validate_and_save(self, data): serializer = JobSerializer(data=data) if serializer.is_valid(): + current_span = tracer.current_span() + request_id = data.get("inputs",{}).get("runparams",{}).get("project_prefix","None Specified") + current_span.set_tag("request.id", request_id) response = serializer.save() response = JobSerializer(response) return Response(response.data, status=status.HTTP_201_CREATED) From b054b195ea4b987ee33b959c4cac47ce1a3d70e7 Mon Sep 17 00:00:00 2001 From: D-Pankey <30415217+D-Pankey@users.noreply.github.com> Date: Thu, 25 May 2023 16:22:33 -0400 Subject: [PATCH 06/64] Create Jenkinsfile --- Jenkinsfile | 46 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 46 insertions(+) create mode 100644 Jenkinsfile diff --git a/Jenkinsfile b/Jenkinsfile new file mode 100644 index 00000000..608ff089 --- /dev/null +++ b/Jenkinsfile @@ -0,0 +1,46 @@ +pipeline { + agent any + parameters { + choice(name: 'SERVER', choices: ['DEV', 'STAGE','PROD'], description: 'Server') + + } + stages { + stage("Deploy to Dev") { + when { + expression { params.SERVER == 'DEV' } + } + steps { + echo "deply to dev" + sshagent(credentials: ['a4d999a5-6318-4659-83be-3f148a5490ca']) { + sh 'ssh -o StrictHostKeyChecking=no voyager@silo.mskcc.org "cd /srv/services/ridgeback_dev_2/code/ridgeback && git checkout $BRANCH_NAME && git pull && source run_restart.sh"' + + } + + } + } + stage('Deploy to Stage') { + when { + expression { params.SERVER == 'STAGE' } + } + steps { + echo "deply to stage" + sshagent(credentials: ['a4d999a5-6318-4659-83be-3f148a5490ca']) { + sh 'ssh -o StrictHostKeyChecking=no voyager@silo.mskcc.org "cd /srv/services/staging_voyager/ridgeback && git checkout $BRANCH_NAME && git pull && source run_restart.sh"' + + } + } + } + /* stage('Deploy to Prod') { + when { + expression { params.SERVER == 'PROD' } + } + steps { + echo "deply to PROD" + sshagent(credentials: ['a4d999a5-6318-4659-83be-3f148a5490ca']) { + sh 'ssh -o StrictHostKeyChecking=no voyager@voyager.mskcc.org "cd /srv/services/ridgeback/code/ridgeback && git checkout $BRANCH_NAME && git pull && source run_restart.sh"' + + } + } + } + } */ +} From ca3cf94b6cd9e5d28dc47da41c9d4d7678eb68d8 Mon Sep 17 00:00:00 2001 From: D-Pankey <30415217+D-Pankey@users.noreply.github.com> Date: Thu, 25 May 2023 16:32:18 -0400 Subject: [PATCH 07/64] Update Jenkinsfile --- Jenkinsfile | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Jenkinsfile b/Jenkinsfile index 608ff089..beb0f888 100644 --- a/Jenkinsfile +++ b/Jenkinsfile @@ -41,6 +41,6 @@ pipeline { } } - } - } */ + }*/ + } } From ca323a4bac065c43e1296b4e6bf1a0e67247fd73 Mon Sep 17 00:00:00 2001 From: D-Pankey <30415217+D-Pankey@users.noreply.github.com> Date: Thu, 25 May 2023 16:48:05 -0400 Subject: [PATCH 08/64] Update Jenkinsfile --- Jenkinsfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Jenkinsfile b/Jenkinsfile index beb0f888..eb3ccd75 100644 --- a/Jenkinsfile +++ b/Jenkinsfile @@ -12,7 +12,7 @@ pipeline { steps { echo "deply to dev" sshagent(credentials: ['a4d999a5-6318-4659-83be-3f148a5490ca']) { - sh 'ssh -o StrictHostKeyChecking=no voyager@silo.mskcc.org "cd /srv/services/ridgeback_dev_2/code/ridgeback && git checkout $BRANCH_NAME && git pull && source run_restart.sh"' + sh 'ssh -o StrictHostKeyChecking=no voyager@silo.mskcc.org "cd /srv/services/ridgeback_dev_2/code/ridgeback && git pull && git checkout $BRANCH_NAME && source run_restart.sh"' } From f2d3034f467aeb6db3b467ff2150e376e66a7866 Mon Sep 17 00:00:00 2001 From: D-Pankey <30415217+D-Pankey@users.noreply.github.com> Date: Thu, 25 May 2023 17:00:47 -0400 Subject: [PATCH 09/64] Update Jenkinsfile --- Jenkinsfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Jenkinsfile b/Jenkinsfile index eb3ccd75..5dad2d73 100644 --- a/Jenkinsfile +++ b/Jenkinsfile @@ -12,7 +12,7 @@ pipeline { steps { echo "deply to dev" sshagent(credentials: ['a4d999a5-6318-4659-83be-3f148a5490ca']) { - sh 'ssh -o StrictHostKeyChecking=no voyager@silo.mskcc.org "cd /srv/services/ridgeback_dev_2/code/ridgeback && git pull && git checkout $BRANCH_NAME && source run_restart.sh"' + sh 'ssh -o StrictHostKeyChecking=no voyager@silo.mskcc.org "cd /srv/services/ridgeback_dev_2/code/ridgeback && git pull && git checkout $BRANCH_NAME && cd /srv/services/ridgeback_dev_2 && source run_restart.sh"' } From fcc8a6df5edf0c92b32c0f62508ef8b05e83328a Mon Sep 17 00:00:00 2001 From: D-Pankey <30415217+D-Pankey@users.noreply.github.com> Date: Thu, 25 May 2023 17:17:05 -0400 Subject: [PATCH 10/64] Update Jenkinsfile --- Jenkinsfile | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/Jenkinsfile b/Jenkinsfile index 5dad2d73..e5831a86 100644 --- a/Jenkinsfile +++ b/Jenkinsfile @@ -5,7 +5,7 @@ pipeline { } stages { - stage("Deploy to Dev") { + /*stage("Deploy to Dev") { when { expression { params.SERVER == 'DEV' } } @@ -17,7 +17,7 @@ pipeline { } } - } + } */ stage('Deploy to Stage') { when { expression { params.SERVER == 'STAGE' } @@ -29,8 +29,8 @@ pipeline { } } - } - /* stage('Deploy to Prod') { + } + /* stage('Deploy to Prod') { when { expression { params.SERVER == 'PROD' } } From 35f75748d1ddca66443ef17b6c4734d5d30b6926 Mon Sep 17 00:00:00 2001 From: D-Pankey <30415217+D-Pankey@users.noreply.github.com> Date: Thu, 25 May 2023 17:18:32 -0400 Subject: [PATCH 11/64] Update Jenkinsfile --- Jenkinsfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Jenkinsfile b/Jenkinsfile index e5831a86..f1356775 100644 --- a/Jenkinsfile +++ b/Jenkinsfile @@ -25,7 +25,7 @@ pipeline { steps { echo "deply to stage" sshagent(credentials: ['a4d999a5-6318-4659-83be-3f148a5490ca']) { - sh 'ssh -o StrictHostKeyChecking=no voyager@silo.mskcc.org "cd /srv/services/staging_voyager/ridgeback && git checkout $BRANCH_NAME && git pull && source run_restart.sh"' + sh 'ssh -o StrictHostKeyChecking=no voyager@silo.mskcc.org "cd /srv/services/staging_voyager/ridgeback && git pull && git checkout $BRANCH_NAME && source run_restart.sh"' } } From 3ecdbf6052ba68de832dab3b22cfb46555ac2992 Mon Sep 17 00:00:00 2001 From: D-Pankey <30415217+D-Pankey@users.noreply.github.com> Date: Fri, 26 May 2023 12:45:04 -0400 Subject: [PATCH 12/64] Update Jenkinsfile --- Jenkinsfile | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Jenkinsfile b/Jenkinsfile index f1356775..02372289 100644 --- a/Jenkinsfile +++ b/Jenkinsfile @@ -5,7 +5,7 @@ pipeline { } stages { - /*stage("Deploy to Dev") { + stage("Deploy to Dev") { when { expression { params.SERVER == 'DEV' } } @@ -17,7 +17,7 @@ pipeline { } } - } */ + } stage('Deploy to Stage') { when { expression { params.SERVER == 'STAGE' } From 9487d15c1b88651c92e765318517139ea68d5a28 Mon Sep 17 00:00:00 2001 From: Ivkovic Date: Thu, 1 Jun 2023 13:56:33 -0400 Subject: [PATCH 13/64] Disable ddtrace for tests, and black formatting --- orchestrator/views/job_view.py | 3 ++- tests/test_api.py | 6 +++++- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/orchestrator/views/job_view.py b/orchestrator/views/job_view.py index 5d5b9525..30019b28 100644 --- a/orchestrator/views/job_view.py +++ b/orchestrator/views/job_view.py @@ -29,12 +29,13 @@ class JobViewSet( def get_serializer_class(self): return JobSerializer + @tracer.wrap() def validate_and_save(self, data): serializer = JobSerializer(data=data) if serializer.is_valid(): current_span = tracer.current_span() - request_id = data.get("inputs",{}).get("runparams",{}).get("project_prefix","None Specified") + request_id = data.get("inputs", {}).get("runparams", {}).get("project_prefix", "None Specified") current_span.set_tag("request.id", request_id) response = serializer.save() response = JobSerializer(response) diff --git a/tests/test_api.py b/tests/test_api.py index 36a80259..8e0b7092 100644 --- a/tests/test_api.py +++ b/tests/test_api.py @@ -1,3 +1,4 @@ +import ddtrace from mock import patch from uuid import uuid4 from orchestrator.models import Job @@ -22,6 +23,7 @@ def setUp(self): root_dir="example_rootdir", id="7aacda86-b12f-4068-b2e3-a96552430a0f", job_store_location="/example_job_store", + inputs={}, ) self.api_root = reverse("api-root") @@ -45,6 +47,7 @@ def test_404_read(self): @patch("orchestrator.tasks.submit_job_to_lsf") def test_create(self, submit_jobs_mock): + ddtrace.tracer.enabled = False url = self.api_root + "jobs/" submit_jobs_mock.return_value = None data = { @@ -79,8 +82,9 @@ def test_delete_authorized(self): @patch("orchestrator.tasks.submit_job_to_lsf") def test_resume(self, submit_jobs_mock): + ddtrace.tracer.enabled = False url = "{}jobs/{}/resume/".format(self.api_root, self.example_job.id) - submit_jobs_mock.return_value = None + submit_jobs_mock.return_value = "submit_jobs_mock" data = {"type": 0, "root_dir": self.example_job.root_dir, "base_dir": "/base_dir"} response = self.client.post(url, data=data, format="json") self.assertEqual(response.status_code, status.HTTP_201_CREATED) From 31d3719b23b6b191d7aebf2a8213ce9ff6b26ff5 Mon Sep 17 00:00:00 2001 From: Nikhil Kumar Date: Wed, 7 Jun 2023 18:10:40 -0400 Subject: [PATCH 14/64] Fixed jenkins file ssh and formatting --- Jenkinsfile | 61 ++++++++++++++++++++++++----------------------------- 1 file changed, 28 insertions(+), 33 deletions(-) diff --git a/Jenkinsfile b/Jenkinsfile index 02372289..5729fd89 100644 --- a/Jenkinsfile +++ b/Jenkinsfile @@ -1,46 +1,41 @@ pipeline { agent any parameters { - choice(name: 'SERVER', choices: ['DEV', 'STAGE','PROD'], description: 'Server') - - } + choice(name: 'SERVER', choices: ['DEV', 'STAGE', 'PROD'], description: 'Server') + } stages { - stage("Deploy to Dev") { + stage('Deploy to Dev') { when { - expression { params.SERVER == 'DEV' } - } + expression { params.SERVER == 'DEV' } + } steps { - echo "deply to dev" - sshagent(credentials: ['a4d999a5-6318-4659-83be-3f148a5490ca']) { - sh 'ssh -o StrictHostKeyChecking=no voyager@silo.mskcc.org "cd /srv/services/ridgeback_dev_2/code/ridgeback && git pull && git checkout $BRANCH_NAME && cd /srv/services/ridgeback_dev_2 && source run_restart.sh"' - - } - + echo 'deply to dev' + sshagent(credentials: ['a4d999a5-6318-4659-83be-3f148a5490ca']) { + sh 'ssh -o StrictHostKeyChecking=no -o UserKnownHostsFile=/juno/work/ci/jenkins/known_hosts voyager@silo.mskcc.org "cd /srv/services/ridgeback_dev_2/code/ridgeback && git pull && git checkout $BRANCH_NAME && cd /srv/services/ridgeback_dev_2 && source run_restart.sh"' } - } + } + } stage('Deploy to Stage') { when { - expression { params.SERVER == 'STAGE' } - } - steps { - echo "deply to stage" - sshagent(credentials: ['a4d999a5-6318-4659-83be-3f148a5490ca']) { - sh 'ssh -o StrictHostKeyChecking=no voyager@silo.mskcc.org "cd /srv/services/staging_voyager/ridgeback && git pull && git checkout $BRANCH_NAME && source run_restart.sh"' - - } - } - } - /* stage('Deploy to Prod') { + expression { params.SERVER == 'STAGE' } + } + steps { + echo 'deply to stage' + sshagent(credentials: ['a4d999a5-6318-4659-83be-3f148a5490ca']) { + sh 'ssh -o StrictHostKeyChecking=no -o UserKnownHostsFile=/juno/work/ci/jenkins/known_hosts voyager@silo.mskcc.org "cd /srv/services/staging_voyager/ridgeback/code/ridgeback && git pull && git checkout $BRANCH_NAME && cd ../.. && source run_restart.sh"' + } + } + } + stage('Deploy to Prod') { when { expression { params.SERVER == 'PROD' } + } + steps { + echo 'deply to PROD' + sshagent(credentials: ['a4d999a5-6318-4659-83be-3f148a5490ca']) { + sh 'ssh -o StrictHostKeyChecking=no -o UserKnownHostsFile=/juno/work/ci/jenkins/known_hosts voyager@voyager.mskcc.org "cd /srv/services/ridgeback/code/ridgeback && git pull && git checkout $BRANCH_NAME && git pull && source run_restart.sh"' } - steps { - echo "deply to PROD" - sshagent(credentials: ['a4d999a5-6318-4659-83be-3f148a5490ca']) { - sh 'ssh -o StrictHostKeyChecking=no voyager@voyager.mskcc.org "cd /srv/services/ridgeback/code/ridgeback && git checkout $BRANCH_NAME && git pull && source run_restart.sh"' - - } - } - }*/ - } + } + } + } } From 96a2c30616483262f987e91f45e6831fc81c2f6a Mon Sep 17 00:00:00 2001 From: Nikhil Kumar Date: Wed, 7 Jun 2023 18:11:17 -0400 Subject: [PATCH 15/64] Added integration_tests for ridgeback --- integration_tests/Jenkinsfile | 27 +++++++++++++++++++++++++++ integration_tests/__init__.py | 0 2 files changed, 27 insertions(+) create mode 100644 integration_tests/Jenkinsfile create mode 100644 integration_tests/__init__.py diff --git a/integration_tests/Jenkinsfile b/integration_tests/Jenkinsfile new file mode 100644 index 00000000..d8fc8570 --- /dev/null +++ b/integration_tests/Jenkinsfile @@ -0,0 +1,27 @@ +pipeline { + agent any + stages { + stage('Deploy to Stage') { + steps { + build job: 'ridgeback-deploy/develop_mock', parameters: [[$class: 'StringParameterValue', name: 'SERVER', value: 'STAGE']], propagate: true, wait: true + } + } + stage('Run integration tests') { + steps { + build job: 'beagle_deployment_staging', propagate: true, wait: true + } + } + } + post { + failure { + slackSend channel: '#robot-house', + color: 'bad', + message: "The pipeline ${currentBuild.fullDisplayName} failed." + } + success { + slackSend channel: '#robot-house', + color: 'good', + message: "The pipeline ${currentBuild.fullDisplayName} completed successfully." + } + } +} diff --git a/integration_tests/__init__.py b/integration_tests/__init__.py new file mode 100644 index 00000000..e69de29b From 59d5949f416c384d325b9cae7a949675a4cb1d47 Mon Sep 17 00:00:00 2001 From: Nikhil Kumar Date: Wed, 7 Jun 2023 18:36:10 -0400 Subject: [PATCH 16/64] Load singularity after ssh --- Jenkinsfile | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/Jenkinsfile b/Jenkinsfile index 5729fd89..3a797be1 100644 --- a/Jenkinsfile +++ b/Jenkinsfile @@ -11,7 +11,7 @@ pipeline { steps { echo 'deply to dev' sshagent(credentials: ['a4d999a5-6318-4659-83be-3f148a5490ca']) { - sh 'ssh -o StrictHostKeyChecking=no -o UserKnownHostsFile=/juno/work/ci/jenkins/known_hosts voyager@silo.mskcc.org "cd /srv/services/ridgeback_dev_2/code/ridgeback && git pull && git checkout $BRANCH_NAME && cd /srv/services/ridgeback_dev_2 && source run_restart.sh"' + sh 'ssh -o StrictHostKeyChecking=no -o UserKnownHostsFile=/juno/work/ci/jenkins/known_hosts voyager@silo.mskcc.org "cd /srv/services/ridgeback_dev_2/code/ridgeback && source /juno/work/ci/jenkins/lsf.sh && git pull && git checkout $BRANCH_NAME && cd /srv/services/ridgeback_dev_2 && source run_restart.sh"' } } } @@ -22,7 +22,7 @@ pipeline { steps { echo 'deply to stage' sshagent(credentials: ['a4d999a5-6318-4659-83be-3f148a5490ca']) { - sh 'ssh -o StrictHostKeyChecking=no -o UserKnownHostsFile=/juno/work/ci/jenkins/known_hosts voyager@silo.mskcc.org "cd /srv/services/staging_voyager/ridgeback/code/ridgeback && git pull && git checkout $BRANCH_NAME && cd ../.. && source run_restart.sh"' + sh 'ssh -o StrictHostKeyChecking=no -o UserKnownHostsFile=/juno/work/ci/jenkins/known_hosts voyager@silo.mskcc.org "cd /srv/services/staging_voyager/ridgeback/code/ridgeback && source /juno/work/ci/jenkins/lsf.sh && git pull && git checkout $BRANCH_NAME && cd ../.. && source run_restart.sh"' } } } @@ -33,7 +33,7 @@ pipeline { steps { echo 'deply to PROD' sshagent(credentials: ['a4d999a5-6318-4659-83be-3f148a5490ca']) { - sh 'ssh -o StrictHostKeyChecking=no -o UserKnownHostsFile=/juno/work/ci/jenkins/known_hosts voyager@voyager.mskcc.org "cd /srv/services/ridgeback/code/ridgeback && git pull && git checkout $BRANCH_NAME && git pull && source run_restart.sh"' + sh 'ssh -o StrictHostKeyChecking=no -o UserKnownHostsFile=/juno/work/ci/jenkins/known_hosts voyager@voyager.mskcc.org "cd /srv/services/ridgeback/code/ridgeback && source /juno/work/ci/jenkins/lsf.sh && git pull && git checkout $BRANCH_NAME && git pull && source run_restart.sh"' } } } From b39cc0e7b74bfe309f6d5e5a87242fea072f7f7e Mon Sep 17 00:00:00 2001 From: Nikhil Kumar Date: Fri, 30 Jun 2023 20:49:30 -0400 Subject: [PATCH 17/64] Update Jenkinsfile to use develop branch --- integration_tests/Jenkinsfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/integration_tests/Jenkinsfile b/integration_tests/Jenkinsfile index d8fc8570..98c30480 100644 --- a/integration_tests/Jenkinsfile +++ b/integration_tests/Jenkinsfile @@ -3,7 +3,7 @@ pipeline { stages { stage('Deploy to Stage') { steps { - build job: 'ridgeback-deploy/develop_mock', parameters: [[$class: 'StringParameterValue', name: 'SERVER', value: 'STAGE']], propagate: true, wait: true + build job: 'ridgeback-deploy/develop', parameters: [[$class: 'StringParameterValue', name: 'SERVER', value: 'STAGE']], propagate: true, wait: true } } stage('Run integration tests') { From 65e6a8ae9f6c0abc6769fbd31cc2780851c4d266 Mon Sep 17 00:00:00 2001 From: Nikhil Kumar Date: Tue, 22 Aug 2023 12:04:39 -0400 Subject: [PATCH 18/64] Preserve work dir --- submitter/toil_submitter/toil_jobsubmitter.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/submitter/toil_submitter/toil_jobsubmitter.py b/submitter/toil_submitter/toil_jobsubmitter.py index ee4070f3..bc943691 100644 --- a/submitter/toil_submitter/toil_jobsubmitter.py +++ b/submitter/toil_submitter/toil_jobsubmitter.py @@ -226,6 +226,8 @@ def _command_line(self): "--disable-user-provenance", "--disable-host-provenance", "--stats", + "--cleanWorkDir", + "onSuccess", "--debug", "--disableProgress", "--doubleMem", @@ -273,6 +275,8 @@ def _command_line(self): "--disable-user-provenance", "--disable-host-provenance", "--stats", + "--cleanWorkDir", + "onSuccess", "--debug", "--disableProgress", "--doubleMem", From d7e1d43792513d84a6a1867ceaa32a12f76a9916 Mon Sep 17 00:00:00 2001 From: Nikhil Kumar Date: Thu, 12 Oct 2023 17:52:12 -0400 Subject: [PATCH 19/64] Fixed typo --- orchestrator/tasks.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/orchestrator/tasks.py b/orchestrator/tasks.py index 8fe0f38b..8573fc27 100644 --- a/orchestrator/tasks.py +++ b/orchestrator/tasks.py @@ -140,11 +140,11 @@ def command_processor(self, command_dict): self.retry() except RetryException as e: logger.info( - "Command %s failed. Retrying in %s. Excaption %s" % (command_dict, self.request.retries * 5, str(e)) + "Command %s failed. Retrying in %s. Exception %s" % (command_dict, self.request.retries * 5, str(e)) ) raise self.retry(exc=e, countdown=self.request.retries * 5, max_retries=5) except StopException as e: - logger.error("Command %s failed. Not retrying. Excaption %s" % (command_dict, str(e))) + logger.error("Command %s failed. Not retrying. Exception %s" % (command_dict, str(e))) def submit_job_to_lsf(job): From 09008970399c96de70cadb8f1fdb3967d53e78a8 Mon Sep 17 00:00:00 2001 From: Nikhil Kumar Date: Thu, 12 Oct 2023 17:52:34 -0400 Subject: [PATCH 20/64] Do not run command tool status on completed leader jobs --- orchestrator/tasks.py | 29 +++++++++++++++-------------- 1 file changed, 15 insertions(+), 14 deletions(-) diff --git a/orchestrator/tasks.py b/orchestrator/tasks.py index 8573fc27..2ac2a81e 100644 --- a/orchestrator/tasks.py +++ b/orchestrator/tasks.py @@ -411,17 +411,18 @@ def update_command_line_jobs(command_line_jobs, root): def check_status_of_command_line_jobs(job): - submiter = JobSubmitterFactory.factory( - job.type, str(job.id), job.app, job.inputs, job.root_dir, job.resume_job_store_location, log_dir=job.log_dir - ) - track_cache_str = job.track_cache - command_line_status = submiter.get_commandline_status(track_cache_str) - command_line_jobs = {} - if command_line_status: - command_line_jobs_str, new_track_cache_str = command_line_status - new_track_cache = json.loads(new_track_cache_str) - command_line_jobs = json.loads(command_line_jobs_str) - job.track_cache = new_track_cache - job.save() - if command_line_jobs: - update_command_line_jobs(command_line_jobs, job) + if job.status != Status.COMPLETED: + submiter = JobSubmitterFactory.factory( + job.type, str(job.id), job.app, job.inputs, job.root_dir, job.resume_job_store_location, log_dir=job.log_dir + ) + track_cache_str = job.track_cache + command_line_status = submiter.get_commandline_status(track_cache_str) + command_line_jobs = {} + if command_line_status: + command_line_jobs_str, new_track_cache_str = command_line_status + new_track_cache = json.loads(new_track_cache_str) + command_line_jobs = json.loads(command_line_jobs_str) + job.track_cache = new_track_cache + job.save() + if command_line_jobs: + update_command_line_jobs(command_line_jobs, job) From ab6e4f278a6d0a5a25b454dad8cf359ae999748a Mon Sep 17 00:00:00 2001 From: buehlere Date: Wed, 18 Oct 2023 10:37:19 -0400 Subject: [PATCH 21/64] Update toil_jobsubmitter.py - this is the only repository that needs to be run like this --- submitter/toil_submitter/toil_jobsubmitter.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/submitter/toil_submitter/toil_jobsubmitter.py b/submitter/toil_submitter/toil_jobsubmitter.py index a220b672..a45fdcf1 100644 --- a/submitter/toil_submitter/toil_jobsubmitter.py +++ b/submitter/toil_submitter/toil_jobsubmitter.py @@ -169,11 +169,9 @@ def _job_group(self): return ["-g", format_lsf_job_id(self.job_id)] def _command_line(self): - bypass_access_workflows = ["nucleo", "access_qc_generation"] - should_bypass_access_env = any([w in self.app.github.lower() for w in bypass_access_workflows]) single_machine_mode_workflows = ["nucleo_qc", "argos-qc"] single_machine = any([w in self.app.github.lower() for w in single_machine_mode_workflows]) - if "access" in self.app.github.lower() and not should_bypass_access_env: + if "git@github.com:mskcc/access-pipeline" in self.app.github.lower(): """ Start ACCESS-specific code """ From 88ab3297d85b44bfd995590242b5bd40e2e22845 Mon Sep 17 00:00:00 2001 From: Nikhil Kumar Date: Thu, 9 Nov 2023 13:14:53 -0500 Subject: [PATCH 22/64] Version bump --- ridgeback/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ridgeback/__init__.py b/ridgeback/__init__.py index c347ac29..da2178f0 100644 --- a/ridgeback/__init__.py +++ b/ridgeback/__init__.py @@ -1 +1 @@ -__version__ = "1.29.0" +__version__ = "1.30.0" From 711ae1f51fe0a910ddf94b470ee0d3b6beb18de5 Mon Sep 17 00:00:00 2001 From: Nikhil Kumar Date: Thu, 9 Nov 2023 14:55:04 -0500 Subject: [PATCH 23/64] Added setting for setting max hours for hanging job detection --- ridgeback/settings.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/ridgeback/settings.py b/ridgeback/settings.py index 85e64be5..766d2dcf 100644 --- a/ridgeback/settings.py +++ b/ridgeback/settings.py @@ -274,3 +274,5 @@ # App Cache Configuration APP_CACHE = os.environ.get("RIDGEBACK_APP_CACHE", "/tmp") + +MAX_HANGING_HOURS = os.environ.get("RIDGEBACK_MAX_HANGING_HOURS", "5") \ No newline at end of file From d1cf4f92c49aeb639438b61eb7d133b47325b827 Mon Sep 17 00:00:00 2001 From: Nikhil Kumar Date: Thu, 9 Nov 2023 14:56:01 -0500 Subject: [PATCH 24/64] Added task to check for hanging jobs --- orchestrator/commands/command.py | 1 + orchestrator/tasks.py | 3 +++ 2 files changed, 4 insertions(+) diff --git a/orchestrator/commands/command.py b/orchestrator/commands/command.py index 5185965b..27611cd6 100644 --- a/orchestrator/commands/command.py +++ b/orchestrator/commands/command.py @@ -10,6 +10,7 @@ class CommandType(IntEnum): SUSPEND = 4 RESUME = 5 SET_OUTPUT_PERMISSION = 6 + CHECK_HANGING = 7 class Command(object): diff --git a/orchestrator/tasks.py b/orchestrator/tasks.py index 2ac2a81e..35034045 100644 --- a/orchestrator/tasks.py +++ b/orchestrator/tasks.py @@ -134,6 +134,9 @@ def command_processor(self, command_dict): elif command.command_type == CommandType.SET_OUTPUT_PERMISSION: logger.info("Setting output permission for job %s" % command.job_id) set_permission(job) + elif command.command_type == CommandType.CHECK_HANGING: + logger.info("Checking if the job %s has any hanging tasks" % command.job_id) + check_job_hanging(job) else: logger.info("Job lock not acquired for job: %s" % command.job_id) From ec45bbc8fff7fcfa8a78b2eff94180bb456f91d9 Mon Sep 17 00:00:00 2001 From: Nikhil Kumar Date: Thu, 9 Nov 2023 14:56:16 -0500 Subject: [PATCH 25/64] Added check hanging function --- orchestrator/tasks.py | 84 ++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 83 insertions(+), 1 deletion(-) diff --git a/orchestrator/tasks.py b/orchestrator/tasks.py index 35034045..7398fc10 100644 --- a/orchestrator/tasks.py +++ b/orchestrator/tasks.py @@ -6,7 +6,8 @@ from datetime import timedelta from celery import shared_task from django.conf import settings -from django.utils.timezone import now +from django.utils.timezone import now, make_aware +from django.utils import dateparse from .models import Job, Status, CommandLineToolJob from lib.memcache_lock import memcache_task_lock, memcache_lock from submitter.factory import JobSubmitterFactory @@ -89,9 +90,15 @@ def process_jobs(): Status.UNKNOWN, ) ).values_list("pk", flat=True) + + check_leader_not_running.delay() + + for job_id in status_jobs: # Send CHECK_STATUS commands for Jobs command_processor.delay(Command(CommandType.CHECK_STATUS_ON_LSF, str(job_id)).to_dict()) + if status_jobs.status == Status.RUNNING: + command_processor.delay(Command(CommandType.CHECK_HANGING, str(job_id)).to_dict()) jobs = Scheduler.get_jobs_to_submit() @@ -100,6 +107,7 @@ def process_jobs(): if Status(job.status).transition(Status.SUBMITTING): job.update_status(Status.SUBMITTING) command_processor.delay(Command(CommandType.SUBMIT, str(job.id)).to_dict()) + @shared_task(bind=True) @@ -251,6 +259,80 @@ def check_job_status(job): else: raise StopException("Invalid transition %s to %s" % (Status(job.status).name, Status(lsf_status).name)) +def _add_alert(job,alert_obj): + if "alerts" in job.message: + all_alerts = job.message['alerts'] + alert_ons = [ single_alert["on"] for single_alert in all_alerts] + current_alert_on = alert_obj["on"] + if current_alert_on not in alert_ons: + all_alerts.append(alert_obj) + job.message['alerts'] = all_alerts + job.save() + else: + job.message['alerts'] = [alert_obj] + job.save() + +@shared_task(bind=True) +def check_leader_not_running(self): + logger.info("Checking for any jobs stuck in a non-running state") + time_threshold = now() - timedelta(hours=int(settings.MAX_HANGING_HOURS)) + jobs = Job.objects.filter(modified_date__lt=time_threshold).exclude(status__in=[Status.COMPLETED,Status.TERMINATED,Status.RUNNING]) + for single_tool in jobs: + status_name = Status(single_tool.status).name + hang_time_seconds = (now() - single_tool.modified_date).total_seconds() + hang_time_hours = divmod(hang_time_seconds, 3600)[0] + message = "Leader job has been hanging on status {} for over {} hours.".format(status_name, hang_time_hours) + hang_time_obj = {"message": message, "hang_time":hang_time_hours, "since": str(single_tool.modified_date),"on":"leader_before_running"} + _add_alert(single_tool,hang_time_obj) + +def check_job_hanging(single_running_job): + time_threshold = now() - timedelta(hours=int(settings.MAX_HANGING_HOURS)) + non_running_tools = CommandLineToolJob.objects.filter(modified_date__lt=time_threshold).exclude(status__in=[Status.COMPLETED,Status.TERMINATED,Status.RUNNING]) + running_tools = CommandLineToolJob.objects.filter(root__id__exact=single_running_job.id, status=Status.RUNNING) + if len(running_tools) == 0: + completed_jobs_finished_time = list(CommandLineToolJob.objects.filter(root__id__exact=single_running_job.id, status=Status.COMPLETED).exclude(finished__isnull=True).values_list('finished', flat=True)) + if completed_jobs_finished_time: + latest_completed = max(completed_jobs_finished_time) + if latest_completed < time_threshold: + hang_time_seconds = (now() - latest_completed).total_seconds() + hang_time_hours = divmod(hang_time_seconds, 3600)[0] + message = "Leader job has not submitted a job for over {} hours.".format(hang_time_hours) + if "log" in single_running_job.message: + log_file = single_running_job.message["log"] + if log_file: + message += " Check this log for more info: {}".format(log_file) + hang_time_obj = {"message": message, "hang_time":hang_time_hours, "since": str(latest_completed),"on":"leader_running"} + _add_alert(single_running_job,hang_time_obj) + for single_tool in non_running_tools: + status_name = Status(single_tool.status).name + hang_time_seconds = (now() - single_tool.modified_date).total_seconds() + hang_time_hours = divmod(hang_time_seconds, 3600)[0] + job_name = single_tool.job_name + job_id = "{}_before_running".format(single_tool.job_id) + message = "{} job has been hanging on status {} for over {} hours.".format(job_name, status_name, hang_time_hours) + hang_time_obj = {"message": message, "hang_time":hang_time_hours, "since": str(single_tool.modified_date),"on":job_id} + _add_alert(single_running_job,hang_time_obj) + for single_running_tool in running_tools: + if "last_modified" not in single_running_tool.details: + continue + modified_date_str = single_running_tool.details['last_modified'] + if modified_date_str: + modified_date = make_aware(dateparse.parse_datetime(modified_date_str)) + if modified_date < time_threshold: + hang_time_seconds = (now() - modified_date).total_seconds() + hang_time_hours = divmod(hang_time_seconds, 3600)[0] + job_name = single_running_tool.job_name + job_id = "{}_running".format(single_running_tool.job_id) + message = "{} job has not logged an update for over {} hours.".format(job_name,hang_time_hours) + if "log_path" in single_running_tool.details: + log_file = single_running_tool.details["log_path"] + if log_file: + message += " Check this log for more info: {}".format(log_file) + hang_time_obj = {"message": message, "hang_time":hang_time_hours, "since": str(modified_date),"on":job_id} + _add_alert(single_running_job,hang_time_obj) + + + def terminate_job(job): if Status(job.status).transition(Status.TERMINATED): From 1c1db20c72fef90799d42eaf1b6c56da755d98ce Mon Sep 17 00:00:00 2001 From: Nikhil Kumar Date: Thu, 9 Nov 2023 14:56:40 -0500 Subject: [PATCH 26/64] Added alerts field to message --- orchestrator/models.py | 1 + 1 file changed, 1 insertion(+) diff --git a/orchestrator/models.py b/orchestrator/models.py index 091decd3..1d10297a 100644 --- a/orchestrator/models.py +++ b/orchestrator/models.py @@ -15,6 +15,7 @@ def message_default(): "log": "", "failed_jobs": {}, "unknown_jobs": {}, + "alerts": [], "info": "", } return message_default_dict From f88eedce0c301df9090aa2f46a1644aa0f8e3c1a Mon Sep 17 00:00:00 2001 From: Nikhil Kumar Date: Thu, 9 Nov 2023 14:57:10 -0500 Subject: [PATCH 27/64] Added log path to command line tool details --- submitter/toil_submitter/toil_jobsubmitter.py | 1 + submitter/toil_submitter/toil_track_utils.py | 3 +++ 2 files changed, 4 insertions(+) diff --git a/submitter/toil_submitter/toil_jobsubmitter.py b/submitter/toil_submitter/toil_jobsubmitter.py index a45fdcf1..2a095251 100644 --- a/submitter/toil_submitter/toil_jobsubmitter.py +++ b/submitter/toil_submitter/toil_jobsubmitter.py @@ -94,6 +94,7 @@ def get_commandline_status(self, cache): "cpu_usage": single_job["cpu_usage"], "job_stream": single_job["job_stream"], "last_modified": single_job["last_modified"], + "log_path": single_job["log_path"], "mem_usage": single_job["mem_usage"], "memory_req": single_job["memory_req"], } diff --git a/submitter/toil_submitter/toil_track_utils.py b/submitter/toil_submitter/toil_track_utils.py index 6d7fd91f..a1220406 100755 --- a/submitter/toil_submitter/toil_track_utils.py +++ b/submitter/toil_submitter/toil_track_utils.py @@ -488,6 +488,7 @@ def mark_job_as_failed(self, job_id, job_name, job): "started": datetime.now(), "submitted": datetime.now(), "last_modified": datetime.now(), + "log_path": None, "finished": datetime.now(), } job_dict[job_id] = new_job @@ -593,6 +594,7 @@ def handle_current_jobs(self, toil_state_obj): "mem_usage": [], "started": None, "submitted": datetime.now(), + "log_path": None, "last_modified": None, "finished": None, } @@ -627,6 +629,7 @@ def handle_running_jobs(self): job_obj["started"] = datetime.now() if last_modified: job_obj["last_modified"] = last_modified + job_obj["log_path"] = worker_log if worker_log not in worker_log_to_job_dict: worker_log_to_job_dict[worker_log] = single_job_id From 2e5b66a7e4f25bdba0957128f8f8853d7ff29d9f Mon Sep 17 00:00:00 2001 From: Nikhil Kumar Date: Thu, 9 Nov 2023 14:57:57 -0500 Subject: [PATCH 28/64] Added tests --- tests/test_commandline.py | 146 +++++++++++++++++++++++++++++++++++++- 1 file changed, 144 insertions(+), 2 deletions(-) diff --git a/tests/test_commandline.py b/tests/test_commandline.py index f1afa8c7..c18eaa6f 100644 --- a/tests/test_commandline.py +++ b/tests/test_commandline.py @@ -7,8 +7,7 @@ from django.test import TestCase, override_settings import toil from orchestrator.models import Job, Status, PipelineType, CommandLineToolJob -from orchestrator.tasks import check_status_of_command_line_jobs - +from orchestrator.tasks import check_status_of_command_line_jobs, check_job_hanging, check_leader_not_running class TestToil(TestCase): """ @@ -104,3 +103,146 @@ def test_failed(self): mock_num_failed = 2 num_failed = CommandLineToolJob.objects.filter(status=(Status.FAILED)).count() self.assertEqual(num_failed, mock_num_failed) + + def test_details_set(self): + """ + Test if the metadata is being set for commandLineJObs + """ + self.mock_track("running") + first_running_job = CommandLineToolJob.objects.filter(status=(Status.RUNNING)).first() + details = first_running_job.details + self.assertIsNotNone(details) + self.assertIsNotNone(details["cores_req"]) + self.assertIsNotNone(details["cpu_usage"]) + self.assertIsNotNone(details["job_stream"]) + self.assertIsNotNone(details["last_modified"]) + self.assertIsNotNone(details["log_path"]) + self.assertIsNotNone(details["mem_usage"]) + self.assertIsNotNone(details["memory_req"]) + + def test_hanging_toil_leader_not_running(self): + """ + Test detection of a hanging toil leader job before its running + """ + self.mock_track("running") + self.job.status = Status.PENDING + self.job.save() + with override_settings(MAX_HANGING_HOURS=0): + check_leader_not_running() + self.job.refresh_from_db() + self.assertIsNotNone(self.job.message) + self.assertIsNotNone(self.job.message['alerts'][0]) + + def test_hanging_no_duplicated_alerts(self): + """ + Test to make sure the same alert does not get triggered twice + """ + self.mock_track("running") + self.job.status = Status.PENDING + self.job.save() + with override_settings(MAX_HANGING_HOURS=0): + check_leader_not_running() + self.job.refresh_from_db() + with override_settings(MAX_HANGING_HOURS=0): + check_leader_not_running() + self.job.refresh_from_db() + self.assertIsNotNone(self.job.message) + self.assertTrue(len(self.job.message['alerts']) == 1) + + def test_hanging_toil_leader_running(self): + """ + Test detection of a hanging toil leader job while its running + """ + self.mock_track("running") + for single_job in CommandLineToolJob.objects.all(): + single_job.status = Status.COMPLETED + single_job.save() + with override_settings(MAX_HANGING_HOURS=0): + check_job_hanging(self.job) + self.job.refresh_from_db() + self.assertIsNotNone(self.job.message) + self.assertIsNotNone(self.job.message['alerts'][0]) + + def test_hanging_toil_commandline_not_running(self): + """ + Test detection of a hanging command that has not started yet + """ + self.mock_track("running") + all_tool_jobs = CommandLineToolJob.objects.all() + count = all_tool_jobs.count() + for single_job in CommandLineToolJob.objects.all(): + single_job.status = Status.PENDING + single_job.save() + with override_settings(MAX_HANGING_HOURS=0): + check_job_hanging(self.job) + self.job.refresh_from_db() + self.assertIsNotNone(self.job.message) + self.assertIsNotNone(self.job.message['alerts'][0]) + self.assertTrue(len(self.job.message['alerts']) == count) + + def test_hanging_toil_commandline_running(self): + """ + Test detection of a hanging command that have started running + """ + self.mock_track("running") + all_tool_jobs = CommandLineToolJob.objects.all() + count = all_tool_jobs.count() + for single_job in CommandLineToolJob.objects.all(): + single_job.status = Status.RUNNING + single_job.save() + with override_settings(MAX_HANGING_HOURS=0): + check_job_hanging(self.job) + self.job.refresh_from_db() + self.assertIsNotNone(self.job.message) + self.assertIsNotNone(self.job.message['alerts'][0]) + self.assertTrue(len(self.job.message['alerts']) == count) + + def test_hanging_toil_commandline_mix(self): + """ + Test detection of a hanging command that has not started yet and some that are running + """ + self.mock_track("running") + first_command = CommandLineToolJob.objects.first() + first_command.status = Status.RUNNING + first_command.save() + last_command = CommandLineToolJob.objects.last() + last_command.status = Status.PENDING + last_command.save() + with override_settings(MAX_HANGING_HOURS=0): + check_job_hanging(self.job) + self.job.refresh_from_db() + self.assertIsNotNone(self.job.message) + self.assertIsNotNone(self.job.message['alerts'][0]) + + def test_hanging_message_for_toil_leader_running(self): + """ + Test alert sent of a hanging toil leader job while its running + """ + self.mock_track("running") + MOCK_LOG_PATH = "path/to/log.log" + for single_job in CommandLineToolJob.objects.all(): + single_job.status = Status.COMPLETED + single_job.save() + self.job.message['log'] = MOCK_LOG_PATH + self.job.save() + with override_settings(MAX_HANGING_HOURS=0): + check_job_hanging(self.job) + self.job.refresh_from_db() + self.assertEqual(self.job.message['log'],MOCK_LOG_PATH) + self.assertIsNotNone(self.job.message['alerts'][0]) + self.assertTrue(MOCK_LOG_PATH in self.job.message['alerts'][0]['message']) + + def test_hanging_message_for_toil_leader_running(self): + """ + Test alert sent of a hanging tool while its running + """ + self.mock_track("running") + first_command = CommandLineToolJob.objects.first() + first_command.status = Status.RUNNING + first_command.save() + command_log_path = first_command.details['log_path'] + with override_settings(MAX_HANGING_HOURS=0): + check_job_hanging(self.job) + self.job.refresh_from_db() + self.assertIsNotNone(self.job.message['alerts'][0]) + self.assertTrue(command_log_path in self.job.message['alerts'][0]['message']) From e14e36c196b0d265577493638677bd49383f95bf Mon Sep 17 00:00:00 2001 From: Nikhil Kumar Date: Thu, 9 Nov 2023 14:59:14 -0500 Subject: [PATCH 29/64] Added black formatting --- orchestrator/tasks.py | 75 +++++++++++++++++++++++++++------------ tests/test_commandline.py | 37 +++++++++---------- 2 files changed, 71 insertions(+), 41 deletions(-) diff --git a/orchestrator/tasks.py b/orchestrator/tasks.py index 7398fc10..cb93a496 100644 --- a/orchestrator/tasks.py +++ b/orchestrator/tasks.py @@ -93,7 +93,6 @@ def process_jobs(): check_leader_not_running.delay() - for job_id in status_jobs: # Send CHECK_STATUS commands for Jobs command_processor.delay(Command(CommandType.CHECK_STATUS_ON_LSF, str(job_id)).to_dict()) @@ -107,7 +106,6 @@ def process_jobs(): if Status(job.status).transition(Status.SUBMITTING): job.update_status(Status.SUBMITTING) command_processor.delay(Command(CommandType.SUBMIT, str(job.id)).to_dict()) - @shared_task(bind=True) @@ -259,38 +257,54 @@ def check_job_status(job): else: raise StopException("Invalid transition %s to %s" % (Status(job.status).name, Status(lsf_status).name)) -def _add_alert(job,alert_obj): + +def _add_alert(job, alert_obj): if "alerts" in job.message: - all_alerts = job.message['alerts'] - alert_ons = [ single_alert["on"] for single_alert in all_alerts] + all_alerts = job.message["alerts"] + alert_ons = [single_alert["on"] for single_alert in all_alerts] current_alert_on = alert_obj["on"] if current_alert_on not in alert_ons: all_alerts.append(alert_obj) - job.message['alerts'] = all_alerts + job.message["alerts"] = all_alerts job.save() else: - job.message['alerts'] = [alert_obj] + job.message["alerts"] = [alert_obj] job.save() + @shared_task(bind=True) def check_leader_not_running(self): logger.info("Checking for any jobs stuck in a non-running state") time_threshold = now() - timedelta(hours=int(settings.MAX_HANGING_HOURS)) - jobs = Job.objects.filter(modified_date__lt=time_threshold).exclude(status__in=[Status.COMPLETED,Status.TERMINATED,Status.RUNNING]) + jobs = Job.objects.filter(modified_date__lt=time_threshold).exclude( + status__in=[Status.COMPLETED, Status.TERMINATED, Status.RUNNING] + ) for single_tool in jobs: status_name = Status(single_tool.status).name hang_time_seconds = (now() - single_tool.modified_date).total_seconds() hang_time_hours = divmod(hang_time_seconds, 3600)[0] message = "Leader job has been hanging on status {} for over {} hours.".format(status_name, hang_time_hours) - hang_time_obj = {"message": message, "hang_time":hang_time_hours, "since": str(single_tool.modified_date),"on":"leader_before_running"} - _add_alert(single_tool,hang_time_obj) + hang_time_obj = { + "message": message, + "hang_time": hang_time_hours, + "since": str(single_tool.modified_date), + "on": "leader_before_running", + } + _add_alert(single_tool, hang_time_obj) + def check_job_hanging(single_running_job): time_threshold = now() - timedelta(hours=int(settings.MAX_HANGING_HOURS)) - non_running_tools = CommandLineToolJob.objects.filter(modified_date__lt=time_threshold).exclude(status__in=[Status.COMPLETED,Status.TERMINATED,Status.RUNNING]) + non_running_tools = CommandLineToolJob.objects.filter(modified_date__lt=time_threshold).exclude( + status__in=[Status.COMPLETED, Status.TERMINATED, Status.RUNNING] + ) running_tools = CommandLineToolJob.objects.filter(root__id__exact=single_running_job.id, status=Status.RUNNING) if len(running_tools) == 0: - completed_jobs_finished_time = list(CommandLineToolJob.objects.filter(root__id__exact=single_running_job.id, status=Status.COMPLETED).exclude(finished__isnull=True).values_list('finished', flat=True)) + completed_jobs_finished_time = list( + CommandLineToolJob.objects.filter(root__id__exact=single_running_job.id, status=Status.COMPLETED) + .exclude(finished__isnull=True) + .values_list("finished", flat=True) + ) if completed_jobs_finished_time: latest_completed = max(completed_jobs_finished_time) if latest_completed < time_threshold: @@ -301,21 +315,33 @@ def check_job_hanging(single_running_job): log_file = single_running_job.message["log"] if log_file: message += " Check this log for more info: {}".format(log_file) - hang_time_obj = {"message": message, "hang_time":hang_time_hours, "since": str(latest_completed),"on":"leader_running"} - _add_alert(single_running_job,hang_time_obj) + hang_time_obj = { + "message": message, + "hang_time": hang_time_hours, + "since": str(latest_completed), + "on": "leader_running", + } + _add_alert(single_running_job, hang_time_obj) for single_tool in non_running_tools: status_name = Status(single_tool.status).name hang_time_seconds = (now() - single_tool.modified_date).total_seconds() hang_time_hours = divmod(hang_time_seconds, 3600)[0] job_name = single_tool.job_name job_id = "{}_before_running".format(single_tool.job_id) - message = "{} job has been hanging on status {} for over {} hours.".format(job_name, status_name, hang_time_hours) - hang_time_obj = {"message": message, "hang_time":hang_time_hours, "since": str(single_tool.modified_date),"on":job_id} - _add_alert(single_running_job,hang_time_obj) + message = "{} job has been hanging on status {} for over {} hours.".format( + job_name, status_name, hang_time_hours + ) + hang_time_obj = { + "message": message, + "hang_time": hang_time_hours, + "since": str(single_tool.modified_date), + "on": job_id, + } + _add_alert(single_running_job, hang_time_obj) for single_running_tool in running_tools: if "last_modified" not in single_running_tool.details: continue - modified_date_str = single_running_tool.details['last_modified'] + modified_date_str = single_running_tool.details["last_modified"] if modified_date_str: modified_date = make_aware(dateparse.parse_datetime(modified_date_str)) if modified_date < time_threshold: @@ -323,15 +349,18 @@ def check_job_hanging(single_running_job): hang_time_hours = divmod(hang_time_seconds, 3600)[0] job_name = single_running_tool.job_name job_id = "{}_running".format(single_running_tool.job_id) - message = "{} job has not logged an update for over {} hours.".format(job_name,hang_time_hours) + message = "{} job has not logged an update for over {} hours.".format(job_name, hang_time_hours) if "log_path" in single_running_tool.details: log_file = single_running_tool.details["log_path"] if log_file: message += " Check this log for more info: {}".format(log_file) - hang_time_obj = {"message": message, "hang_time":hang_time_hours, "since": str(modified_date),"on":job_id} - _add_alert(single_running_job,hang_time_obj) - - + hang_time_obj = { + "message": message, + "hang_time": hang_time_hours, + "since": str(modified_date), + "on": job_id, + } + _add_alert(single_running_job, hang_time_obj) def terminate_job(job): diff --git a/tests/test_commandline.py b/tests/test_commandline.py index c18eaa6f..b3a9f2c2 100644 --- a/tests/test_commandline.py +++ b/tests/test_commandline.py @@ -9,6 +9,7 @@ from orchestrator.models import Job, Status, PipelineType, CommandLineToolJob from orchestrator.tasks import check_status_of_command_line_jobs, check_job_hanging, check_leader_not_running + class TestToil(TestCase): """ Test toil track functions @@ -103,7 +104,7 @@ def test_failed(self): mock_num_failed = 2 num_failed = CommandLineToolJob.objects.filter(status=(Status.FAILED)).count() self.assertEqual(num_failed, mock_num_failed) - + def test_details_set(self): """ Test if the metadata is being set for commandLineJObs @@ -119,7 +120,7 @@ def test_details_set(self): self.assertIsNotNone(details["log_path"]) self.assertIsNotNone(details["mem_usage"]) self.assertIsNotNone(details["memory_req"]) - + def test_hanging_toil_leader_not_running(self): """ Test detection of a hanging toil leader job before its running @@ -131,7 +132,7 @@ def test_hanging_toil_leader_not_running(self): check_leader_not_running() self.job.refresh_from_db() self.assertIsNotNone(self.job.message) - self.assertIsNotNone(self.job.message['alerts'][0]) + self.assertIsNotNone(self.job.message["alerts"][0]) def test_hanging_no_duplicated_alerts(self): """ @@ -147,7 +148,7 @@ def test_hanging_no_duplicated_alerts(self): check_leader_not_running() self.job.refresh_from_db() self.assertIsNotNone(self.job.message) - self.assertTrue(len(self.job.message['alerts']) == 1) + self.assertTrue(len(self.job.message["alerts"]) == 1) def test_hanging_toil_leader_running(self): """ @@ -161,7 +162,7 @@ def test_hanging_toil_leader_running(self): check_job_hanging(self.job) self.job.refresh_from_db() self.assertIsNotNone(self.job.message) - self.assertIsNotNone(self.job.message['alerts'][0]) + self.assertIsNotNone(self.job.message["alerts"][0]) def test_hanging_toil_commandline_not_running(self): """ @@ -177,8 +178,8 @@ def test_hanging_toil_commandline_not_running(self): check_job_hanging(self.job) self.job.refresh_from_db() self.assertIsNotNone(self.job.message) - self.assertIsNotNone(self.job.message['alerts'][0]) - self.assertTrue(len(self.job.message['alerts']) == count) + self.assertIsNotNone(self.job.message["alerts"][0]) + self.assertTrue(len(self.job.message["alerts"]) == count) def test_hanging_toil_commandline_running(self): """ @@ -194,8 +195,8 @@ def test_hanging_toil_commandline_running(self): check_job_hanging(self.job) self.job.refresh_from_db() self.assertIsNotNone(self.job.message) - self.assertIsNotNone(self.job.message['alerts'][0]) - self.assertTrue(len(self.job.message['alerts']) == count) + self.assertIsNotNone(self.job.message["alerts"][0]) + self.assertTrue(len(self.job.message["alerts"]) == count) def test_hanging_toil_commandline_mix(self): """ @@ -212,8 +213,8 @@ def test_hanging_toil_commandline_mix(self): check_job_hanging(self.job) self.job.refresh_from_db() self.assertIsNotNone(self.job.message) - self.assertIsNotNone(self.job.message['alerts'][0]) - + self.assertIsNotNone(self.job.message["alerts"][0]) + def test_hanging_message_for_toil_leader_running(self): """ Test alert sent of a hanging toil leader job while its running @@ -223,14 +224,14 @@ def test_hanging_message_for_toil_leader_running(self): for single_job in CommandLineToolJob.objects.all(): single_job.status = Status.COMPLETED single_job.save() - self.job.message['log'] = MOCK_LOG_PATH + self.job.message["log"] = MOCK_LOG_PATH self.job.save() with override_settings(MAX_HANGING_HOURS=0): check_job_hanging(self.job) self.job.refresh_from_db() - self.assertEqual(self.job.message['log'],MOCK_LOG_PATH) - self.assertIsNotNone(self.job.message['alerts'][0]) - self.assertTrue(MOCK_LOG_PATH in self.job.message['alerts'][0]['message']) + self.assertEqual(self.job.message["log"], MOCK_LOG_PATH) + self.assertIsNotNone(self.job.message["alerts"][0]) + self.assertTrue(MOCK_LOG_PATH in self.job.message["alerts"][0]["message"]) def test_hanging_message_for_toil_leader_running(self): """ @@ -240,9 +241,9 @@ def test_hanging_message_for_toil_leader_running(self): first_command = CommandLineToolJob.objects.first() first_command.status = Status.RUNNING first_command.save() - command_log_path = first_command.details['log_path'] + command_log_path = first_command.details["log_path"] with override_settings(MAX_HANGING_HOURS=0): check_job_hanging(self.job) self.job.refresh_from_db() - self.assertIsNotNone(self.job.message['alerts'][0]) - self.assertTrue(command_log_path in self.job.message['alerts'][0]['message']) + self.assertIsNotNone(self.job.message["alerts"][0]) + self.assertTrue(command_log_path in self.job.message["alerts"][0]["message"]) From a6da999803d5b7dc2419ba58798d4fa047cd074d Mon Sep 17 00:00:00 2001 From: Nikhil Kumar Date: Tue, 14 Nov 2023 13:14:58 -0500 Subject: [PATCH 30/64] Added tool and leader walltime fields for jobsubmitter --- submitter/factory.py | 7 +++--- submitter/jobsubmitter.py | 5 ++-- .../nextflow_jobsubmitter.py | 10 ++++---- submitter/toil_submitter/toil_jobsubmitter.py | 23 +++++++++++++------ 4 files changed, 28 insertions(+), 17 deletions(-) diff --git a/submitter/factory.py b/submitter/factory.py index 39257223..33ad297d 100644 --- a/submitter/factory.py +++ b/submitter/factory.py @@ -12,11 +12,12 @@ def factory( inputs, root_dir, resume_jobstore=None, - walltime=settings.LSF_WALLTIME, + leader_walltime=None, + tool_walltime=None, memlimit=None, log_dir=None, ): if type == PipelineType.CWL: - return ToilJobSubmitter(job_id, app, inputs, root_dir, resume_jobstore, walltime, memlimit, log_dir) + return ToilJobSubmitter(job_id, app, inputs, root_dir, resume_jobstore, leader_walltime, tool_walltime, memlimit, log_dir) elif type == PipelineType.NEXTFLOW: - return NextflowJobSubmitter(job_id, app, inputs, root_dir, resume_jobstore, walltime, memlimit, log_dir) + return NextflowJobSubmitter(job_id, app, inputs, root_dir, resume_jobstore, leader_walltime, tool_walltime, memlimit, log_dir) diff --git a/submitter/jobsubmitter.py b/submitter/jobsubmitter.py index 6ee8b32a..3ef8208c 100644 --- a/submitter/jobsubmitter.py +++ b/submitter/jobsubmitter.py @@ -3,12 +3,13 @@ class JobSubmitter(object): - def __init__(self, job_id, app, inputs, walltime, memlimit, log_dir=None): + def __init__(self, job_id, app, inputs, leader_walltime, tool_walltime, memlimit, log_dir=None): self.app = App.factory(app) self.job_id = job_id self.inputs = inputs self.lsf_client = LSFClient() - self.walltime = walltime + self.leader_walltime = leader_walltime + self.tool_walltime = tool_walltime self.memlimit = memlimit self.log_dir = log_dir diff --git a/submitter/nextflow_submitter/nextflow_jobsubmitter.py b/submitter/nextflow_submitter/nextflow_jobsubmitter.py index 90d4fb8f..d9617dbe 100644 --- a/submitter/nextflow_submitter/nextflow_jobsubmitter.py +++ b/submitter/nextflow_submitter/nextflow_jobsubmitter.py @@ -6,7 +6,7 @@ class NextflowJobSubmitter(JobSubmitter): - def __init__(self, job_id, app, inputs, root_dir, resume_jobstore, walltime, memlimit, log_dir=None): + def __init__(self, job_id, app, inputs, root_dir, resume_jobstore, leader_walltime, tool_walltime, memlimit, log_dir=None): """ :param job_id: :param app: github.url @@ -32,7 +32,7 @@ def __init__(self, job_id, app, inputs, root_dir, resume_jobstore, walltime, mem :param root_dir: :param resume_jobstore: """ - JobSubmitter.__init__(self, job_id, app, inputs, walltime, memlimit, log_dir) + JobSubmitter.__init__(self, job_id, app, inputs, leader_walltime, tool_walltime, memlimit, log_dir) self.resume_jobstore = resume_jobstore if resume_jobstore: self.job_store_dir = resume_jobstore @@ -51,16 +51,16 @@ def submit(self): env["JAVA_HOME"] = "/opt/common/CentOS_7/java/jdk1.8.0_202/" env["PATH"] = env["JAVA_HOME"] + "bin:" + os.environ["PATH"] env["TMPDIR"] = self.job_tmp_dir - external_id = self.lsf_client.submit(command_line, self._job_args(), log_path, self.job_id, env) + external_id = self.lsf_client.submit(command_line, self._leader_args(), log_path, self.job_id, env) return external_id, self.job_store_dir, self.job_work_dir, self.job_outputs_dir - def _job_args(self): + def _leader_args(self): args = self._walltime() args.extend(self._memlimit()) return args def _walltime(self): - return ["-W", str(self.walltime)] if self.walltime else [] + return ["-W", str(self.leader_walltime)] if self.leader_walltime else [] def _memlimit(self): return ["-M", self.memlimit] if self.memlimit else ["-M", "20"] diff --git a/submitter/toil_submitter/toil_jobsubmitter.py b/submitter/toil_submitter/toil_jobsubmitter.py index a45fdcf1..af543e47 100644 --- a/submitter/toil_submitter/toil_jobsubmitter.py +++ b/submitter/toil_submitter/toil_jobsubmitter.py @@ -25,8 +25,8 @@ def translate_toil_to_model_status(status): class ToilJobSubmitter(JobSubmitter): - def __init__(self, job_id, app, inputs, root_dir, resume_jobstore, walltime, memlimit, log_dir=None): - JobSubmitter.__init__(self, job_id, app, inputs, walltime, memlimit, log_dir) + def __init__(self, job_id, app, inputs, root_dir, resume_jobstore, leader_walltime, tool_walltime, memlimit, log_dir=None): + JobSubmitter.__init__(self, job_id, app, inputs, leader_walltime, tool_walltime, memlimit, log_dir) self.resume_jobstore = resume_jobstore if resume_jobstore: self.job_store_dir = resume_jobstore @@ -45,13 +45,13 @@ def submit(self): toil_lsf_args = "-sla %s %s %s" % ( settings.LSF_SLA, " ".join(self._job_group()), - " ".join(self._job_args()), + " ".join(self._tool_args()), ) else: - toil_lsf_args = "%s %s" % (" ".join(self._job_group()), " ".join(self._job_args())) + toil_lsf_args = "%s %s" % (" ".join(self._job_group()), " ".join(self._tool_args())) env["JAVA_HOME"] = None env["TOIL_LSF_ARGS"] = toil_lsf_args - external_id = self.lsf_client.submit(command_line, self._job_args(), log_path, self.job_id, env) + external_id = self.lsf_client.submit(command_line, self._leader_args(), log_path, self.job_id, env) return external_id, self.job_store_dir, self.job_work_dir, self.job_outputs_dir def get_commandline_status(self, cache): @@ -154,13 +154,22 @@ def _prepare_directories(self): if not os.path.exists(self.job_tmp_dir): os.mkdir(self.job_tmp_dir) - def _job_args(self): + def _leader_args(self): args = self._walltime() args.extend(self._memlimit()) return args + def _tool_args(self): + args = [] + if self.tool_walltime: + expected_limit = max(1, int(self.tool_walltime/4)) + hard_limit = self.tool_walltime + args = ["-We",str(expected_limit),"-W",str(hard_limit)] + args.extend(self._memlimit()) + return args + def _walltime(self): - return ["-W", str(self.walltime)] if self.walltime else [] + return ["-W", str(self.leader_walltime)] if self.leader_walltime else [] def _memlimit(self): return ["-M", self.memlimit] if self.memlimit else [] From c0acefc880770671ead2c77d769966f6dcf0c819 Mon Sep 17 00:00:00 2001 From: Nikhil Kumar Date: Tue, 14 Nov 2023 13:17:35 -0500 Subject: [PATCH 31/64] Added tool and leader walltime fields for job --- orchestrator/models.py | 3 ++- orchestrator/tasks.py | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/orchestrator/models.py b/orchestrator/models.py index 091decd3..6a15b4c4 100644 --- a/orchestrator/models.py +++ b/orchestrator/models.py @@ -173,7 +173,8 @@ class Job(BaseModel): submitted = models.DateTimeField(blank=True, null=True) finished = models.DateTimeField(blank=True, null=True) track_cache = JSONField(blank=True, null=True) - walltime = models.IntegerField(default=4320) + leader_walltime = models.IntegerField(default=int(settings.LSF_WALLTIME)) + tool_walltime = models.IntegerField(default=24) memlimit = models.CharField(blank=True, null=True, default=None, max_length=20) metadata = JSONField(blank=True, null=True, default=dict) diff --git a/orchestrator/tasks.py b/orchestrator/tasks.py index 2ac2a81e..bdf8e053 100644 --- a/orchestrator/tasks.py +++ b/orchestrator/tasks.py @@ -157,7 +157,8 @@ def submit_job_to_lsf(job): job.inputs, job.root_dir, job.resume_job_store_location, - job.walltime, + job.leader_walltime, + job.tool_walltime, job.memlimit, log_dir=job.log_dir, ) From bcdd2df666098fb191c12157fc4b0610ed87ef1d Mon Sep 17 00:00:00 2001 From: Nikhil Kumar Date: Tue, 14 Nov 2023 13:42:03 -0500 Subject: [PATCH 32/64] Removed unused setting to prevent confusion --- orchestrator/models.py | 2 +- ridgeback/settings.py | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/orchestrator/models.py b/orchestrator/models.py index 6a15b4c4..4fa5351b 100644 --- a/orchestrator/models.py +++ b/orchestrator/models.py @@ -173,7 +173,7 @@ class Job(BaseModel): submitted = models.DateTimeField(blank=True, null=True) finished = models.DateTimeField(blank=True, null=True) track_cache = JSONField(blank=True, null=True) - leader_walltime = models.IntegerField(default=int(settings.LSF_WALLTIME)) + leader_walltime = models.IntegerField(default=168) tool_walltime = models.IntegerField(default=24) memlimit = models.CharField(blank=True, null=True, default=None, max_length=20) metadata = JSONField(blank=True, null=True, default=dict) diff --git a/ridgeback/settings.py b/ridgeback/settings.py index 85e64be5..591a9434 100644 --- a/ridgeback/settings.py +++ b/ridgeback/settings.py @@ -246,7 +246,6 @@ TOIL_JOB_STORE_ROOT = os.environ["RIDGEBACK_TOIL_JOB_STORE_ROOT"] TOIL_WORK_DIR_ROOT = os.environ["RIDGEBACK_TOIL_WORK_DIR_ROOT"] TOIL_TMP_DIR_ROOT = os.environ["RIDGEBACK_TOIL_TMP_DIR_ROOT"] -LSF_WALLTIME = os.environ["RIDGEBACK_LSF_WALLTIME"] LSF_SLA = os.environ.get("RIDGEBACK_LSF_SLA", None) CWLTOIL = os.environ.get("RIDGEBACK_TOIL", "toil-cwl-runner") TOIL_STATE_POLLING_WAIT = os.environ.get("TOIL_STATE_POLLING_WAIT", 60) From 38e4f42bdedd09d6d0b2705f67466f52d7b665ff Mon Sep 17 00:00:00 2001 From: Nikhil Kumar Date: Tue, 14 Nov 2023 13:42:34 -0500 Subject: [PATCH 33/64] Added walltime field migration --- .../migrations/0013_auto_20231114_1341.py | 27 +++++++++++++++++++ 1 file changed, 27 insertions(+) create mode 100644 orchestrator/migrations/0013_auto_20231114_1341.py diff --git a/orchestrator/migrations/0013_auto_20231114_1341.py b/orchestrator/migrations/0013_auto_20231114_1341.py new file mode 100644 index 00000000..4e41f7af --- /dev/null +++ b/orchestrator/migrations/0013_auto_20231114_1341.py @@ -0,0 +1,27 @@ +# Generated by Django 2.2.24 on 2023-11-14 18:41 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('orchestrator', '0012_job_base_dir'), + ] + + operations = [ + migrations.RemoveField( + model_name='job', + name='walltime', + ), + migrations.AddField( + model_name='job', + name='leader_walltime', + field=models.IntegerField(default=168), + ), + migrations.AddField( + model_name='job', + name='tool_walltime', + field=models.IntegerField(default=24), + ), + ] From 9c10ff0af073e881e7a062c127ab7dca5c372014 Mon Sep 17 00:00:00 2001 From: Nikhil Kumar Date: Tue, 14 Nov 2023 13:47:49 -0500 Subject: [PATCH 34/64] Added black formatting --- orchestrator/admin.py | 1 - submitter/factory.py | 8 ++++++-- submitter/nextflow_submitter/nextflow_jobsubmitter.py | 4 +++- submitter/toil_submitter/toil_jobsubmitter.py | 10 ++++++---- 4 files changed, 15 insertions(+), 8 deletions(-) diff --git a/orchestrator/admin.py b/orchestrator/admin.py index 037aee55..1d9d777f 100644 --- a/orchestrator/admin.py +++ b/orchestrator/admin.py @@ -48,7 +48,6 @@ def cleanup_files(self, request, queryset): Already cleaned up {cleaned_up} """ for job in queryset: - if all([job.job_store_clean_up, job.working_dir_clean_up]): already_cleaned_up_projects = already_cleaned_up_projects + 1 elif any([job.job_store_clean_up, job.working_dir_clean_up]): diff --git a/submitter/factory.py b/submitter/factory.py index 33ad297d..125e9cf5 100644 --- a/submitter/factory.py +++ b/submitter/factory.py @@ -18,6 +18,10 @@ def factory( log_dir=None, ): if type == PipelineType.CWL: - return ToilJobSubmitter(job_id, app, inputs, root_dir, resume_jobstore, leader_walltime, tool_walltime, memlimit, log_dir) + return ToilJobSubmitter( + job_id, app, inputs, root_dir, resume_jobstore, leader_walltime, tool_walltime, memlimit, log_dir + ) elif type == PipelineType.NEXTFLOW: - return NextflowJobSubmitter(job_id, app, inputs, root_dir, resume_jobstore, leader_walltime, tool_walltime, memlimit, log_dir) + return NextflowJobSubmitter( + job_id, app, inputs, root_dir, resume_jobstore, leader_walltime, tool_walltime, memlimit, log_dir + ) diff --git a/submitter/nextflow_submitter/nextflow_jobsubmitter.py b/submitter/nextflow_submitter/nextflow_jobsubmitter.py index d9617dbe..05842367 100644 --- a/submitter/nextflow_submitter/nextflow_jobsubmitter.py +++ b/submitter/nextflow_submitter/nextflow_jobsubmitter.py @@ -6,7 +6,9 @@ class NextflowJobSubmitter(JobSubmitter): - def __init__(self, job_id, app, inputs, root_dir, resume_jobstore, leader_walltime, tool_walltime, memlimit, log_dir=None): + def __init__( + self, job_id, app, inputs, root_dir, resume_jobstore, leader_walltime, tool_walltime, memlimit, log_dir=None + ): """ :param job_id: :param app: github.url diff --git a/submitter/toil_submitter/toil_jobsubmitter.py b/submitter/toil_submitter/toil_jobsubmitter.py index af543e47..a6a0953b 100644 --- a/submitter/toil_submitter/toil_jobsubmitter.py +++ b/submitter/toil_submitter/toil_jobsubmitter.py @@ -25,7 +25,9 @@ def translate_toil_to_model_status(status): class ToilJobSubmitter(JobSubmitter): - def __init__(self, job_id, app, inputs, root_dir, resume_jobstore, leader_walltime, tool_walltime, memlimit, log_dir=None): + def __init__( + self, job_id, app, inputs, root_dir, resume_jobstore, leader_walltime, tool_walltime, memlimit, log_dir=None + ): JobSubmitter.__init__(self, job_id, app, inputs, leader_walltime, tool_walltime, memlimit, log_dir) self.resume_jobstore = resume_jobstore if resume_jobstore: @@ -162,9 +164,9 @@ def _leader_args(self): def _tool_args(self): args = [] if self.tool_walltime: - expected_limit = max(1, int(self.tool_walltime/4)) - hard_limit = self.tool_walltime - args = ["-We",str(expected_limit),"-W",str(hard_limit)] + expected_limit = max(1, int(self.tool_walltime / 4)) + hard_limit = self.tool_walltime + args = ["-We", str(expected_limit), "-W", str(hard_limit)] args.extend(self._memlimit()) return args From c2b9f0fa2a3179dfe31438f677575475f52d6cc2 Mon Sep 17 00:00:00 2001 From: Nikhil Kumar Date: Tue, 14 Nov 2023 14:31:36 -0500 Subject: [PATCH 35/64] Fixed walltime query --- orchestrator/scheduler/scheduler.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/orchestrator/scheduler/scheduler.py b/orchestrator/scheduler/scheduler.py index f38d0eaa..d68a40dd 100644 --- a/orchestrator/scheduler/scheduler.py +++ b/orchestrator/scheduler/scheduler.py @@ -19,27 +19,27 @@ def get_jobs_to_submit(): LONG: 7200 <= walltime """ short_jobs_count = Job.objects.filter( - status__gte=Status.SUBMITTING, status__lt=Status.COMPLETED, walltime__lt=settings.SHORT_JOB_MAX_DURATION + status__gte=Status.SUBMITTING, status__lt=Status.COMPLETED, leader_walltime__lt=settings.SHORT_JOB_MAX_DURATION ).count() medium_jobs_count = Job.objects.filter( status__gte=Status.SUBMITTING, status__lt=Status.COMPLETED, - walltime__gte=settings.SHORT_JOB_MAX_DURATION, - walltime__lt=settings.MEDIUM_JOB_MAX_DURATION, + leader_walltime__gte=settings.SHORT_JOB_MAX_DURATION, + leader_walltime__lt=settings.MEDIUM_JOB_MAX_DURATION, ).count() long_jobs_count = Job.objects.filter( - status__gte=Status.SUBMITTING, status__lt=Status.COMPLETED, walltime__gte=settings.MEDIUM_JOB_MAX_DURATION + status__gte=Status.SUBMITTING, status__lt=Status.COMPLETED, leader_walltime__gte=settings.MEDIUM_JOB_MAX_DURATION ).count() pending_jobs_short = Job.objects.filter( - status__lt=Status.SUBMITTING, walltime__lt=settings.SHORT_JOB_MAX_DURATION + status__lt=Status.SUBMITTING, leader_walltime__lt=settings.SHORT_JOB_MAX_DURATION ).order_by("created_date")[: settings.SHORT_JOB_QUEUE - short_jobs_count] pending_jobs_medium = Job.objects.filter( status__lt=Status.SUBMITTING, - walltime__gte=settings.SHORT_JOB_MAX_DURATION, - walltime__lt=settings.MEDIUM_JOB_MAX_DURATION, + leader_walltime__gte=settings.SHORT_JOB_MAX_DURATION, + leader_walltime__lt=settings.MEDIUM_JOB_MAX_DURATION, ).order_by("created_date")[: settings.MEDIUM_JOB_QUEUE - medium_jobs_count] pending_jobs_long = Job.objects.filter( - status__lt=Status.SUBMITTING, walltime__gte=settings.MEDIUM_JOB_MAX_DURATION + status__lt=Status.SUBMITTING, leader_walltime__gte=settings.MEDIUM_JOB_MAX_DURATION ).order_by("created_date")[: settings.LONG_JOB_QUEUE - long_jobs_count] jobs_to_submit = [] jobs_to_submit.extend(pending_jobs_short) From b14d08751676882eb32f3030747eab52a5d695e8 Mon Sep 17 00:00:00 2001 From: Nikhil Kumar Date: Tue, 14 Nov 2023 14:32:16 -0500 Subject: [PATCH 36/64] Fixed tests --- orchestrator/tests/test_scheduler.py | 2 +- tests/test_tasks.py | 19 +++++++++++-------- 2 files changed, 12 insertions(+), 9 deletions(-) diff --git a/orchestrator/tests/test_scheduler.py b/orchestrator/tests/test_scheduler.py index 8a5d9322..9c75bd41 100644 --- a/orchestrator/tests/test_scheduler.py +++ b/orchestrator/tests/test_scheduler.py @@ -12,7 +12,7 @@ def create_jobs(self, status, count, walltime): root_dir="root_dir", job_store_location="job_store_location", status=status, - walltime=walltime, + leader_walltime=walltime, ) def test_full_cluster(self): diff --git a/tests/test_tasks.py b/tests/test_tasks.py index fdce8a7e..24f1ab32 100644 --- a/tests/test_tasks.py +++ b/tests/test_tasks.py @@ -78,24 +78,26 @@ def test_job_args(self): app = {"github": {"repository": "awesome_repo", "entrypoint": "test.cwl"}} root_dir = "test_root" resume_jobstore = None - walltime = None + leader_walltime = None + tool_walltime = None memlimit = None inputs = {} expected_job_group = "-g {}".format(format_lsf_job_id(job_id)) - jobsubmitterObject = ToilJobSubmitter(job_id, app, inputs, root_dir, resume_jobstore, walltime, memlimit) + jobsubmitterObject = ToilJobSubmitter(job_id, app, inputs, root_dir, resume_jobstore, leader_walltime,tool_walltime,memlimit) job_group = " ".join(jobsubmitterObject._job_group()) self.assertEqual(job_group, expected_job_group) - def test_job_args_walltime(self): + def test_job_args_leader_walltime(self): job_id = str(uuid.uuid4()) app = {"github": {"repository": "awesome_repo", "entrypoint": "test.cwl"}} root_dir = "test_root" resume_jobstore = None - walltime = 7200 + leader_walltime = 7200 + tool_walltime = 24 memlimit = None inputs = {} - expected_job_args = "-W {}".format(walltime) - jobsubmitterObject = ToilJobSubmitter(job_id, app, inputs, root_dir, resume_jobstore, walltime, memlimit) + expected_job_args = "-W {}".format(leader_walltime) + jobsubmitterObject = ToilJobSubmitter(job_id, app, inputs, root_dir, resume_jobstore, leader_walltime,tool_walltime, memlimit) job_args_list = jobsubmitterObject._job_args() job_args = " ".join([str(single_arg) for single_arg in job_args_list]) self.assertEqual(job_args, expected_job_args) @@ -105,11 +107,12 @@ def test_job_args_memlimit(self): app = {"github": {"repository": "awesome_repo", "entrypoint": "test.cwl"}} root_dir = "test_root" resume_jobstore = None - walltime = None + leader_walltime = None + tool_walltime = None memlimit = 10 inputs = {} expected_job_args = "-M {}".format(memlimit) - jobsubmitterObject = ToilJobSubmitter(job_id, app, inputs, root_dir, resume_jobstore, walltime, memlimit) + jobsubmitterObject = ToilJobSubmitter(job_id, app, inputs, root_dir, resume_jobstore, leader_walltime, tool_walltime, memlimit) job_args_list = jobsubmitterObject._job_args() job_args = " ".join([str(single_arg) for single_arg in job_args_list]) self.assertEqual(job_args, expected_job_args) From 3c7f0024cfc246f5b5c58c3e7f1c5b375e601bdb Mon Sep 17 00:00:00 2001 From: Nikhil Kumar Date: Tue, 14 Nov 2023 14:33:16 -0500 Subject: [PATCH 37/64] Added new tests for tool walltime --- tests/test_tasks.py | 31 ++++++++++++++++++++++++++++--- 1 file changed, 28 insertions(+), 3 deletions(-) diff --git a/tests/test_tasks.py b/tests/test_tasks.py index 24f1ab32..22d94331 100644 --- a/tests/test_tasks.py +++ b/tests/test_tasks.py @@ -102,6 +102,23 @@ def test_job_args_leader_walltime(self): job_args = " ".join([str(single_arg) for single_arg in job_args_list]) self.assertEqual(job_args, expected_job_args) + def test_job_args_tool_walltime(self): + job_id = str(uuid.uuid4()) + app = {"github": {"repository": "awesome_repo", "entrypoint": "test.cwl"}} + root_dir = "test_root" + resume_jobstore = None + leader_walltime = 7200 + tool_walltime = 24 + walltime_hard = 24 + walltime_expected = 8 + memlimit = None + inputs = {} + expected_tool_args = "-We {} -W {}".format(walltime_expected, walltime_hard) + jobsubmitterObject = ToilJobSubmitter(job_id, app, inputs, root_dir, resume_jobstore, leader_walltime,tool_walltime, memlimit) + tool_args_list = jobsubmitterObject._tool_args + tool_args = " ".join([str(single_arg) for single_arg in tool_args_list]) + self.assertEqual(tool_args, expected_tool_args) + def test_job_args_memlimit(self): job_id = str(uuid.uuid4()) app = {"github": {"repository": "awesome_repo", "entrypoint": "test.cwl"}} @@ -122,17 +139,25 @@ def test_job_args_all_options(self): app = {"github": {"repository": "awesome_repo", "entrypoint": "test.cwl"}} root_dir = "test_root" resume_jobstore = None - walltime = 7200 + leader_walltime = 7200 + tool_walltime = 24 + tool_walltime = 24 + walltime_hard = 24 + walltime_expected = 8 memlimit = 10 inputs = {} - expected_job_args = "-W {} -M {}".format(walltime, memlimit) + expected_job_args = "-W {} -M {}".format(leader_walltime, memlimit) expected_job_group = "-g {}".format(format_lsf_job_id(job_id)) - jobsubmitterObject = ToilJobSubmitter(job_id, app, inputs, root_dir, resume_jobstore, walltime, memlimit) + expected_tool_args = "-We {} -W {}".format(walltime_expected, walltime_hard) + jobsubmitterObject = ToilJobSubmitter(job_id, app, inputs, root_dir, resume_jobstore, leader_walltime, tool_walltime, memlimit) job_args_list = jobsubmitterObject._job_args() job_args = " ".join([str(single_arg) for single_arg in job_args_list]) job_group = " ".join(jobsubmitterObject._job_group()) + tool_args_list = jobsubmitterObject._tool_args + tool_args = " ".join([str(single_arg) for single_arg in tool_args_list]) self.assertEqual(job_args, expected_job_args) self.assertEqual(job_group, expected_job_group) + self.assertEqual(tool_args, expected_tool_args) @patch("orchestrator.tasks.command_processor.delay") @patch("orchestrator.tasks.get_job_info_path") From d68aca119389bdb2a45ecd7ecc8d2dde8b29db7c Mon Sep 17 00:00:00 2001 From: Nikhil Kumar Date: Tue, 14 Nov 2023 14:34:05 -0500 Subject: [PATCH 38/64] Added black formatting --- .../migrations/0013_auto_20231114_1341.py | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/orchestrator/migrations/0013_auto_20231114_1341.py b/orchestrator/migrations/0013_auto_20231114_1341.py index 4e41f7af..083912f7 100644 --- a/orchestrator/migrations/0013_auto_20231114_1341.py +++ b/orchestrator/migrations/0013_auto_20231114_1341.py @@ -4,24 +4,23 @@ class Migration(migrations.Migration): - dependencies = [ - ('orchestrator', '0012_job_base_dir'), + ("orchestrator", "0012_job_base_dir"), ] operations = [ migrations.RemoveField( - model_name='job', - name='walltime', + model_name="job", + name="walltime", ), migrations.AddField( - model_name='job', - name='leader_walltime', + model_name="job", + name="leader_walltime", field=models.IntegerField(default=168), ), migrations.AddField( - model_name='job', - name='tool_walltime', + model_name="job", + name="tool_walltime", field=models.IntegerField(default=24), ), ] From b94c1dd466f0135106bea8625de4694a7d1c5388 Mon Sep 17 00:00:00 2001 From: Nikhil Kumar Date: Tue, 14 Nov 2023 14:35:32 -0500 Subject: [PATCH 39/64] Added flake8 fix --- submitter/factory.py | 1 - 1 file changed, 1 deletion(-) diff --git a/submitter/factory.py b/submitter/factory.py index 125e9cf5..d7d2ac24 100644 --- a/submitter/factory.py +++ b/submitter/factory.py @@ -1,4 +1,3 @@ -from django.conf import settings from orchestrator.models import PipelineType from submitter import NextflowJobSubmitter, ToilJobSubmitter From 5bcfa57d9f55e3c59027b5a1a81ca571cb7d11f7 Mon Sep 17 00:00:00 2001 From: Nikhil Kumar Date: Tue, 14 Nov 2023 15:08:41 -0500 Subject: [PATCH 40/64] Added deprecated warning for old ridgeback job --- toil_orchestrator/models.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/toil_orchestrator/models.py b/toil_orchestrator/models.py index e626e315..1969b937 100644 --- a/toil_orchestrator/models.py +++ b/toil_orchestrator/models.py @@ -1,3 +1,10 @@ +################################################################## +# # +# This Model is DEPRECATED, please see ridgeback/orchestrator # +# # +################################################################## + + import uuid from enum import IntEnum from django.db import models From 13b9b1e39912a5fb6ac7810ecfe8dfead15573cc Mon Sep 17 00:00:00 2001 From: Nikhil Kumar Date: Tue, 14 Nov 2023 15:09:43 -0500 Subject: [PATCH 41/64] Updated container env varaibles --- container/Readme.md | 89 +++++++++++++++++++++------------------------ 1 file changed, 41 insertions(+), 48 deletions(-) diff --git a/container/Readme.md b/container/Readme.md index 94e360ee..beabc523 100644 --- a/container/Readme.md +++ b/container/Readme.md @@ -17,72 +17,64 @@ Here are both the essential and optional environment variables needed by the ins ##### General - Variable | Description - :------------- |:------------- -SINGULARITYENV_SINGULARITY_PATH | The path to the singularity executable -SINGULARITYENV_RIDGEBACK_VENV | The path to the venv containing toil and other python dependencies +| Variable | Description | +| :------------------------------ | :----------------------------------------------------------------- | +| SINGULARITYENV_SINGULARITY_PATH | The path to the singularity executable | +| SINGULARITYENV_RIDGEBACK_VENV | The path to the venv containing toil and other python dependencies | -Optional Variable | Description | Default -:------------- |:------------- |:------------- -SINGULARITYENV_RIDGEBACK_PATH | The path to the ridgeback repo | /usr/bin/ridgeback (in container) -SINGULARITYENV_RIDGEBACK_PORT | The port to run the ridgeback webserver | 8000 +| Optional Variable | Description | Default | +| :---------------------------- | :-------------------------------------- | :-------------------------------- | +| SINGULARITYENV_RIDGEBACK_PATH | The path to the ridgeback repo | /usr/bin/ridgeback (in container) | +| SINGULARITYENV_RIDGEBACK_PORT | The port to run the ridgeback webserver | 8000 | ##### Rabbitmq - Variable | Description - :------------- |:------------- - SINGULARITYENV_RIDGEBACK_DEFAULT_QUEUE | Default rabbitmq queue - SINGULARITYENV_RIDGEBACK_RABBITMQ_USERNAME | rabbitmq username - SINGULARITYENV_RIDGEBACK_RABBITMQ_PASSWORD | rabbitmq password - - The default rabbitmq queue should be the same queue set in toil_orchestrator/celery.py +| Variable | Description | +| :----------------------------------------- | :--------------------- | +| SINGULARITYENV_RIDGEBACK_DEFAULT_QUEUE | Default rabbitmq queue | +| SINGULARITYENV_RIDGEBACK_RABBITMQ_USERNAME | rabbitmq username | +| SINGULARITYENV_RIDGEBACK_RABBITMQ_PASSWORD | rabbitmq password | +The default rabbitmq queue should be the same queue set in orchestrator/celery.py ##### Database -Variable | Description -:------------- |:------------- -SINGULARITYENV_RIDGEBACK_DB_NAME | Database name -SINGULARITYENV_RIDGEBACK_DB_USERNAME | Database username -SINGULARITYENV_RIDGEBACK_DB_PASSWORD | Database password -SINGULARITYENV_RIDGEBACK_DB_PORT | Database port - +| Variable | Description | +| :----------------------------------- | :---------------- | +| SINGULARITYENV_RIDGEBACK_DB_NAME | Database name | +| SINGULARITYENV_RIDGEBACK_DB_USERNAME | Database username | +| SINGULARITYENV_RIDGEBACK_DB_PASSWORD | Database password | +| SINGULARITYENV_RIDGEBACK_DB_PORT | Database port | ##### Toil -Variable | Description -:------------- |:------------- -SINGULARITYENV_RIDGEBACK_TOIL_JOB_STORE_ROOT | TOIL jobstore -SINGULARITYENV_RIDGEBACK_TOIL_WORK_DIR_ROOT | TOIL workdir -SINGULARITYENV_RIDGEBACK_TOIL_TMP_DIR_ROOT | TOIL tmp dir - +| Variable | Description | +| :------------------------------------------- | :------------ | +| SINGULARITYENV_RIDGEBACK_TOIL_JOB_STORE_ROOT | TOIL jobstore | +| SINGULARITYENV_RIDGEBACK_TOIL_WORK_DIR_ROOT | TOIL workdir | +| SINGULARITYENV_RIDGEBACK_TOIL_TMP_DIR_ROOT | TOIL tmp dir | ##### LSF -Variable | Description -:------------- |:------------- -SINGULARITYENV_LSF_LIBDIR | The path to the lsf lib dir -SINGULARITYENV_LSF_SERVERDIR | The path to the lsf etc dir -SINGULARITYENV_LSF_ENVDIR | The path to the lsf.conf -SINGULARITYENV_LSF_BINDIR | The path to the lsf bin dir -SINGULARITYENV_RIDGEBACK_LSF_SLA | Service SLA for LSF jobs - -Optional Variable | Description | Default -:------------- |:------------- |:------------- -SINGULARITYENV_RIDGEBACK_LSF_WALLTIME | walltime limit for LSF jobs | None -SINGULARITYENV_RIDGEBACK_LSF_STACKLIMIT | stacklimit for LSF | None +| Variable | Description | +| :------------------------------- | :-------------------------- | +| SINGULARITYENV_LSF_LIBDIR | The path to the lsf lib dir | +| SINGULARITYENV_LSF_SERVERDIR | The path to the lsf etc dir | +| SINGULARITYENV_LSF_ENVDIR | The path to the lsf.conf | +| SINGULARITYENV_LSF_BINDIR | The path to the lsf bin dir | +| SINGULARITYENV_RIDGEBACK_LSF_SLA | Service SLA for LSF jobs | +| Optional Variable | Description | Default | +| :-------------------------------------- | :----------------- | :------ | +| SINGULARITYENV_RIDGEBACK_LSF_STACKLIMIT | stacklimit for LSF | None | ##### Celery -Optional Variable | Description | Default -:--- | :--- | :--- -SINGULARITYENV_CELERY_LOG_PATH | Path to store the celery log files | /tmp -SINGULARITYENV_CELERY_PID_PATH | Path to store the celery pid files | /tmp -SINGULARITYENV_CELERY_BEAT_SCHEDULE_PATH | Path to store the beat schedule path | /tmp - - - +| Optional Variable | Description | Default | +| :--------------------------------------- | :----------------------------------- | :------ | +| SINGULARITYENV_CELERY_LOG_PATH | Path to store the celery log files | /tmp | +| SINGULARITYENV_CELERY_PID_PATH | Path to store the celery pid files | /tmp | +| SINGULARITYENV_CELERY_BEAT_SCHEDULE_PATH | Path to store the beat schedule path | /tmp | #### Configure singularity mount points @@ -95,6 +87,7 @@ export SINGULARITY_BIND="/juno,$SINGULARITYENV_LSF_LIBDIR,$SINGULARITYENV_LSF_SE #### Running an instance Running the following command will create a ridgeback instance named `ridgeback_service` + ``` singularity instance start ridgeback_service.sif ridgeback_service ``` From 3ee65191de04e44a031e1e8ad620542d53491484 Mon Sep 17 00:00:00 2001 From: Nikhil Kumar Date: Tue, 14 Nov 2023 15:23:24 -0500 Subject: [PATCH 42/64] Fix expected limit calculation --- submitter/toil_submitter/toil_jobsubmitter.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/submitter/toil_submitter/toil_jobsubmitter.py b/submitter/toil_submitter/toil_jobsubmitter.py index a6a0953b..745d4ff3 100644 --- a/submitter/toil_submitter/toil_jobsubmitter.py +++ b/submitter/toil_submitter/toil_jobsubmitter.py @@ -164,7 +164,7 @@ def _leader_args(self): def _tool_args(self): args = [] if self.tool_walltime: - expected_limit = max(1, int(self.tool_walltime / 4)) + expected_limit = max(1, int(self.tool_walltime / 3)) hard_limit = self.tool_walltime args = ["-We", str(expected_limit), "-W", str(hard_limit)] args.extend(self._memlimit()) From 8914a080bf7f2bf2ec510245edf3e79b0d5c881c Mon Sep 17 00:00:00 2001 From: Nikhil Kumar Date: Tue, 14 Nov 2023 15:23:56 -0500 Subject: [PATCH 43/64] Fixed tests --- tests/test_tasks.py | 28 ++++++++++++++-------------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/tests/test_tasks.py b/tests/test_tasks.py index 22d94331..10e0f7e0 100644 --- a/tests/test_tasks.py +++ b/tests/test_tasks.py @@ -98,9 +98,9 @@ def test_job_args_leader_walltime(self): inputs = {} expected_job_args = "-W {}".format(leader_walltime) jobsubmitterObject = ToilJobSubmitter(job_id, app, inputs, root_dir, resume_jobstore, leader_walltime,tool_walltime, memlimit) - job_args_list = jobsubmitterObject._job_args() - job_args = " ".join([str(single_arg) for single_arg in job_args_list]) - self.assertEqual(job_args, expected_job_args) + leader_args_list = jobsubmitterObject._leader_args() + leader_args = " ".join([str(single_arg) for single_arg in leader_args_list]) + self.assertEqual(leader_args, expected_job_args) def test_job_args_tool_walltime(self): job_id = str(uuid.uuid4()) @@ -115,7 +115,7 @@ def test_job_args_tool_walltime(self): inputs = {} expected_tool_args = "-We {} -W {}".format(walltime_expected, walltime_hard) jobsubmitterObject = ToilJobSubmitter(job_id, app, inputs, root_dir, resume_jobstore, leader_walltime,tool_walltime, memlimit) - tool_args_list = jobsubmitterObject._tool_args + tool_args_list = jobsubmitterObject._tool_args() tool_args = " ".join([str(single_arg) for single_arg in tool_args_list]) self.assertEqual(tool_args, expected_tool_args) @@ -128,11 +128,11 @@ def test_job_args_memlimit(self): tool_walltime = None memlimit = 10 inputs = {} - expected_job_args = "-M {}".format(memlimit) + expected_leader_args = "-M {}".format(memlimit) jobsubmitterObject = ToilJobSubmitter(job_id, app, inputs, root_dir, resume_jobstore, leader_walltime, tool_walltime, memlimit) - job_args_list = jobsubmitterObject._job_args() - job_args = " ".join([str(single_arg) for single_arg in job_args_list]) - self.assertEqual(job_args, expected_job_args) + leader_args_list = jobsubmitterObject._leader_args() + leader_args = " ".join([str(single_arg) for single_arg in leader_args_list]) + self.assertEqual(leader_args, expected_leader_args) def test_job_args_all_options(self): job_id = str(uuid.uuid4()) @@ -146,16 +146,16 @@ def test_job_args_all_options(self): walltime_expected = 8 memlimit = 10 inputs = {} - expected_job_args = "-W {} -M {}".format(leader_walltime, memlimit) + expected_leader_args = "-W {} -M {}".format(leader_walltime, memlimit) expected_job_group = "-g {}".format(format_lsf_job_id(job_id)) - expected_tool_args = "-We {} -W {}".format(walltime_expected, walltime_hard) + expected_tool_args = "-We {} -W {} -M {}".format(walltime_expected, walltime_hard, memlimit) jobsubmitterObject = ToilJobSubmitter(job_id, app, inputs, root_dir, resume_jobstore, leader_walltime, tool_walltime, memlimit) - job_args_list = jobsubmitterObject._job_args() - job_args = " ".join([str(single_arg) for single_arg in job_args_list]) + leader_args_list = jobsubmitterObject._leader_args() + leader_args = " ".join([str(single_arg) for single_arg in leader_args_list]) job_group = " ".join(jobsubmitterObject._job_group()) - tool_args_list = jobsubmitterObject._tool_args + tool_args_list = jobsubmitterObject._tool_args() tool_args = " ".join([str(single_arg) for single_arg in tool_args_list]) - self.assertEqual(job_args, expected_job_args) + self.assertEqual(leader_args, expected_leader_args) self.assertEqual(job_group, expected_job_group) self.assertEqual(tool_args, expected_tool_args) From 632af2414c1a474dbe5900c407371f7c84518d65 Mon Sep 17 00:00:00 2001 From: Nikhil Kumar Date: Thu, 16 Nov 2023 10:49:04 -0500 Subject: [PATCH 44/64] Fix orchestrator tests --- orchestrator/tests/test_scheduler.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/orchestrator/tests/test_scheduler.py b/orchestrator/tests/test_scheduler.py index 9c75bd41..0d8f05b8 100644 --- a/orchestrator/tests/test_scheduler.py +++ b/orchestrator/tests/test_scheduler.py @@ -35,7 +35,7 @@ def test_long_jobs_full(self): jobs = Scheduler.get_jobs_to_submit() self.assertEqual(len(jobs), 4) for job in jobs: - self.assertTrue(job.walltime in (5500, 2000)) + self.assertTrue(job.leader_walltime in (5500, 2000)) self.assertEqual(job.status, Status.CREATED) def test_partial(self): @@ -47,5 +47,5 @@ def test_partial(self): jobs = Scheduler.get_jobs_to_submit() self.assertEqual(len(jobs), 32) for job in jobs: - self.assertTrue(job.walltime in (5500, 2000)) + self.assertTrue(job.leader_walltime in (5500, 2000)) self.assertEqual(job.status, Status.CREATED) From f564ee650eae67a83d27d1293a8beffa5648ec5f Mon Sep 17 00:00:00 2001 From: Nikhil Kumar Date: Thu, 16 Nov 2023 10:49:45 -0500 Subject: [PATCH 45/64] Added black formatting --- orchestrator/scheduler/scheduler.py | 8 ++++++-- tests/test_tasks.py | 20 +++++++++++++++----- 2 files changed, 21 insertions(+), 7 deletions(-) diff --git a/orchestrator/scheduler/scheduler.py b/orchestrator/scheduler/scheduler.py index d68a40dd..7e71c511 100644 --- a/orchestrator/scheduler/scheduler.py +++ b/orchestrator/scheduler/scheduler.py @@ -19,7 +19,9 @@ def get_jobs_to_submit(): LONG: 7200 <= walltime """ short_jobs_count = Job.objects.filter( - status__gte=Status.SUBMITTING, status__lt=Status.COMPLETED, leader_walltime__lt=settings.SHORT_JOB_MAX_DURATION + status__gte=Status.SUBMITTING, + status__lt=Status.COMPLETED, + leader_walltime__lt=settings.SHORT_JOB_MAX_DURATION, ).count() medium_jobs_count = Job.objects.filter( status__gte=Status.SUBMITTING, @@ -28,7 +30,9 @@ def get_jobs_to_submit(): leader_walltime__lt=settings.MEDIUM_JOB_MAX_DURATION, ).count() long_jobs_count = Job.objects.filter( - status__gte=Status.SUBMITTING, status__lt=Status.COMPLETED, leader_walltime__gte=settings.MEDIUM_JOB_MAX_DURATION + status__gte=Status.SUBMITTING, + status__lt=Status.COMPLETED, + leader_walltime__gte=settings.MEDIUM_JOB_MAX_DURATION, ).count() pending_jobs_short = Job.objects.filter( status__lt=Status.SUBMITTING, leader_walltime__lt=settings.SHORT_JOB_MAX_DURATION diff --git a/tests/test_tasks.py b/tests/test_tasks.py index 10e0f7e0..1c7bbeda 100644 --- a/tests/test_tasks.py +++ b/tests/test_tasks.py @@ -83,7 +83,9 @@ def test_job_args(self): memlimit = None inputs = {} expected_job_group = "-g {}".format(format_lsf_job_id(job_id)) - jobsubmitterObject = ToilJobSubmitter(job_id, app, inputs, root_dir, resume_jobstore, leader_walltime,tool_walltime,memlimit) + jobsubmitterObject = ToilJobSubmitter( + job_id, app, inputs, root_dir, resume_jobstore, leader_walltime, tool_walltime, memlimit + ) job_group = " ".join(jobsubmitterObject._job_group()) self.assertEqual(job_group, expected_job_group) @@ -97,7 +99,9 @@ def test_job_args_leader_walltime(self): memlimit = None inputs = {} expected_job_args = "-W {}".format(leader_walltime) - jobsubmitterObject = ToilJobSubmitter(job_id, app, inputs, root_dir, resume_jobstore, leader_walltime,tool_walltime, memlimit) + jobsubmitterObject = ToilJobSubmitter( + job_id, app, inputs, root_dir, resume_jobstore, leader_walltime, tool_walltime, memlimit + ) leader_args_list = jobsubmitterObject._leader_args() leader_args = " ".join([str(single_arg) for single_arg in leader_args_list]) self.assertEqual(leader_args, expected_job_args) @@ -114,7 +118,9 @@ def test_job_args_tool_walltime(self): memlimit = None inputs = {} expected_tool_args = "-We {} -W {}".format(walltime_expected, walltime_hard) - jobsubmitterObject = ToilJobSubmitter(job_id, app, inputs, root_dir, resume_jobstore, leader_walltime,tool_walltime, memlimit) + jobsubmitterObject = ToilJobSubmitter( + job_id, app, inputs, root_dir, resume_jobstore, leader_walltime, tool_walltime, memlimit + ) tool_args_list = jobsubmitterObject._tool_args() tool_args = " ".join([str(single_arg) for single_arg in tool_args_list]) self.assertEqual(tool_args, expected_tool_args) @@ -129,7 +135,9 @@ def test_job_args_memlimit(self): memlimit = 10 inputs = {} expected_leader_args = "-M {}".format(memlimit) - jobsubmitterObject = ToilJobSubmitter(job_id, app, inputs, root_dir, resume_jobstore, leader_walltime, tool_walltime, memlimit) + jobsubmitterObject = ToilJobSubmitter( + job_id, app, inputs, root_dir, resume_jobstore, leader_walltime, tool_walltime, memlimit + ) leader_args_list = jobsubmitterObject._leader_args() leader_args = " ".join([str(single_arg) for single_arg in leader_args_list]) self.assertEqual(leader_args, expected_leader_args) @@ -149,7 +157,9 @@ def test_job_args_all_options(self): expected_leader_args = "-W {} -M {}".format(leader_walltime, memlimit) expected_job_group = "-g {}".format(format_lsf_job_id(job_id)) expected_tool_args = "-We {} -W {} -M {}".format(walltime_expected, walltime_hard, memlimit) - jobsubmitterObject = ToilJobSubmitter(job_id, app, inputs, root_dir, resume_jobstore, leader_walltime, tool_walltime, memlimit) + jobsubmitterObject = ToilJobSubmitter( + job_id, app, inputs, root_dir, resume_jobstore, leader_walltime, tool_walltime, memlimit + ) leader_args_list = jobsubmitterObject._leader_args() leader_args = " ".join([str(single_arg) for single_arg in leader_args_list]) job_group = " ".join(jobsubmitterObject._job_group()) From 058b21b9710589d9887f9471cb512369edf064df Mon Sep 17 00:00:00 2001 From: Nikhil Kumar Date: Thu, 16 Nov 2023 11:21:50 -0500 Subject: [PATCH 46/64] Fix default walltime --- orchestrator/models.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/orchestrator/models.py b/orchestrator/models.py index 4fa5351b..0abb0144 100644 --- a/orchestrator/models.py +++ b/orchestrator/models.py @@ -173,8 +173,8 @@ class Job(BaseModel): submitted = models.DateTimeField(blank=True, null=True) finished = models.DateTimeField(blank=True, null=True) track_cache = JSONField(blank=True, null=True) - leader_walltime = models.IntegerField(default=168) - tool_walltime = models.IntegerField(default=24) + leader_walltime = models.IntegerField(default=7200) + tool_walltime = models.IntegerField(default=1440) memlimit = models.CharField(blank=True, null=True, default=None, max_length=20) metadata = JSONField(blank=True, null=True, default=dict) From cec3d051a61d75084a93d88576da0ed96e73a690 Mon Sep 17 00:00:00 2001 From: Nikhil Kumar Date: Thu, 16 Nov 2023 12:53:18 -0500 Subject: [PATCH 47/64] Added migrations to rename walltime field --- .../{0013_auto_20231114_1341.py => 0013_add_walltime.py} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename orchestrator/migrations/{0013_auto_20231114_1341.py => 0013_add_walltime.py} (100%) diff --git a/orchestrator/migrations/0013_auto_20231114_1341.py b/orchestrator/migrations/0013_add_walltime.py similarity index 100% rename from orchestrator/migrations/0013_auto_20231114_1341.py rename to orchestrator/migrations/0013_add_walltime.py From 2ae54bb8beb42cd761fbf57c63befb44686986ad Mon Sep 17 00:00:00 2001 From: Nikhil Kumar Date: Thu, 16 Nov 2023 12:53:46 -0500 Subject: [PATCH 48/64] Added default walltime value migration --- .../migrations/0014_auto_20231116_1252.py | 23 +++++++++++++++++++ 1 file changed, 23 insertions(+) create mode 100644 orchestrator/migrations/0014_auto_20231116_1252.py diff --git a/orchestrator/migrations/0014_auto_20231116_1252.py b/orchestrator/migrations/0014_auto_20231116_1252.py new file mode 100644 index 00000000..b21099b1 --- /dev/null +++ b/orchestrator/migrations/0014_auto_20231116_1252.py @@ -0,0 +1,23 @@ +# Generated by Django 2.2.24 on 2023-11-16 17:52 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('orchestrator', '0013_add_walltime'), + ] + + operations = [ + migrations.AlterField( + model_name='job', + name='leader_walltime', + field=models.IntegerField(default=7200), + ), + migrations.AlterField( + model_name='job', + name='tool_walltime', + field=models.IntegerField(default=1440), + ), + ] From edfc069f26aaeb7b24620b798c827de5caedbe4a Mon Sep 17 00:00:00 2001 From: Nikhil Kumar Date: Thu, 16 Nov 2023 15:48:36 -0500 Subject: [PATCH 49/64] Revert leader_walltime to walltime --- orchestrator/migrations/0013_add_walltime.py | 26 ------------------- .../migrations/0014_auto_20231116_1252.py | 23 ---------------- orchestrator/models.py | 2 +- orchestrator/scheduler/scheduler.py | 16 ++++++------ orchestrator/tasks.py | 2 +- orchestrator/tests/test_scheduler.py | 6 ++--- submitter/factory.py | 6 ++--- submitter/jobsubmitter.py | 4 +-- .../nextflow_jobsubmitter.py | 6 ++--- submitter/toil_submitter/toil_jobsubmitter.py | 6 ++--- tests/test_tasks.py | 26 +++++++++---------- 11 files changed, 37 insertions(+), 86 deletions(-) delete mode 100644 orchestrator/migrations/0013_add_walltime.py delete mode 100644 orchestrator/migrations/0014_auto_20231116_1252.py diff --git a/orchestrator/migrations/0013_add_walltime.py b/orchestrator/migrations/0013_add_walltime.py deleted file mode 100644 index 083912f7..00000000 --- a/orchestrator/migrations/0013_add_walltime.py +++ /dev/null @@ -1,26 +0,0 @@ -# Generated by Django 2.2.24 on 2023-11-14 18:41 - -from django.db import migrations, models - - -class Migration(migrations.Migration): - dependencies = [ - ("orchestrator", "0012_job_base_dir"), - ] - - operations = [ - migrations.RemoveField( - model_name="job", - name="walltime", - ), - migrations.AddField( - model_name="job", - name="leader_walltime", - field=models.IntegerField(default=168), - ), - migrations.AddField( - model_name="job", - name="tool_walltime", - field=models.IntegerField(default=24), - ), - ] diff --git a/orchestrator/migrations/0014_auto_20231116_1252.py b/orchestrator/migrations/0014_auto_20231116_1252.py deleted file mode 100644 index b21099b1..00000000 --- a/orchestrator/migrations/0014_auto_20231116_1252.py +++ /dev/null @@ -1,23 +0,0 @@ -# Generated by Django 2.2.24 on 2023-11-16 17:52 - -from django.db import migrations, models - - -class Migration(migrations.Migration): - - dependencies = [ - ('orchestrator', '0013_add_walltime'), - ] - - operations = [ - migrations.AlterField( - model_name='job', - name='leader_walltime', - field=models.IntegerField(default=7200), - ), - migrations.AlterField( - model_name='job', - name='tool_walltime', - field=models.IntegerField(default=1440), - ), - ] diff --git a/orchestrator/models.py b/orchestrator/models.py index 0abb0144..6cd0fe38 100644 --- a/orchestrator/models.py +++ b/orchestrator/models.py @@ -173,7 +173,7 @@ class Job(BaseModel): submitted = models.DateTimeField(blank=True, null=True) finished = models.DateTimeField(blank=True, null=True) track_cache = JSONField(blank=True, null=True) - leader_walltime = models.IntegerField(default=7200) + walltime = models.IntegerField(default=7200) tool_walltime = models.IntegerField(default=1440) memlimit = models.CharField(blank=True, null=True, default=None, max_length=20) metadata = JSONField(blank=True, null=True, default=dict) diff --git a/orchestrator/scheduler/scheduler.py b/orchestrator/scheduler/scheduler.py index 7e71c511..7b966490 100644 --- a/orchestrator/scheduler/scheduler.py +++ b/orchestrator/scheduler/scheduler.py @@ -21,29 +21,29 @@ def get_jobs_to_submit(): short_jobs_count = Job.objects.filter( status__gte=Status.SUBMITTING, status__lt=Status.COMPLETED, - leader_walltime__lt=settings.SHORT_JOB_MAX_DURATION, + walltime__lt=settings.SHORT_JOB_MAX_DURATION, ).count() medium_jobs_count = Job.objects.filter( status__gte=Status.SUBMITTING, status__lt=Status.COMPLETED, - leader_walltime__gte=settings.SHORT_JOB_MAX_DURATION, - leader_walltime__lt=settings.MEDIUM_JOB_MAX_DURATION, + walltime__gte=settings.SHORT_JOB_MAX_DURATION, + walltime__lt=settings.MEDIUM_JOB_MAX_DURATION, ).count() long_jobs_count = Job.objects.filter( status__gte=Status.SUBMITTING, status__lt=Status.COMPLETED, - leader_walltime__gte=settings.MEDIUM_JOB_MAX_DURATION, + walltime__gte=settings.MEDIUM_JOB_MAX_DURATION, ).count() pending_jobs_short = Job.objects.filter( - status__lt=Status.SUBMITTING, leader_walltime__lt=settings.SHORT_JOB_MAX_DURATION + status__lt=Status.SUBMITTING, walltime__lt=settings.SHORT_JOB_MAX_DURATION ).order_by("created_date")[: settings.SHORT_JOB_QUEUE - short_jobs_count] pending_jobs_medium = Job.objects.filter( status__lt=Status.SUBMITTING, - leader_walltime__gte=settings.SHORT_JOB_MAX_DURATION, - leader_walltime__lt=settings.MEDIUM_JOB_MAX_DURATION, + walltime__gte=settings.SHORT_JOB_MAX_DURATION, + walltime__lt=settings.MEDIUM_JOB_MAX_DURATION, ).order_by("created_date")[: settings.MEDIUM_JOB_QUEUE - medium_jobs_count] pending_jobs_long = Job.objects.filter( - status__lt=Status.SUBMITTING, leader_walltime__gte=settings.MEDIUM_JOB_MAX_DURATION + status__lt=Status.SUBMITTING, walltime__gte=settings.MEDIUM_JOB_MAX_DURATION ).order_by("created_date")[: settings.LONG_JOB_QUEUE - long_jobs_count] jobs_to_submit = [] jobs_to_submit.extend(pending_jobs_short) diff --git a/orchestrator/tasks.py b/orchestrator/tasks.py index bdf8e053..1a510a5c 100644 --- a/orchestrator/tasks.py +++ b/orchestrator/tasks.py @@ -157,7 +157,7 @@ def submit_job_to_lsf(job): job.inputs, job.root_dir, job.resume_job_store_location, - job.leader_walltime, + job.walltime, job.tool_walltime, job.memlimit, log_dir=job.log_dir, diff --git a/orchestrator/tests/test_scheduler.py b/orchestrator/tests/test_scheduler.py index 0d8f05b8..8a5d9322 100644 --- a/orchestrator/tests/test_scheduler.py +++ b/orchestrator/tests/test_scheduler.py @@ -12,7 +12,7 @@ def create_jobs(self, status, count, walltime): root_dir="root_dir", job_store_location="job_store_location", status=status, - leader_walltime=walltime, + walltime=walltime, ) def test_full_cluster(self): @@ -35,7 +35,7 @@ def test_long_jobs_full(self): jobs = Scheduler.get_jobs_to_submit() self.assertEqual(len(jobs), 4) for job in jobs: - self.assertTrue(job.leader_walltime in (5500, 2000)) + self.assertTrue(job.walltime in (5500, 2000)) self.assertEqual(job.status, Status.CREATED) def test_partial(self): @@ -47,5 +47,5 @@ def test_partial(self): jobs = Scheduler.get_jobs_to_submit() self.assertEqual(len(jobs), 32) for job in jobs: - self.assertTrue(job.leader_walltime in (5500, 2000)) + self.assertTrue(job.walltime in (5500, 2000)) self.assertEqual(job.status, Status.CREATED) diff --git a/submitter/factory.py b/submitter/factory.py index d7d2ac24..f7ad6232 100644 --- a/submitter/factory.py +++ b/submitter/factory.py @@ -11,16 +11,16 @@ def factory( inputs, root_dir, resume_jobstore=None, - leader_walltime=None, + walltime=None, tool_walltime=None, memlimit=None, log_dir=None, ): if type == PipelineType.CWL: return ToilJobSubmitter( - job_id, app, inputs, root_dir, resume_jobstore, leader_walltime, tool_walltime, memlimit, log_dir + job_id, app, inputs, root_dir, resume_jobstore, walltime, tool_walltime, memlimit, log_dir ) elif type == PipelineType.NEXTFLOW: return NextflowJobSubmitter( - job_id, app, inputs, root_dir, resume_jobstore, leader_walltime, tool_walltime, memlimit, log_dir + job_id, app, inputs, root_dir, resume_jobstore, walltime, tool_walltime, memlimit, log_dir ) diff --git a/submitter/jobsubmitter.py b/submitter/jobsubmitter.py index 3ef8208c..116b9b1e 100644 --- a/submitter/jobsubmitter.py +++ b/submitter/jobsubmitter.py @@ -3,12 +3,12 @@ class JobSubmitter(object): - def __init__(self, job_id, app, inputs, leader_walltime, tool_walltime, memlimit, log_dir=None): + def __init__(self, job_id, app, inputs, walltime, tool_walltime, memlimit, log_dir=None): self.app = App.factory(app) self.job_id = job_id self.inputs = inputs self.lsf_client = LSFClient() - self.leader_walltime = leader_walltime + self.walltime = walltime self.tool_walltime = tool_walltime self.memlimit = memlimit self.log_dir = log_dir diff --git a/submitter/nextflow_submitter/nextflow_jobsubmitter.py b/submitter/nextflow_submitter/nextflow_jobsubmitter.py index 05842367..5338bc90 100644 --- a/submitter/nextflow_submitter/nextflow_jobsubmitter.py +++ b/submitter/nextflow_submitter/nextflow_jobsubmitter.py @@ -7,7 +7,7 @@ class NextflowJobSubmitter(JobSubmitter): def __init__( - self, job_id, app, inputs, root_dir, resume_jobstore, leader_walltime, tool_walltime, memlimit, log_dir=None + self, job_id, app, inputs, root_dir, resume_jobstore, walltime, tool_walltime, memlimit, log_dir=None ): """ :param job_id: @@ -34,7 +34,7 @@ def __init__( :param root_dir: :param resume_jobstore: """ - JobSubmitter.__init__(self, job_id, app, inputs, leader_walltime, tool_walltime, memlimit, log_dir) + JobSubmitter.__init__(self, job_id, app, inputs, walltime, tool_walltime, memlimit, log_dir) self.resume_jobstore = resume_jobstore if resume_jobstore: self.job_store_dir = resume_jobstore @@ -62,7 +62,7 @@ def _leader_args(self): return args def _walltime(self): - return ["-W", str(self.leader_walltime)] if self.leader_walltime else [] + return ["-W", str(self.walltime)] if self.walltime else [] def _memlimit(self): return ["-M", self.memlimit] if self.memlimit else ["-M", "20"] diff --git a/submitter/toil_submitter/toil_jobsubmitter.py b/submitter/toil_submitter/toil_jobsubmitter.py index 745d4ff3..d5b2b7af 100644 --- a/submitter/toil_submitter/toil_jobsubmitter.py +++ b/submitter/toil_submitter/toil_jobsubmitter.py @@ -26,9 +26,9 @@ def translate_toil_to_model_status(status): class ToilJobSubmitter(JobSubmitter): def __init__( - self, job_id, app, inputs, root_dir, resume_jobstore, leader_walltime, tool_walltime, memlimit, log_dir=None + self, job_id, app, inputs, root_dir, resume_jobstore, walltime, tool_walltime, memlimit, log_dir=None ): - JobSubmitter.__init__(self, job_id, app, inputs, leader_walltime, tool_walltime, memlimit, log_dir) + JobSubmitter.__init__(self, job_id, app, inputs, walltime, tool_walltime, memlimit, log_dir) self.resume_jobstore = resume_jobstore if resume_jobstore: self.job_store_dir = resume_jobstore @@ -171,7 +171,7 @@ def _tool_args(self): return args def _walltime(self): - return ["-W", str(self.leader_walltime)] if self.leader_walltime else [] + return ["-W", str(self.walltime)] if self.walltime else [] def _memlimit(self): return ["-M", self.memlimit] if self.memlimit else [] diff --git a/tests/test_tasks.py b/tests/test_tasks.py index 1c7bbeda..baf01d12 100644 --- a/tests/test_tasks.py +++ b/tests/test_tasks.py @@ -78,29 +78,29 @@ def test_job_args(self): app = {"github": {"repository": "awesome_repo", "entrypoint": "test.cwl"}} root_dir = "test_root" resume_jobstore = None - leader_walltime = None + walltime = None tool_walltime = None memlimit = None inputs = {} expected_job_group = "-g {}".format(format_lsf_job_id(job_id)) jobsubmitterObject = ToilJobSubmitter( - job_id, app, inputs, root_dir, resume_jobstore, leader_walltime, tool_walltime, memlimit + job_id, app, inputs, root_dir, resume_jobstore, walltime, tool_walltime, memlimit ) job_group = " ".join(jobsubmitterObject._job_group()) self.assertEqual(job_group, expected_job_group) - def test_job_args_leader_walltime(self): + def test_job_args_walltime(self): job_id = str(uuid.uuid4()) app = {"github": {"repository": "awesome_repo", "entrypoint": "test.cwl"}} root_dir = "test_root" resume_jobstore = None - leader_walltime = 7200 + walltime = 7200 tool_walltime = 24 memlimit = None inputs = {} - expected_job_args = "-W {}".format(leader_walltime) + expected_job_args = "-W {}".format(walltime) jobsubmitterObject = ToilJobSubmitter( - job_id, app, inputs, root_dir, resume_jobstore, leader_walltime, tool_walltime, memlimit + job_id, app, inputs, root_dir, resume_jobstore, walltime, tool_walltime, memlimit ) leader_args_list = jobsubmitterObject._leader_args() leader_args = " ".join([str(single_arg) for single_arg in leader_args_list]) @@ -111,7 +111,7 @@ def test_job_args_tool_walltime(self): app = {"github": {"repository": "awesome_repo", "entrypoint": "test.cwl"}} root_dir = "test_root" resume_jobstore = None - leader_walltime = 7200 + walltime = 7200 tool_walltime = 24 walltime_hard = 24 walltime_expected = 8 @@ -119,7 +119,7 @@ def test_job_args_tool_walltime(self): inputs = {} expected_tool_args = "-We {} -W {}".format(walltime_expected, walltime_hard) jobsubmitterObject = ToilJobSubmitter( - job_id, app, inputs, root_dir, resume_jobstore, leader_walltime, tool_walltime, memlimit + job_id, app, inputs, root_dir, resume_jobstore, walltime, tool_walltime, memlimit ) tool_args_list = jobsubmitterObject._tool_args() tool_args = " ".join([str(single_arg) for single_arg in tool_args_list]) @@ -130,13 +130,13 @@ def test_job_args_memlimit(self): app = {"github": {"repository": "awesome_repo", "entrypoint": "test.cwl"}} root_dir = "test_root" resume_jobstore = None - leader_walltime = None + walltime = None tool_walltime = None memlimit = 10 inputs = {} expected_leader_args = "-M {}".format(memlimit) jobsubmitterObject = ToilJobSubmitter( - job_id, app, inputs, root_dir, resume_jobstore, leader_walltime, tool_walltime, memlimit + job_id, app, inputs, root_dir, resume_jobstore, walltime, tool_walltime, memlimit ) leader_args_list = jobsubmitterObject._leader_args() leader_args = " ".join([str(single_arg) for single_arg in leader_args_list]) @@ -147,18 +147,18 @@ def test_job_args_all_options(self): app = {"github": {"repository": "awesome_repo", "entrypoint": "test.cwl"}} root_dir = "test_root" resume_jobstore = None - leader_walltime = 7200 + walltime = 7200 tool_walltime = 24 tool_walltime = 24 walltime_hard = 24 walltime_expected = 8 memlimit = 10 inputs = {} - expected_leader_args = "-W {} -M {}".format(leader_walltime, memlimit) + expected_leader_args = "-W {} -M {}".format(walltime, memlimit) expected_job_group = "-g {}".format(format_lsf_job_id(job_id)) expected_tool_args = "-We {} -W {} -M {}".format(walltime_expected, walltime_hard, memlimit) jobsubmitterObject = ToilJobSubmitter( - job_id, app, inputs, root_dir, resume_jobstore, leader_walltime, tool_walltime, memlimit + job_id, app, inputs, root_dir, resume_jobstore, walltime, tool_walltime, memlimit ) leader_args_list = jobsubmitterObject._leader_args() leader_args = " ".join([str(single_arg) for single_arg in leader_args_list]) From 0ef72f1d3e20dcaa6084244c39f64038b743de78 Mon Sep 17 00:00:00 2001 From: Nikhil Kumar Date: Thu, 16 Nov 2023 15:49:14 -0500 Subject: [PATCH 50/64] Added simpler migration --- .../migrations/0013_auto_20231116_1546.py | 23 +++++++++++++++++++ 1 file changed, 23 insertions(+) create mode 100644 orchestrator/migrations/0013_auto_20231116_1546.py diff --git a/orchestrator/migrations/0013_auto_20231116_1546.py b/orchestrator/migrations/0013_auto_20231116_1546.py new file mode 100644 index 00000000..eb5ed79d --- /dev/null +++ b/orchestrator/migrations/0013_auto_20231116_1546.py @@ -0,0 +1,23 @@ +# Generated by Django 2.2.24 on 2023-11-16 20:46 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('orchestrator', '0012_job_base_dir'), + ] + + operations = [ + migrations.AddField( + model_name='job', + name='tool_walltime', + field=models.IntegerField(default=1440), + ), + migrations.AlterField( + model_name='job', + name='walltime', + field=models.IntegerField(default=7200), + ), + ] From 9af68433a490f4a9d3129ca25332d33e05c74f35 Mon Sep 17 00:00:00 2001 From: Nikhil Kumar Date: Thu, 16 Nov 2023 15:49:53 -0500 Subject: [PATCH 51/64] Added black formatting --- submitter/nextflow_submitter/nextflow_jobsubmitter.py | 4 +--- submitter/toil_submitter/toil_jobsubmitter.py | 4 +--- 2 files changed, 2 insertions(+), 6 deletions(-) diff --git a/submitter/nextflow_submitter/nextflow_jobsubmitter.py b/submitter/nextflow_submitter/nextflow_jobsubmitter.py index 5338bc90..6700c8b0 100644 --- a/submitter/nextflow_submitter/nextflow_jobsubmitter.py +++ b/submitter/nextflow_submitter/nextflow_jobsubmitter.py @@ -6,9 +6,7 @@ class NextflowJobSubmitter(JobSubmitter): - def __init__( - self, job_id, app, inputs, root_dir, resume_jobstore, walltime, tool_walltime, memlimit, log_dir=None - ): + def __init__(self, job_id, app, inputs, root_dir, resume_jobstore, walltime, tool_walltime, memlimit, log_dir=None): """ :param job_id: :param app: github.url diff --git a/submitter/toil_submitter/toil_jobsubmitter.py b/submitter/toil_submitter/toil_jobsubmitter.py index d5b2b7af..e56f4434 100644 --- a/submitter/toil_submitter/toil_jobsubmitter.py +++ b/submitter/toil_submitter/toil_jobsubmitter.py @@ -25,9 +25,7 @@ def translate_toil_to_model_status(status): class ToilJobSubmitter(JobSubmitter): - def __init__( - self, job_id, app, inputs, root_dir, resume_jobstore, walltime, tool_walltime, memlimit, log_dir=None - ): + def __init__(self, job_id, app, inputs, root_dir, resume_jobstore, walltime, tool_walltime, memlimit, log_dir=None): JobSubmitter.__init__(self, job_id, app, inputs, walltime, tool_walltime, memlimit, log_dir) self.resume_jobstore = resume_jobstore if resume_jobstore: From 642f777854728a5f7da930fac276868c53d6b9cb Mon Sep 17 00:00:00 2001 From: Nikhil Kumar Date: Thu, 16 Nov 2023 15:55:56 -0500 Subject: [PATCH 52/64] Prevent submission lockup --- orchestrator/scheduler/scheduler.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/orchestrator/scheduler/scheduler.py b/orchestrator/scheduler/scheduler.py index 7b966490..8371537f 100644 --- a/orchestrator/scheduler/scheduler.py +++ b/orchestrator/scheduler/scheduler.py @@ -36,15 +36,15 @@ def get_jobs_to_submit(): ).count() pending_jobs_short = Job.objects.filter( status__lt=Status.SUBMITTING, walltime__lt=settings.SHORT_JOB_MAX_DURATION - ).order_by("created_date")[: settings.SHORT_JOB_QUEUE - short_jobs_count] + ).order_by("created_date")[: max(settings.SHORT_JOB_QUEUE - short_jobs_count,0)] pending_jobs_medium = Job.objects.filter( status__lt=Status.SUBMITTING, walltime__gte=settings.SHORT_JOB_MAX_DURATION, walltime__lt=settings.MEDIUM_JOB_MAX_DURATION, - ).order_by("created_date")[: settings.MEDIUM_JOB_QUEUE - medium_jobs_count] + ).order_by("created_date")[: max(settings.MEDIUM_JOB_QUEUE - medium_jobs_count,0)] pending_jobs_long = Job.objects.filter( status__lt=Status.SUBMITTING, walltime__gte=settings.MEDIUM_JOB_MAX_DURATION - ).order_by("created_date")[: settings.LONG_JOB_QUEUE - long_jobs_count] + ).order_by("created_date")[: max(settings.LONG_JOB_QUEUE - long_jobs_count,0)] jobs_to_submit = [] jobs_to_submit.extend(pending_jobs_short) jobs_to_submit.extend(pending_jobs_medium) From 7ab26b113fc87071d8390cce797352e8606de8e7 Mon Sep 17 00:00:00 2001 From: Ivkovic Date: Thu, 16 Nov 2023 18:47:23 -0500 Subject: [PATCH 53/64] Fix checking hanging jobs --- orchestrator/tasks.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/orchestrator/tasks.py b/orchestrator/tasks.py index cb93a496..3b1b9d9a 100644 --- a/orchestrator/tasks.py +++ b/orchestrator/tasks.py @@ -96,8 +96,6 @@ def process_jobs(): for job_id in status_jobs: # Send CHECK_STATUS commands for Jobs command_processor.delay(Command(CommandType.CHECK_STATUS_ON_LSF, str(job_id)).to_dict()) - if status_jobs.status == Status.RUNNING: - command_processor.delay(Command(CommandType.CHECK_HANGING, str(job_id)).to_dict()) jobs = Scheduler.get_jobs_to_submit() @@ -252,6 +250,9 @@ def check_job_status(job): elif lsf_status in (Status.FAILED,): _fail(job, lsf_message) + elif lsf_status in (Status.RUNNING,): + command_processor.delay(Command(CommandType.CHECK_HANGING, str(job.id)).to_dict()) + command_processor.delay(Command(CommandType.CHECK_COMMAND_LINE_STATUS, str(job.id)).to_dict()) else: From 75dfefc520941974d47ebf5b99ba316153c8d525 Mon Sep 17 00:00:00 2001 From: Ivkovic Date: Fri, 17 Nov 2023 08:54:32 -0500 Subject: [PATCH 54/64] Fix check hanging job --- orchestrator/tasks.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/orchestrator/tasks.py b/orchestrator/tasks.py index 3b1b9d9a..c483f04e 100644 --- a/orchestrator/tasks.py +++ b/orchestrator/tasks.py @@ -239,6 +239,9 @@ def check_job_status(job): Status.UNKNOWN, ): job.update_status(lsf_status) + + if lsf_status in (Status.RUNNING,): + command_processor.delay(Command(CommandType.CHECK_HANGING, str(job.id)).to_dict()) elif lsf_status in (Status.COMPLETED,): outputs, error_message = submiter.get_outputs() @@ -250,9 +253,6 @@ def check_job_status(job): elif lsf_status in (Status.FAILED,): _fail(job, lsf_message) - elif lsf_status in (Status.RUNNING,): - command_processor.delay(Command(CommandType.CHECK_HANGING, str(job.id)).to_dict()) - command_processor.delay(Command(CommandType.CHECK_COMMAND_LINE_STATUS, str(job.id)).to_dict()) else: From d3b64fe06614b9310e6e5ac765a24e6360bec9dd Mon Sep 17 00:00:00 2001 From: Nikhil Kumar Date: Fri, 17 Nov 2023 10:05:53 -0500 Subject: [PATCH 55/64] Updated container file --- container/ridgeback_service.def | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/container/ridgeback_service.def b/container/ridgeback_service.def index da6210fb..ea8d568a 100644 --- a/container/ridgeback_service.def +++ b/container/ridgeback_service.def @@ -124,7 +124,7 @@ Includecmd: no echo "ERROR: SINGULARITYENV_LSF_SERVERDIR is not set." exit 1 fi - - python3 ${RIDGEBACK_PATH}/manage.py migrate - yes | python3 ${RIDGEBACK_PATH}/manage.py collectstatic + + python3 ${RIDGEBACK_PATH}/manage.py migrate --noinput + python3 ${RIDGEBACK_PATH}/manage.py collectstatic --noinput python3 ${RIDGEBACK_PATH}/manage.py runserver 0.0.0.0:$RIDGEBACK_PORT > /dev/null 2>&1 < /dev/null & From 5afe6d5c40563173dcbc845474337d949d42655e Mon Sep 17 00:00:00 2001 From: Nikhil Kumar Date: Mon, 20 Nov 2023 12:14:54 -0500 Subject: [PATCH 56/64] Fixed commandline filtering parameters --- orchestrator/tasks.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/orchestrator/tasks.py b/orchestrator/tasks.py index cb93a496..043144af 100644 --- a/orchestrator/tasks.py +++ b/orchestrator/tasks.py @@ -295,8 +295,8 @@ def check_leader_not_running(self): def check_job_hanging(single_running_job): time_threshold = now() - timedelta(hours=int(settings.MAX_HANGING_HOURS)) - non_running_tools = CommandLineToolJob.objects.filter(modified_date__lt=time_threshold).exclude( - status__in=[Status.COMPLETED, Status.TERMINATED, Status.RUNNING] + non_running_tools = CommandLineToolJob.objects.filter(root__id__exact=single_running_job.id, modified_date__lt=time_threshold).exclude( + status__in=[Status.COMPLETED, Status.TERMINATED, Status.RUNNING, Status.FAILED] ) running_tools = CommandLineToolJob.objects.filter(root__id__exact=single_running_job.id, status=Status.RUNNING) if len(running_tools) == 0: From 156f320116458eda79dc685e5f556ef009963f20 Mon Sep 17 00:00:00 2001 From: Nikhil Kumar Date: Mon, 20 Nov 2023 12:17:33 -0500 Subject: [PATCH 57/64] Added black formatting --- orchestrator/tasks.py | 6 +++--- ridgeback/settings.py | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/orchestrator/tasks.py b/orchestrator/tasks.py index 043144af..67c2417a 100644 --- a/orchestrator/tasks.py +++ b/orchestrator/tasks.py @@ -295,9 +295,9 @@ def check_leader_not_running(self): def check_job_hanging(single_running_job): time_threshold = now() - timedelta(hours=int(settings.MAX_HANGING_HOURS)) - non_running_tools = CommandLineToolJob.objects.filter(root__id__exact=single_running_job.id, modified_date__lt=time_threshold).exclude( - status__in=[Status.COMPLETED, Status.TERMINATED, Status.RUNNING, Status.FAILED] - ) + non_running_tools = CommandLineToolJob.objects.filter( + root__id__exact=single_running_job.id, modified_date__lt=time_threshold + ).exclude(status__in=[Status.COMPLETED, Status.TERMINATED, Status.RUNNING, Status.FAILED]) running_tools = CommandLineToolJob.objects.filter(root__id__exact=single_running_job.id, status=Status.RUNNING) if len(running_tools) == 0: completed_jobs_finished_time = list( diff --git a/ridgeback/settings.py b/ridgeback/settings.py index 766d2dcf..7b37d76f 100644 --- a/ridgeback/settings.py +++ b/ridgeback/settings.py @@ -275,4 +275,4 @@ APP_CACHE = os.environ.get("RIDGEBACK_APP_CACHE", "/tmp") -MAX_HANGING_HOURS = os.environ.get("RIDGEBACK_MAX_HANGING_HOURS", "5") \ No newline at end of file +MAX_HANGING_HOURS = os.environ.get("RIDGEBACK_MAX_HANGING_HOURS", "5") From d112d40ef14f765d2b1d53a0bffd258222dcbe67 Mon Sep 17 00:00:00 2001 From: Nikhil Kumar Date: Wed, 22 Nov 2023 13:37:35 -0500 Subject: [PATCH 58/64] Removed unnecessary env --- container/ridgeback_service.def | 5 ----- 1 file changed, 5 deletions(-) diff --git a/container/ridgeback_service.def b/container/ridgeback_service.def index ea8d568a..d75974ae 100644 --- a/container/ridgeback_service.def +++ b/container/ridgeback_service.def @@ -100,11 +100,6 @@ Includecmd: no exit 1 fi - if [ -z "$RIDGEBACK_LSF_SLA" ]; then - echo "ERROR: SINGULARITYENV_RIDGEBACK_LSF_SLA not set." - exit 1 - fi - if [ -z "$LSF_ENVDIR" ]; then echo "ERROR: SINGULARITYENV_LSF_ENVDIR is not set." exit 1 From 1623ce131395353872628c5ce128d2786d6040ce Mon Sep 17 00:00:00 2001 From: Nikhil Kumar Date: Wed, 22 Nov 2023 13:57:51 -0500 Subject: [PATCH 59/64] Removed another unused env --- container/ridgeback_service.def | 5 ----- 1 file changed, 5 deletions(-) diff --git a/container/ridgeback_service.def b/container/ridgeback_service.def index d75974ae..30735fb6 100644 --- a/container/ridgeback_service.def +++ b/container/ridgeback_service.def @@ -95,11 +95,6 @@ Includecmd: no fi ### LSF Parameters to communicate with LSF - if [ -z "$RIDGEBACK_LSF_WALLTIME" ]; then - echo "ERROR: SINGULARITYENV_RIDGEBACK_LSF_WALLTIME not set." - exit 1 - fi - if [ -z "$LSF_ENVDIR" ]; then echo "ERROR: SINGULARITYENV_LSF_ENVDIR is not set." exit 1 From 925e196fd2dd029d274ab11d7c609fc37bd81de3 Mon Sep 17 00:00:00 2001 From: Nikhil Kumar Date: Fri, 24 Nov 2023 14:43:32 -0500 Subject: [PATCH 60/64] Add black migration --- orchestrator/migrations/0013_auto_20231116_1546.py | 10 +++++----- orchestrator/scheduler/scheduler.py | 6 +++--- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/orchestrator/migrations/0013_auto_20231116_1546.py b/orchestrator/migrations/0013_auto_20231116_1546.py index eb5ed79d..66e130f7 100644 --- a/orchestrator/migrations/0013_auto_20231116_1546.py +++ b/orchestrator/migrations/0013_auto_20231116_1546.py @@ -6,18 +6,18 @@ class Migration(migrations.Migration): dependencies = [ - ('orchestrator', '0012_job_base_dir'), + ("orchestrator", "0012_job_base_dir"), ] operations = [ migrations.AddField( - model_name='job', - name='tool_walltime', + model_name="job", + name="tool_walltime", field=models.IntegerField(default=1440), ), migrations.AlterField( - model_name='job', - name='walltime', + model_name="job", + name="walltime", field=models.IntegerField(default=7200), ), ] diff --git a/orchestrator/scheduler/scheduler.py b/orchestrator/scheduler/scheduler.py index 8371537f..3e35a1db 100644 --- a/orchestrator/scheduler/scheduler.py +++ b/orchestrator/scheduler/scheduler.py @@ -36,15 +36,15 @@ def get_jobs_to_submit(): ).count() pending_jobs_short = Job.objects.filter( status__lt=Status.SUBMITTING, walltime__lt=settings.SHORT_JOB_MAX_DURATION - ).order_by("created_date")[: max(settings.SHORT_JOB_QUEUE - short_jobs_count,0)] + ).order_by("created_date")[: max(settings.SHORT_JOB_QUEUE - short_jobs_count, 0)] pending_jobs_medium = Job.objects.filter( status__lt=Status.SUBMITTING, walltime__gte=settings.SHORT_JOB_MAX_DURATION, walltime__lt=settings.MEDIUM_JOB_MAX_DURATION, - ).order_by("created_date")[: max(settings.MEDIUM_JOB_QUEUE - medium_jobs_count,0)] + ).order_by("created_date")[: max(settings.MEDIUM_JOB_QUEUE - medium_jobs_count, 0)] pending_jobs_long = Job.objects.filter( status__lt=Status.SUBMITTING, walltime__gte=settings.MEDIUM_JOB_MAX_DURATION - ).order_by("created_date")[: max(settings.LONG_JOB_QUEUE - long_jobs_count,0)] + ).order_by("created_date")[: max(settings.LONG_JOB_QUEUE - long_jobs_count, 0)] jobs_to_submit = [] jobs_to_submit.extend(pending_jobs_short) jobs_to_submit.extend(pending_jobs_medium) From 706a63277b95ba7f14c068d560c044f138d45834 Mon Sep 17 00:00:00 2001 From: Nikhil Kumar Date: Fri, 24 Nov 2023 18:11:08 -0500 Subject: [PATCH 61/64] Fixed process test --- orchestrator/tests/test_tasks.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/orchestrator/tests/test_tasks.py b/orchestrator/tests/test_tasks.py index 2fb1b4b8..11021748 100644 --- a/orchestrator/tests/test_tasks.py +++ b/orchestrator/tests/test_tasks.py @@ -237,7 +237,8 @@ def test_failed(self, status, command_processor): @patch("django.core.cache.cache.delete") @patch("django.core.cache.cache.add") @patch("orchestrator.tasks.command_processor.delay") - def test_process_jobs(self, command_processor, add, delete): + @patch("orchestrator.tasks.check_leader_not_running.delay") + def test_process_jobs(self, check_leader_not_running, command_processor, add, delete): job_pending_1 = Job.objects.create( type=PipelineType.CWL, app={ @@ -272,6 +273,7 @@ def test_process_jobs(self, command_processor, add, delete): ] command_processor.assert_has_calls(calls, any_order=True) + check_leader_not_running.assert_called_once() @patch("django.core.cache.cache.delete") @patch("django.core.cache.cache.add") From f4e94b84bf629c2e357f724213b339f81585aea5 Mon Sep 17 00:00:00 2001 From: Nikhil Kumar Date: Fri, 24 Nov 2023 18:33:38 -0500 Subject: [PATCH 62/64] Add flake and black fixes --- orchestrator/tasks.py | 2 +- tests/test_commandline.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/orchestrator/tasks.py b/orchestrator/tasks.py index 4f415f15..6593ac3e 100644 --- a/orchestrator/tasks.py +++ b/orchestrator/tasks.py @@ -239,7 +239,7 @@ def check_job_status(job): Status.UNKNOWN, ): job.update_status(lsf_status) - + if lsf_status in (Status.RUNNING,): command_processor.delay(Command(CommandType.CHECK_HANGING, str(job.id)).to_dict()) diff --git a/tests/test_commandline.py b/tests/test_commandline.py index b3a9f2c2..db6ea3e7 100644 --- a/tests/test_commandline.py +++ b/tests/test_commandline.py @@ -233,7 +233,7 @@ def test_hanging_message_for_toil_leader_running(self): self.assertIsNotNone(self.job.message["alerts"][0]) self.assertTrue(MOCK_LOG_PATH in self.job.message["alerts"][0]["message"]) - def test_hanging_message_for_toil_leader_running(self): + def test_hanging_message_for_tool_running(self): """ Test alert sent of a hanging tool while its running """ From c0a30ba961d1cc3753c8aaeed076734d59a68f61 Mon Sep 17 00:00:00 2001 From: Nikhil Kumar Date: Fri, 24 Nov 2023 19:03:02 -0500 Subject: [PATCH 63/64] Fix non-deterministic test --- tests/test_commandline.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/tests/test_commandline.py b/tests/test_commandline.py index db6ea3e7..9a1ab6d0 100644 --- a/tests/test_commandline.py +++ b/tests/test_commandline.py @@ -238,6 +238,9 @@ def test_hanging_message_for_tool_running(self): Test alert sent of a hanging tool while its running """ self.mock_track("running") + for single_job in CommandLineToolJob.objects.all(): + single_job.status = Status.COMPLETED + single_job.save() first_command = CommandLineToolJob.objects.first() first_command.status = Status.RUNNING first_command.save() From 4b99b8aacdfb1f4d85f223c4c37fb5cc9604c108 Mon Sep 17 00:00:00 2001 From: D-Pankey <30415217+D-Pankey@users.noreply.github.com> Date: Wed, 7 Feb 2024 15:41:26 -0500 Subject: [PATCH 64/64] Create github-actions.yml --- .github/workflows/github-actions.yml | 68 ++++++++++++++++++++++++++++ 1 file changed, 68 insertions(+) create mode 100644 .github/workflows/github-actions.yml diff --git a/.github/workflows/github-actions.yml b/.github/workflows/github-actions.yml new file mode 100644 index 00000000..0d9ebd03 --- /dev/null +++ b/.github/workflows/github-actions.yml @@ -0,0 +1,68 @@ +on: + push: + branches: [ "release/1.30.0" ] + +jobs: + build: + + runs-on: ubuntu-latest + + services: + postgres: + image: postgres:latest + env: + POSTGRES_USER: postgres + POSTGRES_PASSWORD: postgres + POSTGRES_DB: github_actions + ports: + - 5432:5432 + # needed because the postgres container does not provide a healthcheck + options: --health-cmd pg_isready --health-interval 10s --health-timeout 5s --health-retries 5 + + + steps: + - uses: actions/checkout@v2 + - name: Set up Python 3.9 + uses: actions/setup-python@v4 + with: + python-version: 3.9 + - name: Install dependencies + run: | + pip install pip==19.3.1 + pip install --force-reinstall 'setuptools<58.0.0' + pip install -r requirements.txt + pip install -r requirements-toil.txt + #python manage.py migrate + - name: Run migrations + run: python manage.py migrate + - name: Run test + #run: python manage.py test + run: | + coverage run --source='.' manage.py test + coverage report -m --fail-under=75 + - name: Run flake8 + uses: suo/flake8-github-action@releases/v1 + with: + checkName: 'build' # NOTE: this needs to be the same as the job name + env: + RIDGEBACK_DB_NAME: github_actions + RIDGEBACK_DB_PASSWORD: postgres + RIDGEBACK_DB_USERNAME: postgres + RIDGEBACK_TOIL_JOB_STORE_ROOT: /sample_path + RIDGEBACK_TOIL_WORK_DIR_ROOT: /sample_path + RIDGEBACK_TOIL_TMP_DIR_ROOT: /sample_path + RIDGEBACK_LSF_WALLTIME: 10:00 + RIDGEBACK_LSF_SLA: SLA + CELERY_LOG_PATH: /sample_path + CELERY_PID_PATH: /sample_path + CELERY_BEAT_SCHEDULE_PATH: /sample_path + RIDGEBACK_NEXTFLOW_JOB_STORE_ROOT: /sample_path + RIDGEBACK_NEXTFLOW_WORK_DIR_ROOT: /sample_path + RIDGEBACK_NEXTFLOW_TMP_DIR_ROOT: /sample_path + SINGULARITY_PATH: /sample_singularity + RIDGEBACK_VENV: /sample_path + RIDGEBACK_PATH: /sample_path + RIDGEBACK_PORT: 4009 + RIDGEBACK_DEFAULT_QUEUE: sample_queue + RIDGEBACK_RABBITMQ_USERNAME: sample_username + RIDGEBACK_RABBITMQ_PASSWORD: sample_password