Skip to content

Commit

Permalink
Merge pull request #87 from mskcc/develop
Browse files Browse the repository at this point in the history
Develop
  • Loading branch information
nikhil authored Jun 17, 2020
2 parents e97846b + 942a36a commit 2a11bae
Show file tree
Hide file tree
Showing 23 changed files with 510 additions and 51 deletions.
29 changes: 29 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
language: python
sudo: required
os:
- linux
services:
- postgresql
addons:
postgresql: "11"
apt:
packages:
- postgresql-11
- postgresql-client-11
python:
- "3.7.3"
before_install:
- sudo apt-get update
- sudo apt-get --yes remove postgresql\*
- sudo apt-get install -y postgresql-11 postgresql-client-11
- sudo cp /etc/postgresql/{9.6,11}/main/pg_hba.conf
- sudo service postgresql restart 11
install:
- psql -p 5433 -c 'create database travis_ci_test;' -U postgres
- pip install -r requirements.txt
- pip install toil
- source travis_env.sh
- python manage.py migrate
script:
- source travis_env.sh
- python manage.py test --verbosity=2
14 changes: 9 additions & 5 deletions batch_systems/lsf_client/lsf_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,15 +118,15 @@ def _handle_status(self, process_status, process_output, external_job_id):
"Job [%s] completed", external_job_id)
return (Status.COMPLETED, None)
if process_status == 'PEND':
pending_info = None
pending_info = ""
if 'PEND_REASON' in process_output:
if process_output['PEND_REASON']:
pending_info = process_output['PEND_REASON']
self.logger.debug("Job [%s] pending with: %s", external_job_id, pending_info)
return (Status.PENDING, pending_info.strip())
if process_status == 'EXIT':
exit_code = 1
exit_info = None
exit_info = ""
if 'EXIT_CODE' in process_output:
if process_output['EXIT_CODE']:
exit_code = process_output['EXIT_CODE']
Expand All @@ -150,7 +150,7 @@ def _handle_status(self, process_status, process_output, external_job_id):
self.logger.debug(
"Job [%s] is in an unhandled state (%s)", external_job_id, process_status)
status_info = "Job is in an unhandles state: {}".format(process_status)
return (Status.UNKOWN, status_info.strip())
return (Status.UNKNOWN, status_info.strip())


def _parse_status(self, stdout, external_job_id):
Expand All @@ -164,14 +164,18 @@ def _parse_status(self, stdout, external_job_id):
tuple: (Ridgeback Status int, extra info)
"""
bjobs_records = self.parse_bjobs(stdout)
status = None
if bjobs_records:
process_output = bjobs_records[0]
if 'STAT' in process_output:
process_status = process_output['STAT']
return self._handle_status(process_status, process_output, external_job_id)
if 'ERROR' in process_output:
error_message = ""
if process_output['ERROR']:
error_message = process_output['ERROR']
return (Status.UNKNOWN, error_message.strip())

return status
return None

def status(self, external_job_id):
"""Parse LSF status
Expand Down
3 changes: 2 additions & 1 deletion container/ridgeback_service.def
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,5 @@ Includecmd: no
&& git clone https://github.com/mskcc/ridgeback --branch ${RIDGEBACK_BRANCH}
cd /usr/bin/ridgeback \
&& python3 -m pip install python-ldap \
&& pip3 install -r requirements.txt
&& pip3 install toil==3.21.0 \
&& pip3 install -r requirements.txt
5 changes: 3 additions & 2 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
Django==2.2.10
Django==2.2.13
psycopg2==2.7.4
djangorestframework==3.9.4
markdown==3.1.1
django-filter==2.1.0
django-cors-headers==3.0.2
python-slugify==3.0.2
django-rest-swagger==2.2.0
drf-yasg==1.17.1
GitPython==3.0.8
celery==4.3.0
cwltool==2.0.20200122124526
schema-salad==5.0.20200122085940
psycopg2-binary==2.8.4
mock==4.0.2
1 change: 1 addition & 0 deletions ridgeback/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
__version__="1.2.0"
11 changes: 9 additions & 2 deletions ridgeback/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
# SECURITY WARNING: don't run with debug turned on in production!
DEBUG = True

ALLOWED_HOSTS = ['silo', 'localhost']
ALLOWED_HOSTS = os.environ.get('RIDGEBACK_ALLOWED_HOSTS', 'localhost').split(',')


# Application definition
Expand All @@ -39,7 +39,7 @@
'django.contrib.staticfiles',
'toil_orchestrator.apps.ToilOrchestratorConfig',
'rest_framework',
'rest_framework_swagger'
'drf_yasg'
]

MIDDLEWARE = [
Expand Down Expand Up @@ -144,6 +144,13 @@
# https://docs.djangoproject.com/en/2.2/howto/static-files/

STATIC_URL = '/static/'
LOGIN_URL='/admin/login/'
LOGOUT_URL='/admin/logout/'

SWAGGER_SETTINGS = {
'VALIDATOR_URL':None
}


CORS_ORIGIN_ALLOW_ALL = True

Expand Down
18 changes: 15 additions & 3 deletions ridgeback/urls.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,25 @@
from django.contrib import admin
from django.conf.urls import url
from django.urls import path, include
from rest_framework_swagger.views import get_swagger_view
from drf_yasg.views import get_schema_view
from drf_yasg import openapi
from rest_framework import permissions
from ridgeback import __version__


schema_view = get_swagger_view(title='Ridgeback API')

schema_view = get_schema_view(
openapi.Info(
title="Ridgeback API",
default_version=__version__
),
public=True,
permission_classes=(permissions.AllowAny,),
)

urlpatterns = [
url(r'^$', schema_view),
url(r'^$', schema_view.with_ui('swagger', cache_timeout=0), name='schema-swagger-ui'),
url(r'^swagger(?P<format>\.json|\.yaml)$', schema_view.without_ui(cache_timeout=0), name='schema-json'),
path('admin/', admin.site.urls),
path('v0/', include('toil_orchestrator.urls')),
]
46 changes: 32 additions & 14 deletions submitter/jobsubmitter.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,16 @@ def _extract_dirname_from_github_link(self):

class JobSubmitter(object):

def __init__(self, job_id, app, inputs, root_dir):
def __init__(self, job_id, app, inputs, root_dir, resume_jobstore):
self.job_id = job_id
self.app = App.factory(app)
self.inputs = inputs
self.lsf_client = LSFClient()
self.job_store_dir = os.path.join(settings.TOIL_JOB_STORE_ROOT, self.job_id)
self.resume_jobstore = resume_jobstore
if resume_jobstore:
self.job_store_dir = resume_jobstore
else:
self.job_store_dir = os.path.join(settings.TOIL_JOB_STORE_ROOT, self.job_id)
self.job_work_dir = os.path.join(settings.TOIL_WORK_DIR_ROOT, self.job_id)
self.job_outputs_dir = root_dir
self.job_tmp_dir = os.path.join(settings.TOIL_TMP_DIR_ROOT, self.job_id)
Expand All @@ -70,13 +74,22 @@ def status(self, external_id):
return self.lsf_client.status(external_id)

def get_outputs(self):
with open(os.path.join(self.job_work_dir, 'lsf.log'), 'r') as f:
data = f.readlines()
data = ''.join(data)
substring = data.split('\n{')[1]
result = ('{' + substring).split('-----------')[0]
result_json = json.loads(result)
return result_json
error_message = None
result_json = None
lsf_log_path = os.path.join(self.job_work_dir, 'lsf.log')
try:
with open(lsf_log_path, 'r') as f:
data = f.readlines()
data = ''.join(data)
substring = data.split('\n{')[1]
result = ('{' + substring).split('-----------')[0]
result_json = json.loads(result)
except (IndexError, ValueError):
error_message = 'Could not parse json from %s' % lsf_log_path
except FileNotFoundError:
error_message = 'Could not find %s' % lsf_log_path

return result_json, error_message

def _dump_app_inputs(self):
app_location = self.app.resolve(self.job_work_dir)
Expand All @@ -89,17 +102,22 @@ def _prepare_directories(self):
if not os.path.exists(self.job_work_dir):
os.mkdir(self.job_work_dir)

if os.path.exists(self.job_store_dir):
# delete job-store directory for now so that execution can work;
# TODO: Implement resume at a later time
if os.path.exists(self.job_store_dir) and not self.resume_jobstore:
shutil.rmtree(self.job_store_dir)
os.mkdir(self.job_store_dir)

if self.resume_jobstore:
if not os.path.exists(self.resume_jobstore):
raise Exception('The jobstore indicated to be resumed could not be found')

if not os.path.exists(self.job_tmp_dir):
os.mkdir(self.job_tmp_dir)

def _command_line(self):
command_line = [settings.CWLTOIL, '--singularity', '--logFile', 'toil_log.log', '--batchSystem','lsf','--disable-user-provenance','--disable-host-provenance','--stats', '--debug', '--disableCaching', '--preserve-environment', 'PATH', 'TMPDIR', 'TOIL_LSF_ARGS', 'SINGULARITY_PULLDIR', 'SINGULARITY_CACHEDIR', 'PWD', '--defaultMemory', '8G', '--maxCores', '16', '--maxDisk', '128G', '--maxMemory', '256G', '--not-strict', '--realTimeLogging', '--jobStore', self.job_store_dir, '--tmpdir-prefix', self.job_tmp_dir, '--workDir', self.job_work_dir, '--outdir', self.job_outputs_dir, '--maxLocalJobs', '500']
command_line.extend(self._dump_app_inputs())
app_location, inputs_location = self._dump_app_inputs()
if self.resume_jobstore:
command_line.extend(['--restart',app_location])
else:
command_line.extend([app_location, inputs_location])
return command_line

Empty file added tests/__init__.py
Empty file.
98 changes: 98 additions & 0 deletions tests/test_api.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
from mock import patch
from django.urls import reverse
from rest_framework import status
from rest_framework.test import APITestCase
from toil_orchestrator.models import Job, Status
from django.contrib.auth.models import User
from django.utils.timezone import now


class JobTestCase(APITestCase):

def setUp(self):
example_app = {'github': {'repository':'example_repository','entrypoint':'example_entrypoint'}}
self.example_job = Job.objects.create(
app=example_app,
root_dir='example_rootdir',
id='7aacda86-b12f-4068-b2e3-a96552430a0f',
job_store_location='/example_job_store')
self.api_root = reverse('api-root')

def test_list(self):
url = self.api_root + 'jobs/'
response = self.client.get(url)
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertEqual(response.json()['count'], 1)
self.assertEqual(response.json()['results'][0]['id'], self.example_job.id)

def test_read(self):
url = '{}jobs/{}/'.format(self.api_root,self.example_job.id)
response = self.client.get(url)
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertEqual(response.json()['id'], self.example_job.id)

def test_404_read(self):
url = '{}jobs/{}/'.format(self.api_root,self.example_job.id[::-1])
response = self.client.get(url)
self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND)

@patch('toil_orchestrator.tasks.submit_jobs_to_lsf.delay')
def test_create(self, submit_jobs_mock):
url = self.api_root + 'jobs/'
submit_jobs_mock.return_value = None
data = {
'app': self.example_job.app,
'root_dir': self.example_job.root_dir,
'inputs': {'example_input': True}
}
response = self.client.post(url, data=data, format='json')
self.assertEqual(response.status_code, status.HTTP_201_CREATED)

def test_create_empty(self):
url = self.api_root + 'jobs/'
data = {}
response = self.client.post(url, data=data, format='json')
self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)

def test_delete_unauthorized(self):
url = '{}jobs/{}/'.format(self.api_root,self.example_job.id)
response = self.client.delete(url)
self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED)

def test_delete_authorized(self):
url = '{}jobs/{}/'.format(self.api_root,self.example_job.id)
admin_user = User.objects.create_superuser('admin', 'sample_email', 'password')
self.client.force_authenticate(user=admin_user)
response = self.client.delete(url)
self.assertEqual(response.status_code, status.HTTP_204_NO_CONTENT)

@patch('toil_orchestrator.tasks.submit_jobs_to_lsf.delay')
def test_resume(self, submit_jobs_mock):
url = '{}jobs/{}/resume/'.format(self.api_root, self.example_job.id)
submit_jobs_mock.return_value = None
data = {
'root_dir': self.example_job.root_dir
}
response = self.client.post(url, data=data, format='json')
self.assertEqual(response.status_code, status.HTTP_201_CREATED)
self.assertEqual(response.json()['resume_job_store_location'], self.example_job.job_store_location)

def test_resume_job_missing(self):
url = '{}jobs/{}/resume/'.format(self.api_root,self.example_job.id[::-1])
data = {
'root_dir': self.example_job.root_dir
}
response = self.client.post(url, data=data, format='json')
self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND)

def test_resume_jobstore_cleaned_up(self):
current_job = Job.objects.get(id=self.example_job.id)
current_job.job_store_clean_up = now()
current_job.save()
url = '{}jobs/{}/resume/'.format(self.api_root,self.example_job.id)
data = {
'root_dir': self.example_job.root_dir
}
response = self.client.post(url, data=data, format='json')
self.assertEqual(response.status_code, status.HTTP_410_GONE)

17 changes: 15 additions & 2 deletions toil_orchestrator/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,32 @@ class JobAdmin(admin.ModelAdmin):

def cleanup_files(self, request, queryset):
cleaned_up_projects = 0
partially_cleaned_up_projects = 0
already_cleaned_up_projects = 0
cleaned_up_message = None
for single_query in queryset:
jobstore_location = single_query.job_store_location
working_dir = single_query.working_dir
jobstore_cleaned_up = False
workdir_dir_cleaned_up = False
if not single_query.job_store_clean_up:
cleanup_folder.delay(str(jobstore_location), single_query.id, True)
jobstore_cleaned_up = True
if not single_query.working_dir_clean_up:
cleanup_folder.delay(str(working_dir), single_query.id, False)
workdir_dir_cleaned_up = True
if jobstore_cleaned_up and workdir_dir_cleaned_up:
cleaned_up_projects = cleaned_up_projects + 1
else:
elif not jobstore_cleaned_up and not workdir_dir_cleaned_up:
already_cleaned_up_projects = already_cleaned_up_projects + 1
else:
partially_cleaned_up_projects = partially_cleaned_up_projects + 1
cleaned_up_projects = cleaned_up_projects + 1
if cleaned_up_projects > 0:
cleaned_up_message = "Cleaned up %s job(s)" % cleaned_up_projects
if partially_cleaned_up_projects > 0:
cleaned_up_message = "Cleaning up %s job(s) [ %s partial ]" % (cleaned_up_projects, partially_cleaned_up_projects)
else:
cleaned_up_message = "Cleaning up %s job(s)" % cleaned_up_projects
level = messages.SUCCESS
if already_cleaned_up_projects > 0:
if cleaned_up_message != None:
Expand Down
Loading

0 comments on commit 2a11bae

Please sign in to comment.