From 9dcb22cd12031cd5597a2ad4b1bfc874e421dadc Mon Sep 17 00:00:00 2001
From: Anne Marie Noronha
Date: Tue, 26 Jan 2021 15:31:29 -0500
Subject: [PATCH 1/3] add support for file input in params.germline parameter to selectively process germline samples in germline mode in a given list.

---
 lib/TempoUtils.groovy | 14 ++++++++++++++
 pipeline.nf           | 31 +++++++++++++++++++++++++++++--
 2 files changed, 43 insertions(+), 2 deletions(-)

diff --git a/lib/TempoUtils.groovy b/lib/TempoUtils.groovy
index fba1c2f74..4f5580262 100755
--- a/lib/TempoUtils.groovy
+++ b/lib/TempoUtils.groovy
@@ -7,6 +7,20 @@ import nextflow.Channel

 class TempoUtils {

+  // Parse a tab-separated file with a header row and return a channel of its NORMAL_ID values.
+  // The run is terminated (System.exit(1)) when checkNumberOfItem or checkDuplicates rejects a row.
+  static def extractGermlineSamples(tsvFile) {
+    def allRows = [:]
+    Channel.from(tsvFile)
+    .splitCsv(sep: '\t', header: true)
+    .map { row ->
+      checkHeader([row.NORMAL_ID], tsvFile)
+      if(!checkNumberOfItem(row, 1, tsvFile)){System.exit(1)}
+      if(!checkDuplicates(allRows, row, row, tsvFile)){System.exit(1)}
+      row.NORMAL_ID
+    }
+  }
+
   static def extractPairing(tsvFile) {
     def allRows = [:]
     Channel.from(tsvFile)
diff --git a/pipeline.nf b/pipeline.nf
index 812da55a1..68b099759 100755
--- a/pipeline.nf
+++ b/pipeline.nf
@@ -90,7 +90,10 @@ if (!(workflow.profile in ['juno', 'awsbatch', 'docker', 'singularity', 'test_si
 publishAll = params.publishAll
 outDir = file(params.outDir).toAbsolutePath()
 outname = params.outname
-runGermline = params.germline
+runGermline = params.germline != false ? true : params.germline
+runGermlineFile = params.germline
+//TempoUtils.extractGermlineSamples(runGermlineFile).view()
+//exit 1
 runSomatic = params.somatic
 runQC = params.QC
 runAggregate = params.aggregate
@@ -768,7 +771,17 @@ if (params.pairing) {
         def normalBai = item[4]
         return [ idNormal, target, normalBam, normalBai ] }
     .unique()
-    .into{ bams4Haplotypecaller; bamsNormal4Polysolver; bamsForStrelkaGermline; bamsForMantaGermline; bamsForDellyGermline }
+    //.into{ bams4Haplotypecaller; bamsNormal4Polysolver; bamsForStrelkaGermline; bamsForMantaGermline; bamsForDellyGermline }
+    .into{bamsNormal4Polysolver; bams4GermlineAnalysis}
+
+  if (runGermline != false && file(runGermlineFile.toString()).exists() ){
+    if (watch == false) {
+      bams4GermlineAnalysis.combine(TempoUtils.extractGermlineSamples(file(runGermlineFile)), by:[0]).set{bams4GermlineAnalysis}
+    } else {
+      bams4GermlineAnalysis.combine(watchGermline(file(runGermlineFile)), by:[0]).set{bams4GermlineAnalysis}
+    }
+  }
+  bams4GermlineAnalysis.into{bams4Haplotypecaller; bamsForStrelkaGermline; bamsForMantaGermline; bamsForDellyGermline }

   bamsTumor4Combine.combine(bamsNormal4Combine, by: [0,1,2])

@@ -3967,3 +3980,17 @@ def watchAggregate(tsvFile) {
     .transpose()
     .unique()
 }
+
+// Watch tsvFile for 'create, modify' events and emit each distinct NORMAL_ID it lists.
+// NOTE(review): a failed checkNumberOfItem() is ignored here (empty if-body) - confirm that is intended.
+def watchGermline(tsvFile) {
+  Channel.watchPath(file(tsvFile), 'create, modify')
+    .splitCsv(sep: '\t', header: true)
+    .unique()
+    .map{ row ->
+      def idNormal = row.NORMAL_ID
+      if(!TempoUtils.checkNumberOfItem(row, 1, file(tsvFile))){}
+      idNormal
+    }
+    .unique()
+}

From 262c32d380277ba0e4d181f888894872bd40c656 Mon Sep 17 00:00:00 2001
From: Anne Marie Noronha
Date: Tue, 26 Jan 2021 19:17:07 -0500
Subject: [PATCH 2/3] add support for germline selection in cohort aggregation

---
 pipeline.nf | 56 ++++++++++++++++++++++++++++++++++++++++++++++++--------
 1 file changed, 48 insertions(+), 8 deletions(-)

diff --git a/pipeline.nf b/pipeline.nf
index 68b099759..5d16e3fb3 100755
--- a/pipeline.nf
+++ b/pipeline.nf
@@ -92,8 +92,6 @@ outDir = file(params.outDir).toAbsolutePath()
 outname = params.outname
 runGermline = params.germline != false ? true : params.germline
 runGermlineFile = params.germline
-//TempoUtils.extractGermlineSamples(runGermlineFile).view()
-//exit 1
 runSomatic = params.somatic
 runQC = params.QC
 runAggregate = params.aggregate
@@ -772,14 +770,19 @@ if (params.pairing) {
         return [ idNormal, target, normalBam, normalBai ] }
     .unique()
     //.into{ bams4Haplotypecaller; bamsNormal4Polysolver; bamsForStrelkaGermline; bamsForMantaGermline; bamsForDellyGermline }
-    .into{bamsNormal4Polysolver; bams4GermlineAnalysis}
+    .into{bamsNormal4Polysolver; bams4GermlineAnalysis; bams4GermlineAnalysisSkip}

   if (runGermline != false && file(runGermlineFile.toString()).exists() ){
-    if (watch == false) {
-      bams4GermlineAnalysis.combine(TempoUtils.extractGermlineSamples(file(runGermlineFile)), by:[0]).set{bams4GermlineAnalysis}
+    if (params.watch == false) {
+      TempoUtils.extractGermlineSamples(file(runGermlineFile)).into{ germlineFilter; germlineFilter4Cohorts }
     } else {
-      bams4GermlineAnalysis.combine(watchGermline(file(runGermlineFile)), by:[0]).set{bams4GermlineAnalysis}
+      watchGermline(file(runGermlineFile)).into{ germlineFilter; germlineFilter4Cohorts }
     }
+    bams4GermlineAnalysis.combine(germlineFilter, by:[0]).set{bams4GermlineAnalysis}
+    germlineFilter4Cohorts
+      .map{ normal ->
+        ["placeholder","placeholder",normal]
+      }.set{germlineFilter4Cohorts}
   }
   bams4GermlineAnalysis.into{bams4Haplotypecaller; bamsForStrelkaGermline; bamsForMantaGermline; bamsForDellyGermline }

@@ -3150,8 +3153,9 @@ else if(!(runAggregate == false)) {
              cohortSomaticAggregateLOHHLA1;
              cohortSomaticAggregateMetadata;
              cohortGermlineAggregateMaf;
-             cohortGermlineAggregateSv;
-             cohortGermlineAggregateSv1;
+             //cohortGermlineAggregateSv;
+             //cohortGermlineAggregateSv1;
+             cohortGermlineAggregateFilter;
              cohortQcBamAggregate;
              cohortQcBamAggregate1;
              cohortQcBamAggregate2;
@@ -3176,6 +3180,25 @@ else if(!(runAggregate == false)) {
   inputSomaticAggregateMetadata = cohortSomaticAggregateMetadata.combine(MetaData4Aggregate, by:[1,2]).groupTuple(by:[2])

   if (runGermline){
+    if ( file(runGermlineFile.toString()).exists() ){
+      cohortGermlineAggregateFilter
+        .groupTuple()
+        .map{cohort, idTumor, idNormal ->
+          def filterIdTumor = []
+          def filterIdNormal = []
+          def intersectLists = idNormal.intersect(germline2List(runGermlineFile))
+          idTumor.eachWithIndex{ item, index ->
+            if ( intersectLists.contains(idNormal[index]) ) {
+              filterIdTumor += item
+              filterIdNormal += idNormal[index]
+            }
+          }
+          [cohort, filterIdTumor, filterIdNormal]
+        }.map{ cohort, idTumor, idNormal
+          -> tuple( groupKey(cohort, idTumor instanceof Collection ? idTumor.size() : 1), idTumor, idNormal)
+        }.transpose()
+        .into{cohortGermlineAggregateSv; cohortGermlineAggregateSv1}
+    } else { cohortGermlineAggregateFilter.into{ cohortGermlineAggregateSv; cohortGermlineAggregateSv1 } }
     inputGermlineAggregateMaf = cohortGermlineAggregateMaf.combine(mafFile4AggregateGermline, by:[1,2]).groupTuple(by:[2])
     inputGermlineAggregateSv = cohortGermlineAggregateSv.combine(dellyMantaCombined4AggregateGermline, by:[2]).groupTuple(by:[1]).map{[it[1], it[5].unique()]}
     inputGermlineAggregateSvTbi = cohortGermlineAggregateSv1.combine(dellyMantaCombinedTbi4AggregateGermline, by:[2]).groupTuple(by:[1]).map{[it[1], it[5].unique()]}
@@ -3994,3 +4017,20 @@ def watchGermline(tsvFile) {
     }
     .unique()
 }
+// Collect the NORMAL_IDs listed in tsvFile into a plain list usable outside channel operators.
+// Polls every 500 ms and returns only once the collected channel has delivered its list.
+// NOTE(review): loops forever if the channel never emits (e.g. no data rows) - confirm .collect() behaviour.
+def germline2List(tsvFile){
+  def parsedFile = false
+  def germlineList = []
+  TempoUtils.extractGermlineSamples(file(tsvFile))
+    .collect()
+    .subscribe{ row ->
+      parsedFile = true
+      germlineList = row
+    }
+  while ( ! parsedFile ){
+    sleep(500)
+  }
+  return germlineList
+}

From 9f866052fc37509a0beb3bbf1e74f3934e580d02 Mon Sep 17 00:00:00 2001
From: Anne Marie Noronha
Date: Fri, 29 Jan 2021 15:33:49 -0500
Subject: [PATCH 3/3] clean up unused channels and extra code

---
 pipeline.nf | 10 +++-------
 1 file changed, 3 insertions(+), 7 deletions(-)

diff --git a/pipeline.nf b/pipeline.nf
index 5d16e3fb3..52bf68f4c 100755
--- a/pipeline.nf
+++ b/pipeline.nf
@@ -770,19 +770,15 @@ if (params.pairing) {
         return [ idNormal, target, normalBam, normalBai ] }
     .unique()
     //.into{ bams4Haplotypecaller; bamsNormal4Polysolver; bamsForStrelkaGermline; bamsForMantaGermline; bamsForDellyGermline }
-    .into{bamsNormal4Polysolver; bams4GermlineAnalysis; bams4GermlineAnalysisSkip}
+    .into{bamsNormal4Polysolver; bams4GermlineAnalysis}

   if (runGermline != false && file(runGermlineFile.toString()).exists() ){
     if (params.watch == false) {
-      TempoUtils.extractGermlineSamples(file(runGermlineFile)).into{ germlineFilter; germlineFilter4Cohorts }
+      TempoUtils.extractGermlineSamples(file(runGermlineFile)).set{ germlineFilter }
     } else {
-      watchGermline(file(runGermlineFile)).into{ germlineFilter; germlineFilter4Cohorts }
+      watchGermline(file(runGermlineFile)).set{ germlineFilter }
     }
     bams4GermlineAnalysis.combine(germlineFilter, by:[0]).set{bams4GermlineAnalysis}
-    germlineFilter4Cohorts
-      .map{ normal ->
-        ["placeholder","placeholder",normal]
-      }.set{germlineFilter4Cohorts}
   }
   bams4GermlineAnalysis.into{bams4Haplotypecaller; bamsForStrelkaGermline; bamsForMantaGermline; bamsForDellyGermline }
