diff --git a/lib/TempoUtils.groovy b/lib/TempoUtils.groovy
index 4809eb38..80b79c19 100755
--- a/lib/TempoUtils.groovy
+++ b/lib/TempoUtils.groovy
@@ -7,6 +7,23 @@ import nextflow.Channel
 
 class TempoUtils {
 
+  // Parse a tab-separated germline sample sheet and return a channel that
+  // emits the NORMAL_ID of every row.
+  // Each row is passed to checkHeader, checkNumberOfItem(row, 1, ...) and
+  // checkDuplicates; when either of the latter two returns false the run is
+  // terminated with System.exit(1).
+  static def extractGermlineSamples(tsvFile) {
+    def allRows = [:]
+    Channel.from(tsvFile)
+      .splitCsv(sep: '\t', header: true)
+      .map { row ->
+        checkHeader([row.NORMAL_ID], tsvFile)
+        if(!checkNumberOfItem(row, 1, tsvFile)){System.exit(1)}
+        if(!checkDuplicates(allRows, row, row, tsvFile)){System.exit(1)}
+        row.NORMAL_ID
+      }
+  }
+
   static def extractPairing(tsvFile) {
     def allRows = [:]
     Channel.from(tsvFile)
diff --git a/pipeline.nf b/pipeline.nf
index f3e67e82..baa859a2 100755
--- a/pipeline.nf
+++ b/pipeline.nf
@@ -90,7 +90,8 @@ if (!(workflow.profile in ['juno', 'awsbatch', 'docker', 'singularity', 'test_si
 publishAll = params.publishAll
 outDir = file(params.outDir).toAbsolutePath()
 outname = params.outname
-runGermline = params.germline
+runGermline = params.germline != false ? true : params.germline
+runGermlineFile = params.germline
 runSomatic = params.somatic
 runQC = params.QC
 runAggregate = params.aggregate
@@ -768,7 +769,18 @@ if (params.pairing) {
       def normalBai = item[4]
       return [ idNormal, target, normalBam, normalBai ] }
     .unique()
-    .into{ bams4Haplotypecaller; bamsNormal4Polysolver; bamsForStrelkaGermline; bamsForMantaGermline; bamsForDellyGermline }
+    //.into{ bams4Haplotypecaller; bamsNormal4Polysolver; bamsForStrelkaGermline; bamsForMantaGermline; bamsForDellyGermline }
+    .into{bamsNormal4Polysolver; bams4GermlineAnalysis}
+
+  if (runGermline != false && file(runGermlineFile.toString()).exists() ){
+    if (params.watch == false) {
+      TempoUtils.extractGermlineSamples(file(runGermlineFile)).set{ germlineFilter }
+    } else {
+      watchGermline(file(runGermlineFile)).set{ germlineFilter }
+    }
+    bams4GermlineAnalysis.combine(germlineFilter, by:[0]).set{bams4GermlineAnalysis}
+  }
+  bams4GermlineAnalysis.into{bams4Haplotypecaller; bamsForStrelkaGermline; bamsForMantaGermline; bamsForDellyGermline }
 
 
   bamsTumor4Combine.combine(bamsNormal4Combine, by: [0,1,2])
@@ -3281,8 +3293,9 @@ else if(!(runAggregate == false)) {
            cohortSomaticAggregateLOHHLA1;
            cohortSomaticAggregateMetadata;
            cohortGermlineAggregateMaf;
-           cohortGermlineAggregateSv;
-           cohortGermlineAggregateSv1;
+           //cohortGermlineAggregateSv;
+           //cohortGermlineAggregateSv1;
+           cohortGermlineAggregateFilter;
            cohortQcBamAggregate;
            cohortQcBamAggregate1;
            cohortQcBamAggregate2;
@@ -3307,6 +3320,25 @@ else if(!(runAggregate == false)) {
   inputSomaticAggregateMetadata = cohortSomaticAggregateMetadata.combine(MetaData4Aggregate, by:[1,2]).groupTuple(by:[2])
 
   if (runGermline){
+    if ( file(runGermlineFile.toString()).exists() ){
+      cohortGermlineAggregateFilter
+        .groupTuple()
+        .map{cohort, idTumor, idNormal ->
+          def filterIdTumor = []
+          def filterIdNormal = []
+          def intersectLists = idNormal.intersect(germline2List(runGermlineFile))
+          idTumor.eachWithIndex{ item, index ->
+            if ( intersectLists.contains(idNormal[index]) ) {
+              filterIdTumor += item
+              filterIdNormal += idNormal[index]
+            }
+          }
+          [cohort, filterIdTumor, filterIdNormal]
+        }.map{ cohort, idTumor, idNormal
+          -> tuple( groupKey(cohort, idTumor instanceof Collection ? idTumor.size() : 1), idTumor, idNormal)
+        }.transpose()
+        .into{cohortGermlineAggregateSv; cohortGermlineAggregateSv1}
+    } else { cohortGermlineAggregateFilter.into{ cohortGermlineAggregateSv; cohortGermlineAggregateSv1 } }
     inputGermlineAggregateMaf = cohortGermlineAggregateMaf.combine(mafFile4AggregateGermline, by:[1,2]).groupTuple(by:[2])
     inputGermlineAggregateSv = cohortGermlineAggregateSv.combine(dellyMantaCombined4AggregateGermline, by:[2]).groupTuple(by:[1]).map{[it[1], it[5].unique()]}
     inputGermlineAggregateSvTbi = cohortGermlineAggregateSv1.combine(dellyMantaCombinedTbi4AggregateGermline, by:[2]).groupTuple(by:[1]).map{[it[1], it[5].unique()]}
@@ -4171,3 +4203,39 @@ def watchAggregate(tsvFile) {
     .transpose()
     .unique()
 }
+
+// Watch variant of TempoUtils.extractGermlineSamples: follows tsvFile for
+// 'create, modify' events and returns a channel of distinct NORMAL_ID values.
+def watchGermline(tsvFile) {
+  Channel.watchPath(file(tsvFile), 'create, modify')
+    .splitCsv(sep: '\t', header: true)
+    .unique()
+    .map{ row ->
+      def idNormal = row.NORMAL_ID
+      // NOTE(review): the check result is discarded (empty block), so a
+      // failed check does not stop the run here -- confirm this is intended.
+      if(!TempoUtils.checkNumberOfItem(row, 1, file(tsvFile))){}
+      idNormal
+    }
+    .unique()
+}
+// Return the NORMAL_ID values listed in tsvFile as a plain list.
+// Blocks the caller, polling every 500 ms, until the subscription below has
+// received the collected result.
+def germline2List(tsvFile){
+  def parsedFile = false
+  def germlineList = []
+  // file() mirrors the other call sites: the caller passes the raw
+  // params.germline value rather than a file object.
+  TempoUtils.extractGermlineSamples(file(tsvFile))
+    .collect()
+    .subscribe{ row ->
+      // Store the list before raising the flag the polling loop waits on.
+      germlineList = row
+      parsedFile = true
+    }
+  while ( ! parsedFile ){
+    sleep(500)
+  }
+  return germlineList
+}