Skip to content

Commit

Permalink
Handle empty group 2 for WDL CWL and Nextflow workflows (#436)
Browse files Browse the repository at this point in the history
* Use CWL version 1.2
* Add a when condition so Cavatica doesn't get stuck waiting
* Preserve the order of input files for the post step in Nextflow
  • Loading branch information
EricKutschera authored Sep 18, 2024
1 parent 023b528 commit fb6e269
Show file tree
Hide file tree
Showing 9 changed files with 173 additions and 93 deletions.
1 change: 0 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -407,4 +407,3 @@ In rMATS-turbo, each alternative splicing pattern has a corresponding set of out
- `[datetime]_[id].rmats`: Summary generated from processing a BAM file
- `[datetime]_bam[sample_num]_[replicate_num]/Aligned.sortedByCoord.out.bam`: Result of mapping input FASTQ files
- `[datetime]_read_outcomes_by_bam.txt`: Counts of the reads used from each BAM file along with counts of the reasons that reads could not be used

10 changes: 10 additions & 0 deletions cwl/pack_cwl_for_sbg.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,15 @@ def parse_args():
return parser.parse_args()


# Rcwl doesn't seem to support []. Instead [null] is converted to [] here.
def replace_singleton_null_default(workflow):
inputs = workflow['inputs']
for value in inputs.values():
default = value.get('default')
if default == [None]:
value['default'] = list()


def convert_steps_to_list(steps):
steps_list = list()
for id_key, step_dict in steps.items():
Expand Down Expand Up @@ -107,6 +116,7 @@ def adapt_for_sbg(abs_cwl_path):

replace_steps(loaded)
replace_value_from(loaded)
replace_singleton_null_default(loaded)
inline_other_cwl_files(loaded, base_path)
return loaded

Expand Down
64 changes: 43 additions & 21 deletions cwl/pl_rMATS_bam.R
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
rmats_docker <- "xinglab/rmats:v4.3.0"
cwl_version <- "v1.2"

get_array_type <- function(item_type) {
return(list(type = "array", items = item_type))
Expand All @@ -9,7 +10,9 @@ get_2d_array_type <- function(item_type) {

## Workflow inputs
wf_bam_g1_input <- InputParam(id = "wf_bam_g1", type = "File[]", position = -1)
wf_bam_g2_input <- InputParam(id = "wf_bam_g2", type = "File[]", position = -1)
## Rcwl doesn't seem to support []. Convert [null] to [] later
wf_bam_g2_input <- InputParam(id = "wf_bam_g2", type = "File[]?", default = list(NULL),
position = -1)
wf_gtf_input <- InputParam(id = "wf_gtf", type = "File", position = -1)
wf_is_single_end_input <- InputParam(id = "wf_is_single_end", type = "boolean?", default = FALSE,
position = -1)
Expand Down Expand Up @@ -90,19 +93,20 @@ prep_read_outcome_output <- OutputParam(id = "prep_read_outcome", type = "File",
glob = "$('prep_' + inputs.prep_bam_id + '_read_outcomes_by_bam.txt')")
prep_bam_name_output <- OutputParam(id = "prep_bam_name", type = "string",
outputEval = "$(inputs.prep_bam.path.split('/').pop())")
rmats_prep <- cwlProcess(baseCommand = "bash script.sh",
requirements = list(prep_docker_req, prep_js_req, prep_init_work_dir_req,
prep_resource_req),
inputs = InputParamList(prep_bam_input, prep_bam_id_input, prep_gtf_input,
prep_is_single_end_input, prep_readLength_input,
prep_out_dir_input, prep_lib_type_input,
prep_variable_read_length_input,
prep_anchorLength_input, prep_novelSS_input,
prep_mil_input, prep_mel_input,
prep_allow_clipping_input, prep_machine_mem_gb_input,
prep_disk_space_gb_input),
outputs = OutputParamList(prep_out_rmats_output, prep_read_outcome_output,
prep_bam_name_output))
rmats_prep <- cwlProcess(cwlVersion = cwl_version,
baseCommand = "bash script.sh",
requirements = list(prep_docker_req, prep_js_req, prep_init_work_dir_req,
prep_resource_req),
inputs = InputParamList(prep_bam_input, prep_bam_id_input, prep_gtf_input,
prep_is_single_end_input, prep_readLength_input,
prep_out_dir_input, prep_lib_type_input,
prep_variable_read_length_input,
prep_anchorLength_input, prep_novelSS_input,
prep_mil_input, prep_mel_input,
prep_allow_clipping_input, prep_machine_mem_gb_input,
prep_disk_space_gb_input),
outputs = OutputParamList(prep_out_rmats_output, prep_read_outcome_output,
prep_bam_name_output))

## Expression tool steps to convert Files to locations.
## This avoids loading all the Files to the disk of a single worker machine.
Expand All @@ -112,7 +116,8 @@ exp_file_to_loc_file_input <- InputParam(id = "exp_file_to_loc_file", type = "Fi
exp_file_to_loc_js_req <- requireJS()
exp_file_to_loc_js <- "${return({'exp_file_to_loc_loc': inputs.exp_file_to_loc_file.location})}"
exp_file_to_loc_loc_output <- OutputParam(id = "exp_file_to_loc_loc", type = "string")
exp_file_to_loc <- cwlProcess(cwlClass = "ExpressionTool",
exp_file_to_loc <- cwlProcess(cwlVersion = cwl_version,
cwlClass = "ExpressionTool",
requirements = list(exp_file_to_loc_js_req),
inputs = InputParamList(exp_file_to_loc_file_input),
outputs = OutputParamList(exp_file_to_loc_loc_output),
Expand Down Expand Up @@ -142,7 +147,8 @@ exp_bam_id_ids_js <- paste(sep = "\n",
"}",
"return({'exp_bam_id_ids': id_strings})}")
exp_bam_id_ids_output <- OutputParam(id = "exp_bam_id_ids", type = "string[]")
exp_bam_id <- cwlProcess(cwlClass = "ExpressionTool",
exp_bam_id <- cwlProcess(cwlVersion = cwl_version,
cwlClass = "ExpressionTool",
requirements = list(exp_bam_id_js_req),
inputs = InputParamList(exp_bam_id_bams_input, exp_bam_id_prefix_input),
outputs = OutputParamList(exp_bam_id_ids_output),
Expand All @@ -151,9 +157,20 @@ exp_bam_id <- cwlProcess(cwlClass = "ExpressionTool",
step_exp_prep_g1 <- cwlStep(id = "step_exp_prep_g1", run = exp_bam_id,
In = list(exp_bam_id_bams = "step_exp_file_to_loc_g1/exp_file_to_loc_loc",
exp_bam_id_prefix = list(valueFrom = "g1_")))

## The 'when' expression will cause this step to be skipped if
## there are no group 2 bams.
## Cavatica would otherwise hang waiting for this step to complete.
## According to the CWL specification it seems like it should be fine to
## run this anyway and get an empty list.
## If it is skipped then the output will be null.
## That should not be an issue for step_prep_g2 since it scatters on
## "prep_bam" and "prep_bam_id" and if any scatter param evaluates to []
## then the step is skipped and produces [] as output.
step_exp_prep_g2 <- cwlStep(id = "step_exp_prep_g2", run = exp_bam_id,
In = list(exp_bam_id_bams = "step_exp_file_to_loc_g2/exp_file_to_loc_loc",
exp_bam_id_prefix = list(valueFrom = "g2_")))
exp_bam_id_prefix = list(valueFrom = "g2_")),
when = "$(inputs.exp_bam_id_bams.length > 0)")

## Prep step scatter over bam_g1
step_prep_g1 <- cwlStep(id = "step_prep_g1",
Expand Down Expand Up @@ -225,6 +242,9 @@ post_disk_space_gb_input <- InputParam(id = "post_disk_space_gb", type = "int",

post_script_string <- paste(sep = "\n",
"${",
"var has_g2 = inputs.post_bam_name_g2.length > 0",
"var b2_opt = has_g2 ? '--b2' : ''",
"var b2_val = has_g2 ? 'bam_g2.txt' : ''",
"var anchorLength_opt = inputs.post_anchorLength != null ? '--anchorLength' : ''",
"var anchorLength_string = inputs.post_anchorLength != null ? inputs.post_anchorLength : ''",
"var is_default_stats = (!inputs.post_paired_stats) && (!inputs.post_darts_model)",
Expand Down Expand Up @@ -287,7 +307,7 @@ post_script_string <- paste(sep = "\n",
" }",
"}",
"script += ' > bam_g2.txt\\n'",
"script += 'python /rmats/rmats.py --b1 bam_g1.txt --b2 bam_g2.txt --gtf ' + inputs.post_gtf.path + ' --readLength ' + inputs.post_readLength + ' --nthread ' + inputs.post_nthread + ' --od ' + inputs.post_out_dir + ' --tmp fd_rmats --task post ' + anchorLength_opt + ' ' + anchorLength_string + ' --tstat ' + inputs.post_tstat + ' ' + cstat_opt + ' ' + cstat_val + ' ' + statoff_opt + ' ' + paired_stats_opt + ' ' + darts_model_opt + ' ' + darts_cutoff_opt + ' ' + darts_cutoff_val + ' ' + novelSS_opt + ' ' + mil_opt + ' ' + mil_val + ' ' + mel_opt + ' ' + mel_val + ' ' + individual_counts_opt + '\\n'",
"script += 'python /rmats/rmats.py --b1 bam_g1.txt ' + b2_opt + ' ' + b2_val + ' --gtf ' + inputs.post_gtf.path + ' --readLength ' + inputs.post_readLength + ' --nthread ' + inputs.post_nthread + ' --od ' + inputs.post_out_dir + ' --tmp fd_rmats --task post ' + anchorLength_opt + ' ' + anchorLength_string + ' --tstat ' + inputs.post_tstat + ' ' + cstat_opt + ' ' + cstat_val + ' ' + statoff_opt + ' ' + paired_stats_opt + ' ' + darts_model_opt + ' ' + darts_cutoff_opt + ' ' + darts_cutoff_val + ' ' + novelSS_opt + ' ' + mil_opt + ' ' + mil_val + ' ' + mel_opt + ' ' + mel_val + ' ' + individual_counts_opt + '\\n'",
"script += 'tar czf ' + inputs.post_out_dir + '.tar.gz ' + inputs.post_out_dir + '\\n'",
"return(script)}")

Expand All @@ -300,7 +320,8 @@ post_resource_req <- requireResource(coresMin = "$(inputs.post_nthread)",
outdirMin = "$(inputs.post_disk_space_gb * 1024)")
post_out_tar_output <- OutputParam(id = "post_out_tar", type = "File",
glob = "$(inputs.post_out_dir + '.tar.gz')")
rmats_post <- cwlProcess(baseCommand = "bash script.sh",
rmats_post <- cwlProcess(cwlVersion = cwl_version,
baseCommand = "bash script.sh",
requirements = list(post_docker_req, post_js_req, post_init_work_dir_req,
post_resource_req),
inputs = InputParamList(post_bam_name_g1_input, post_bam_name_g2_input,
Expand Down Expand Up @@ -360,7 +381,8 @@ exp_read_outcome_outcomes_js <- paste(sep = "\n",
"}",
"return({'exp_read_outcome_outcomes': outcomes})}")
exp_read_outcome_outcomes_output <- OutputParam(id = "exp_read_outcome_outcomes", type = "File[]")
exp_read_outcome <- cwlProcess(cwlClass = "ExpressionTool",
exp_read_outcome <- cwlProcess(cwlVersion = cwl_version,
cwlClass = "ExpressionTool",
requirements = list(exp_read_outcome_js_req),
inputs = InputParamList(exp_read_outcome_prep_g1_input,
exp_read_outcome_prep_g2_input),
Expand All @@ -378,7 +400,7 @@ wf_out_tar_output <- OutputParam(id = "wf_out_tar", type = "File",
outputSource = "step_post/post_out_tar")
wf_scatter_req <- requireScatter()
wf_step_input_exp_req <- requireStepInputExpression()
workflow <- cwlWorkflow(cwlVersion = "v1.0",
workflow <- cwlWorkflow(cwlVersion = cwl_version,
requirements = list(wf_scatter_req, wf_step_input_exp_req),
inputs = InputParamList(wf_bam_g1_input, wf_bam_g2_input, wf_gtf_input,
wf_is_single_end_input, wf_readLength_input,
Expand Down
49 changes: 27 additions & 22 deletions cwl/rMATS_bam_packed.cwl
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
class: Workflow
cwlVersion: v1.0
cwlVersion: v1.2
inputs:
wf_allow_clipping:
default: false
Expand All @@ -9,7 +9,8 @@ inputs:
wf_bam_g1:
type: File[]
wf_bam_g2:
type: File[]
default: []
type: File[]?
wf_cstat:
default: '0.0001'
type: string?
Expand Down Expand Up @@ -82,7 +83,7 @@ steps:
- exp_file_to_loc_loc
run:
class: ExpressionTool
cwlVersion: v1.0
cwlVersion: v1.2
expression: '${return({''exp_file_to_loc_loc'': inputs.exp_file_to_loc_file.location})}'
inputs:
exp_file_to_loc_file:
Expand All @@ -102,7 +103,7 @@ steps:
- exp_file_to_loc_loc
run:
class: ExpressionTool
cwlVersion: v1.0
cwlVersion: v1.2
expression: '${return({''exp_file_to_loc_loc'': inputs.exp_file_to_loc_file.location})}'
inputs:
exp_file_to_loc_file:
Expand All @@ -125,7 +126,7 @@ steps:
- exp_bam_id_ids
run:
class: ExpressionTool
cwlVersion: v1.0
cwlVersion: v1.2
expression: "${\nvar id_strings = new Array(inputs.exp_bam_id_bams.length)\nfor\
\ (var i = 0; i < id_strings.length; i++) {\n id_strings[i] = inputs.exp_bam_id_prefix\
\ + i\n}\nreturn({'exp_bam_id_ids': id_strings})}"
Expand All @@ -149,7 +150,7 @@ steps:
- exp_bam_id_ids
run:
class: ExpressionTool
cwlVersion: v1.0
cwlVersion: v1.2
expression: "${\nvar id_strings = new Array(inputs.exp_bam_id_bams.length)\nfor\
\ (var i = 0; i < id_strings.length; i++) {\n id_strings[i] = inputs.exp_bam_id_prefix\
\ + i\n}\nreturn({'exp_bam_id_ids': id_strings})}"
Expand All @@ -163,6 +164,7 @@ steps:
type: string[]
requirements:
- class: InlineJavascriptRequirement
when: $(inputs.exp_bam_id_bams.length > 0)
- id: step_prep_g1
in:
prep_allow_clipping: wf_allow_clipping
Expand All @@ -187,7 +189,7 @@ steps:
run:
baseCommand: bash script.sh
class: CommandLineTool
cwlVersion: v1.0
cwlVersion: v1.2
inputs:
prep_allow_clipping:
type: boolean
Expand Down Expand Up @@ -319,7 +321,7 @@ steps:
run:
baseCommand: bash script.sh
class: CommandLineTool
cwlVersion: v1.0
cwlVersion: v1.2
inputs:
prep_allow_clipping:
type: boolean
Expand Down Expand Up @@ -455,7 +457,7 @@ steps:
run:
baseCommand: bash script.sh
class: CommandLineTool
cwlVersion: v1.0
cwlVersion: v1.2
inputs:
post_anchorLength:
type: int?
Expand Down Expand Up @@ -518,9 +520,11 @@ steps:
- class: InlineJavascriptRequirement
- class: InitialWorkDirRequirement
listing:
- entry: "${\nvar anchorLength_opt = inputs.post_anchorLength != null ? '--anchorLength'\
\ : ''\nvar anchorLength_string = inputs.post_anchorLength != null ? inputs.post_anchorLength\
\ : ''\nvar is_default_stats = (!inputs.post_paired_stats) && (!inputs.post_darts_model)\n\
- entry: "${\nvar has_g2 = inputs.post_bam_name_g2.length > 0\nvar b2_opt =\
\ has_g2 ? '--b2' : ''\nvar b2_val = has_g2 ? 'bam_g2.txt' : ''\nvar anchorLength_opt\
\ = inputs.post_anchorLength != null ? '--anchorLength' : ''\nvar anchorLength_string\
\ = inputs.post_anchorLength != null ? inputs.post_anchorLength : ''\nvar\
\ is_default_stats = (!inputs.post_paired_stats) && (!inputs.post_darts_model)\n\
var cstat_opt = is_default_stats ? '--cstat' : ''\nvar cstat_val = is_default_stats\
\ ? inputs.post_cstat : ''\nvar statoff_opt = inputs.post_statoff ? '--statoff'\
\ : ''\nvar paired_stats_opt = inputs.post_paired_stats ? '--paired-stats'\
Expand Down Expand Up @@ -549,15 +553,16 @@ steps:
\ += 'echo '\nfor (var i = 0; i < inputs.post_bam_name_g2.length; i++) {\n\
\ script += inputs.post_bam_name_g2[i]\n if (i != (inputs.post_bam_name_g2.length\
\ - 1)) {\n script += ','\n }\n}\nscript += ' > bam_g2.txt\\n'\nscript\
\ += 'python /rmats/rmats.py --b1 bam_g1.txt --b2 bam_g2.txt --gtf ' + inputs.post_gtf.path\
\ + ' --readLength ' + inputs.post_readLength + ' --nthread ' + inputs.post_nthread\
\ + ' --od ' + inputs.post_out_dir + ' --tmp fd_rmats --task post ' + anchorLength_opt\
\ + ' ' + anchorLength_string + ' --tstat ' + inputs.post_tstat + ' ' +\
\ cstat_opt + ' ' + cstat_val + ' ' + statoff_opt + ' ' + paired_stats_opt\
\ + ' ' + darts_model_opt + ' ' + darts_cutoff_opt + ' ' + darts_cutoff_val\
\ + ' ' + novelSS_opt + ' ' + mil_opt + ' ' + mil_val + ' ' + mel_opt +\
\ ' ' + mel_val + ' ' + individual_counts_opt + '\\n'\nscript += 'tar czf\
\ ' + inputs.post_out_dir + '.tar.gz ' + inputs.post_out_dir + '\\n'\nreturn(script)}"
\ += 'python /rmats/rmats.py --b1 bam_g1.txt ' + b2_opt + ' ' + b2_val +\
\ ' --gtf ' + inputs.post_gtf.path + ' --readLength ' + inputs.post_readLength\
\ + ' --nthread ' + inputs.post_nthread + ' --od ' + inputs.post_out_dir\
\ + ' --tmp fd_rmats --task post ' + anchorLength_opt + ' ' + anchorLength_string\
\ + ' --tstat ' + inputs.post_tstat + ' ' + cstat_opt + ' ' + cstat_val\
\ + ' ' + statoff_opt + ' ' + paired_stats_opt + ' ' + darts_model_opt +\
\ ' ' + darts_cutoff_opt + ' ' + darts_cutoff_val + ' ' + novelSS_opt +\
\ ' ' + mil_opt + ' ' + mil_val + ' ' + mel_opt + ' ' + mel_val + ' ' +\
\ individual_counts_opt + '\\n'\nscript += 'tar czf ' + inputs.post_out_dir\
\ + '.tar.gz ' + inputs.post_out_dir + '\\n'\nreturn(script)}"
entryname: script.sh
writable: false
- class: ResourceRequirement
Expand All @@ -572,7 +577,7 @@ steps:
- exp_read_outcome_outcomes
run:
class: ExpressionTool
cwlVersion: v1.0
cwlVersion: v1.2
expression: "${\nvar outcomes = new Array()\nfor (var i = 0; i < inputs.exp_read_outcome_prep_g1.length;\
\ i++) {\n outcomes.push(inputs.exp_read_outcome_prep_g1[i])\n}\nfor (var i\
\ = 0; i < inputs.exp_read_outcome_prep_g2.length; i++) {\n outcomes.push(inputs.exp_read_outcome_prep_g2[i])\n\
Expand Down
2 changes: 1 addition & 1 deletion nextflow/nextflow.config
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ params {

// input bam files
bam_g1 = null
bam_g2 = null
bam_g2 = []

// reference
gtf = null
Expand Down
Loading

0 comments on commit fb6e269

Please sign in to comment.