From 7b74255ece1cdf54bc1e9a5356f4146e8164e9a9 Mon Sep 17 00:00:00 2001 From: voyager pipeline user Date: Mon, 21 Oct 2024 13:01:51 -0400 Subject: [PATCH 1/5] adding support for nfcore input/outputs --- orchestrator/models.py | 0 submitter/app/app.py | 6 +- .../nextflow_jobsubmitter.py | 86 ++++++++++++------- 3 files changed, 59 insertions(+), 33 deletions(-) mode change 100644 => 100755 orchestrator/models.py mode change 100644 => 100755 submitter/app/app.py mode change 100644 => 100755 submitter/nextflow_submitter/nextflow_jobsubmitter.py diff --git a/orchestrator/models.py b/orchestrator/models.py old mode 100644 new mode 100755 diff --git a/submitter/app/app.py b/submitter/app/app.py old mode 100644 new mode 100755 index 389e4c9c..3777f6d4 --- a/submitter/app/app.py +++ b/submitter/app/app.py @@ -11,7 +11,8 @@ def factory(app): repo = app["github"]["repository"] entrypoint = app["github"]["entrypoint"] version = app["github"].get("version", "master") - return GithubApp(repo, entrypoint, version) + nfcore_template = app["github"]["nfcore_template"] + return GithubApp(repo, entrypoint, nfcore_template, version) elif app.get("base64"): raise Exception("Base64 app not implemented yet") elif app.get("app"): @@ -32,11 +33,12 @@ class GithubApp(App): type = "github" logger = logging.getLogger(__name__) - def __init__(self, github, entrypoint, version="master"): + def __init__(self, github, entrypoint, nfcore_template, version="master"): super().__init__() self.github = github self.entrypoint = entrypoint self.version = version + self.nfcore_template = nfcore_template def resolve(self, location): dirname = os.path.join(location, self._extract_dirname_from_github_link()) diff --git a/submitter/nextflow_submitter/nextflow_jobsubmitter.py b/submitter/nextflow_submitter/nextflow_jobsubmitter.py old mode 100644 new mode 100755 index 1c5e3c90..bc567962 --- a/submitter/nextflow_submitter/nextflow_jobsubmitter.py +++ b/submitter/nextflow_submitter/nextflow_jobsubmitter.py @@ -1,6 +1,7 @@ import os import shutil import hashlib +import json from django.conf import settings from submitter import JobSubmitter @@ -47,6 +48,10 @@ def __init__( JobSubmitter.__init__(self, job_id, app, inputs, walltime, tool_walltime, memlimit, log_dir, app_name) self.resume_jobstore = resume_jobstore dir_config = settings.PIPELINE_CONFIG.get(self.app_name) + if self.app.nfcore_template: + self.cli_output_name = "--outdir" + else: + self.cli_output_name = "--outDir" if not dir_config: dir_config = settings.PIPELINE_CONFIG["NA"] if resume_jobstore: @@ -57,6 +62,7 @@ def __init__( self.job_outputs_dir = root_dir self.job_tmp_dir = os.path.join(dir_config["TMP_DIR_ROOT"], self.job_id) + def submit(self): self._prepare_directories() command_line = self._command_line() @@ -114,34 +120,53 @@ def _size(self, path): except Exception: return 0 + def _output_construct(self, path): + location = self._location(path) + basename = self._basename(path) + checksum = self._checksum(path) + size = self._size(path) + nameroot = self._nameroot(path) + nameext = self._nameext(path) + file_obj = { + "location": location, + "basename": basename, + "checksum": checksum, + "size": size, + "nameroot": nameroot, + "nameext": nameext, + "class": "File", + } + return file_obj + def get_outputs(self): - result = list() error_message = None - try: - with open(self.inputs["outputs"]) as f: - files = f.readlines() - for f in files: - path = f.strip() - location = self._location(path) - basename = self._basename(path) - checksum = self._checksum(path) - size = self._size(path) - nameroot = self._nameroot(path) - nameext = self._nameext(path) - file_obj = { - "location": location, - "basename": basename, - "checksum": checksum, - "size": size, - "nameroot": nameroot, - "nameext": nameext, - "class": "File", - } - result.append(file_obj) - except FileNotFoundError: - error_message = "Could not find %s" % self.inputs["outputs"] - except Exception: - error_message = "Could not parse %s" % self.inputs["outputs"] + result = list() + if self.app.nfcore_template: + prov_file = os.path.join(self.job_outputs_dir, "manifest.json") + try: + with open(prov_file, "r") as f: + prov_json = json.loads(f.read()) + published_json = prov_json["published"] + for f in published_json: + path = f["target"] + file_obj = self._output_construct(path) + result.append(file_obj) + except (IndexError, ValueError): + error_message = "Could not parse json from %s" % prov_file + except FileNotFoundError: + error_message = "Could not find %s" % prov_file + else: + try: + with open(self.inputs["outputs"]) as f: + files = f.readlines() + for f in files: + path = f.strip() + file_obj = self._output_construct(path) + result.append(file_obj) + except FileNotFoundError: + error_message = "Could not find %s" % self.inputs["outputs"] + except Exception: + error_message = "Could not parse %s" % self.inputs["outputs"] result_json = {"outputs": result} return result_json, error_message @@ -154,8 +179,8 @@ def _dump_app_inputs(self): params = self.inputs.get("params", []) for i in inputs: input_map[i["name"]] = self._dump_input(i["name"], i["content"], self.job_work_dir) - if self.log_dir: - input_map[i["name"]] = self._dump_input(i["name"], i["content"], self.log_dir) + # if self.log_dir: + # input_map[i["name"]] = self._dump_input(i["name"], i["content"], self.log_dir) config = self.inputs.get("config") if config: config_path = self._dump_config(config) @@ -193,7 +218,6 @@ def _prepare_directories(self): def _command_line(self): app_location, input_map, config, profile, params = self._dump_app_inputs() - command_line = [ settings.NEXTFLOW, "-log", @@ -203,8 +227,8 @@ def _command_line(self): "-profile", profile, "-w", - self.job_store_dir, - "--outDir", + self.job_work_dir, + self.cli_output_name, self.job_outputs_dir, ] for k, v in input_map.items(): From 11a758e7690ea74f110621909f15ad5524ec440a Mon Sep 17 00:00:00 2001 From: buehlere Date: Mon, 21 Oct 2024 14:20:17 -0400 Subject: [PATCH 2/5] adding empty nfcore case --- submitter/app/app.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/submitter/app/app.py b/submitter/app/app.py index 0ff74a10..8822bc5e 100755 --- a/submitter/app/app.py +++ b/submitter/app/app.py @@ -11,7 +11,10 @@ def factory(app): repo = app["github"]["repository"] entrypoint = app["github"]["entrypoint"] version = app["github"].get("version", "master") - nfcore_template = app["github"]["nfcore_template"] + if app["github"].get("nfcore_template"): + nfcore_template = app["github"]["nfcore_template"] + else: + nfcore_template = None return GithubApp(repo, entrypoint, nfcore_template, version) elif app.get("base64"): raise Exception("Base64 app not implemented yet") From 64f4d4be08a8db5dc79bf4c10b3b61f33ef532ce Mon Sep 17 00:00:00 2001 From: buehlere Date: Mon, 21 Oct 2024 14:20:24 -0400 Subject: [PATCH 3/5] linting --- submitter/nextflow_submitter/nextflow_jobsubmitter.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/submitter/nextflow_submitter/nextflow_jobsubmitter.py b/submitter/nextflow_submitter/nextflow_jobsubmitter.py index 7965e176..da21e57a 100755 --- a/submitter/nextflow_submitter/nextflow_jobsubmitter.py +++ b/submitter/nextflow_submitter/nextflow_jobsubmitter.py @@ -145,7 +145,7 @@ def get_outputs(self): result = list() if self.app.nfcore_template: prov_file = os.path.join(self.job_outputs_dir, "manifest.json") - try: + try: with open(prov_file, "r") as f: prov_json = json.loads(f.read()) published_json = prov_json["published"] @@ -157,7 +157,7 @@ def get_outputs(self): error_message = "Could not parse json from %s" % prov_file except FileNotFoundError: error_message = "Could not find %s" % prov_file - else: + else: try: with open(self.inputs["outputs"]) as f: files = f.readlines() From e8468a02b34062498f19de5bd1d6af1b1f0b42e7 Mon Sep 17 00:00:00 2001 From: buehlere Date: Mon, 21 Oct 2024 14:33:20 -0400 Subject: [PATCH 4/5] Update app.py --- submitter/app/app.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/submitter/app/app.py b/submitter/app/app.py index 8822bc5e..0c73389f 100755 --- a/submitter/app/app.py +++ b/submitter/app/app.py @@ -13,7 +13,7 @@ def factory(app): version = app["github"].get("version", "master") if app["github"].get("nfcore_template"): nfcore_template = app["github"]["nfcore_template"] - else: + else: nfcore_template = None return GithubApp(repo, entrypoint, nfcore_template, version) elif app.get("base64"): From 8071d181ec152ce0bf863170b2fdf33c944e3bab Mon Sep 17 00:00:00 2001 From: voyager pipeline user Date: Tue, 22 Oct 2024 16:25:12 -0400 Subject: [PATCH 5/5] simplifying output logic --- submitter/nextflow_submitter/nextflow_jobsubmitter.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/submitter/nextflow_submitter/nextflow_jobsubmitter.py b/submitter/nextflow_submitter/nextflow_jobsubmitter.py index da21e57a..749d5813 100755 --- a/submitter/nextflow_submitter/nextflow_jobsubmitter.py +++ b/submitter/nextflow_submitter/nextflow_jobsubmitter.py @@ -143,8 +143,8 @@ def _output_construct(self, path): def get_outputs(self): error_message = None result = list() - if self.app.nfcore_template: - prov_file = os.path.join(self.job_outputs_dir, "manifest.json") + prov_file = os.path.join(self.job_outputs_dir, "manifest.json") + if os.path.exists(prov_file): try: with open(prov_file, "r") as f: prov_json = json.loads(f.read())