diff --git a/bin/assignment.py b/bin/assignment.py new file mode 100644 index 0000000..4675a22 --- /dev/null +++ b/bin/assignment.py @@ -0,0 +1,97 @@ +import sys +from collections import defaultdict + + +def trim_read_id(read_id): + """ + Parses the read_id to remove forward/reverse identifier. + + Parameters: + read_id (str): A read name. + + Returns: + read_id (str): Trimmed read name without forward/reverse identifiers. + """ + if read_id.endswith("/1") or read_id.endswith("/2"): + read_id = read_id[:-2] + + return read_id + + +def parse_kraken_assignment_line(line): + """ + Parses the read_id and taxon_id from a line in the kraken assignment file. + + Parameters: + line (str): A line from kraken assignment file. + + Returns: + taxon_id (str): The NCBI taxon identifier. + read_id (str): trimmed read identifier. + """ + line_vals = line.strip().split("\t") + if len(line_vals) < 5: + return -1, "" + if "taxid" in line_vals[2]: + temp = line_vals[2].split("taxid ")[-1] + taxon_id = temp[:-1] + else: + taxon_id = line_vals[2] + + read_id = trim_read_id(line_vals[1]) + + if taxon_id == "A": + taxon_id = "81077" + else: + taxon_id = taxon_id + return taxon_id, read_id + + +class KrakenAssignments: + """ + A class representing a kraken assignment file. + + Attributes: + file (str): Name of file to parse. + """ + def __init__(self, assignment_file): + """ + Initializes an KrakenAssignments object. + """ + self.file = assignment_file + + def parse_kraken_assignment_file(self, taxon_id_map, parents=None): + """ + Parses the kraken assignment file and collects the read_ids associated with each of the + required taxon ids. + + Parameters: + taxon_id_map (dict): A dict from a taxon id to a list of related taxon_ids. + parents (dict): A dict mapping taxon id to parent taxon id from NCBI Taxonomy + + Returns: + read_map (dict): A dict from read_id to a set of values from the taxon_id_map if the + read_id was classified as the taxon_id. 
+ """ + read_map = defaultdict(set) + with open(self.file, "r") as kfile: + for line in kfile: + taxon_id, read_id = parse_kraken_assignment_line(line) + if taxon_id in taxon_id_map: + if read_id in read_map and taxon_id != read_map[read_id]: + del read_map[read_id] + else: + read_map[read_id].add(taxon_id) + elif parents: + # handle case where taxon_id has changed + current = taxon_id + while current in parents and current not in taxon_id_map and current != "1": + current = parents[current] + if current in taxon_id_map: + print(f"Add {taxon_id} to {current} list") + if read_id in read_map and current != read_map[read_id]: + del read_map[read_id] + else: + read_map[read_id].add(current) + return read_map + diff --git a/bin/check_hcid.py b/bin/check_hcid.py index fd5c3c6..3b49288 100755 --- a/bin/check_hcid.py +++ b/bin/check_hcid.py @@ -134,10 +134,10 @@ def check_report_for_hcid(hcid_dict, taxonomy_dir, kreport_file): hcid_dict[taxid]["classified_parent_found"] = True -def map_to_refs(query, reference): +def map_to_refs(query, reference, preset): counts = defaultdict(int) ranges = defaultdict(list) - a = mp.Aligner(reference, best_n=1) # load or build index + a = mp.Aligner(reference, best_n=1, preset=preset) # load or build index if not a: raise Exception("ERROR: failed to load/build index") @@ -168,8 +168,8 @@ def check_pileup(ref, ref_ranges, reference_file, min_coverage=0): return 0 -def check_ref_coverage(hcid_dict, query, reference): - counts, ranges = map_to_refs(query, reference) +def check_ref_coverage(hcid_dict, query, reference, preset): + counts, ranges = map_to_refs(query, reference, preset) for taxon in hcid_dict: taxon_found = True @@ -288,8 +288,17 @@ def main(): default="resources/hcid_refs.fa.gz", help="Reference FASTA for each HCID", ) + parser.add_argument( + "--illumina", + action="store_true", + required=False, + help="Use the short read minimap preset", + ) args = parser.parse_args() + preset = None + if args.illumina: + preset = "sr" # Start Program now = datetime.now() @@ -301,7 +310,7 @@ def main(): sys.stderr.write("Check kraken report for counts\n") check_report_for_hcid(hcid_dict, args.taxonomy, args.kreport_file) sys.stderr.write("Check mapped coverage\n") - check_ref_coverage(hcid_dict, args.reads, args.ref_fasta) + check_ref_coverage(hcid_dict, args.reads, args.ref_fasta, preset) sys.stderr.write("Report findings\n") report_findings(hcid_dict, args.prefix) diff --git a/bin/concatenate_reads.py b/bin/concatenate_reads.py index f818cfc..6ccb8ac 100755 --- a/bin/concatenate_reads.py +++ b/bin/concatenate_reads.py @@ -12,12 +12,13 @@ def enforce_headers(f1_header, f2_header): if f1_header[0] != "@" or f2_header[0] != "@": - # raise Exception("Invalid input FASTQ files.") sys.exit(5) - if f1_header.strip().split(" ")[0] != f2_header.strip().split(" ")[0]: - # raise Exception( - # "Input FASTQ files do not share headers. " "Check and re-run w/o --strict." 
- # ) + + if f1_header.split()[0].endswith("/1") and f2_header.split()[0].endswith("/2"): + if f1_header.split()[0][:-2] != f2_header.split()[0][:-2]: + sys.exit(8) + + elif f1_header.strip().split(" ")[0] != f2_header.strip().split(" ")[0]: sys.exit(8) diff --git a/bin/extract_fraction_from_reads.py b/bin/extract_fraction_from_reads.py index 7ee7241..c39cc55 100755 --- a/bin/extract_fraction_from_reads.py +++ b/bin/extract_fraction_from_reads.py @@ -10,302 +10,37 @@ from collections import defaultdict from pathlib import Path -from extract_utils import mean,median,check_read_files,parse_kraken_assignment_line,parse_kraken_assignment_file,trim_read_id - - -def load_from_taxonomy(taxonomy_dir, taxids, include_unclassified): - sys.stderr.write("Loading hierarchy\n") - entries = {} - if include_unclassified: - entries["0"] = {"name": "unclassified", - "taxon": "0", - "rank": None - } - - taxid_map = defaultdict(set) - for key in taxids: - taxid_map[key].add(key) - if include_unclassified: - taxid_map["0"].update(taxids) - - children = defaultdict(set) - parent = defaultdict(str) - if len(taxids) > 0: - try: - taxonomy = os.path.join(taxonomy_dir, "nodes.dmp") - with open(taxonomy, "r") as f: - for line in f: - fields = line.split("\t|\t") - taxid, parent_taxid, rank = fields[0], fields[1], fields[2] - children[parent_taxid].add(taxid) - parent[taxid] = parent_taxid - if taxid in taxids: - entries[taxid] = {"name": None, - "taxon": taxid, - "rank": rank - } - except: - sys.stderr.write( - "ERROR: Could not find taxonomy nodes.dmp file in %s" % taxonomy_dir - ) - sys.exit(4) - - try: - taxonomy = os.path.join(taxonomy_dir, "names.dmp") - with open(taxonomy, "r") as f: - for line in f: - fields = line.split("\t|\t") - taxid, name, name_type = fields[0], fields[1], fields[3] - if taxid in taxids and ("scientific name" in name_type or entries[taxid]["name"] == None): - entries[taxid]["name"] = name - - except: - sys.stderr.write( - "ERROR: Could not find taxonomy names.dmp file in %s" % taxonomy_dir - ) - sys.exit(4) - - check = list(taxids) - while len(check) > 0: - current = check.pop() - check.extend(children[current]) - for child in children[current]: - taxid_map[child].update(taxid_map[current]) - - return taxid_map, entries, parent - -def fastq_iterator( - prefix: str, - filetype: str, - include_unclassified: bool, - entries: dict, - read_map: dict, - fastq_1: Path, - fastq_2: Path = None, -) -> tuple[dict, dict, dict]: - """Func to iterate over fastq files and extract reads of interest - - Args: - prefix (str): Outfile prefix - filetype (str): output filetype (only affects naming) - include_unclassified (bool): true if includes unclassified reads - read_map (dict): dict of read_id: taxon_list (from kraken report) - subtaxa_map (dict): dict of subtaxa: output taxon (from load_from_taxonomy) - exclude (bool): if true, inverts the logic for including reads - fastq_1 (Path): Path to fastq _1 file - fastq_2 (Path, optional): Path to fastq _2 file if input is paired data. Defaults to None. 
- - Returns: - tuple[dict, dict, dict]: number of reads written by taxa, quality scores by taxa, sequence length by taxa - """ - reads_of_interest = set(read_map.keys()) - - out_counts = defaultdict(int) - quals = defaultdict(list) - lens = defaultdict(list) - names = defaultdict(list) - - sys.stderr.write("Open file handles\n") - out_handles_1 = {} - out_handles_2 = {} - - print(entries) - for taxon, entry in entries.items(): - taxon_name = entry["name"].lower() +from extract_utils import * +from report import KrakenReport +from assignment import KrakenAssignments +from taxonomy import Taxonomy + +def setup_prefixes(list_taxon_ids, entries, prefix, inverse=False, include_unclassified=False): + outprefix = {} + if inverse: + return {"other": prefix} + + for taxon_id in list_taxon_ids: + taxon_name = entries[taxon_id].name.lower() if include_unclassified and taxon_name != "unclassified": taxon_name += "_and_unclassified" taxon_name = taxon_name.replace("viruses", "viral") - if fastq_2: - out_handles_1[taxon] = open(f"{taxon_name}_1.{filetype}", "w") - out_handles_2[taxon] = open(f"{taxon_name}_2.{filetype}", "w") - names[taxon].append(f"{taxon_name}_1.{filetype}") - names[taxon].append(f"{taxon_name}_2.{filetype}") - else: - out_handles_1[taxon] = open(f"{taxon_name}.{filetype}", "w") - names[taxon].append(f"{taxon_name}.{filetype}") - - sys.stderr.write("Iterating through read file\n") - count = 0 - forward_count = 0 - reverse_count = 0 - for record in pyfastx.Fastq(fastq_1, build_index=False): - count += 1 - if count % 1000000 == 0: - print(count) - name, seq, qual = record - trimmed_name = trim_read_id(name) - if trimmed_name not in reads_of_interest: - continue - - for taxon in read_map[trimmed_name]: - out_handles_1[taxon].write(f"@{name}\n{seq}\n+\n{qual}\n") - out_counts[taxon] += 1 - forward_count += 1 - quals[taxon].append(median([ord(x) - 33 for x in qual])) - lens[taxon].append(len(seq)) - - if fastq_2: - sys.stderr.write("Iterating second read file of pair\n") - for record in pyfastx.Fastq(fastq_2, build_index=False): - name, seq, qual = record - trimmed_name = trim_read_id(name) - if trimmed_name not in reads_of_interest: - continue - - for taxon in read_map[trimmed_name]: - out_handles_2[taxon].write(f"@{name}\n{seq}\n+\n{qual}\n") - out_counts[taxon] += 1 - reverse_count += 1 - quals[taxon].append(median([ord(x) - 33 for x in qual])) - lens[taxon].append(len(seq)) + outprefix[taxon_id] = taxon_name - if forward_count != reverse_count and (forward_count == 0 or reverse_count == 0): - sys.stderr.write( - "ERROR: No reads found for one of the file pair: extracted %i an %i reads respectively" % (forward_count, reverse_count) - ) - sys.exit(7) + return outprefix - for taxon in out_handles_1: - out_handles_1[taxon].close() - for taxon in out_handles_2: - out_handles_2[taxon].close() - - return (out_counts, quals, lens, names) - - -def fastq_iterator_inverse( - prefix: str, - filetype: str, - taxids: list, - read_map: dict, - fastq_1: Path, - fastq_2: Path = None, -) -> tuple[dict, dict, dict]: - """Func to iterate over fastq files and extract reads of interest - - Args: - prefix (str): Outfile prefix - filetype (str): output filetype (only affects naming) - read_map (dict): dict of read_id: taxon_list (from kraken report) - subtaxa_map (dict): dict of subtaxa: output taxon (from load_from_taxonomy) - exclude (bool): if true, inverts the logic for including reads - fastq_1 (Path): Path to fastq _1 file - fastq_2 (Path, optional): Path to fastq _2 file if input is paired data. 
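# Hedged sketch of the new setup_prefixes above: taxon names from the loaded entries
# become lowercase output prefixes, "viruses" is shortened to "viral", and
# "_and_unclassified" is appended when unclassified reads are bundled in.
# The entries object is assumed to come from Taxonomy.load_entries; taxon ID 10239 is Viruses.
prefixes = setup_prefixes(["10239"], entries, "sample", include_unclassified=True)
# prefixes == {"10239": "viral_and_unclassified"}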
Defaults to None. - - Returns: - tuple[dict, dict, dict]: number of reads written by taxa, quality scores by taxa, sequence length by taxa - """ - reads_of_interest = set(read_map.keys()) - print(reads_of_interest) - - out_counts = defaultdict(int) - quals = defaultdict(list) - lens = defaultdict(list) - names = defaultdict(list) - - sys.stderr.write("Open file handles\n") - out_handles_1 = {} - out_handles_2 = {} - - if fastq_2: - out_handles_1["all"] = open(f"{prefix}_1.{filetype}", "w") - out_handles_2["all"] = open(f"{prefix}_2.{filetype}", "w") - names["all"].append(f"{prefix}_1.{filetype}") - names["all"].append(f"{prefix}_2.{filetype}") - else: - out_handles_1["all"] = open(f"{prefix}.{filetype}", "w") - names["all"].append(f"{prefix}.{filetype}") - - sys.stderr.write("Iterating through read file\n") - count = 0 - forward_count = 0 - reverse_count = 0 - for record in pyfastx.Fastq(fastq_1, build_index=False): - count += 1 - if count % 1000000 == 0: - print(count) - name, seq, qual = record - trimmed_name = trim_read_id(name) - if trimmed_name in reads_of_interest: - continue - - out_handles_1["all"].write(f"@{name}\n{seq}\n+\n{qual}\n") - out_counts["all"] += 1 - forward_count += 1 - quals["all"].append(median([ord(x) - 33 for x in qual])) - lens["all"].append(len(seq)) - - if fastq_2: - sys.stderr.write("Iterating second read file of pair\n") - for record in pyfastx.Fastq(fastq_2, build_index=False): - name, seq, qual = record - trimmed_name = trim_read_id(name) - if trimmed_name in reads_of_interest: - continue - - out_handles_2["all"].write(f"@{name}\n{seq}\n+\n{qual}\n") - out_counts["all"] += 1 - reverse_count += 1 - quals["all"].append(median([ord(x) - 33 for x in qual])) - lens["all"].append(len(seq)) - - if forward_count != reverse_count and (forward_count == 0 or reverse_count == 0): - sys.stderr.write( - "ERROR: No reads found for one of the file pair: extracted %i an %i reads respectively" % (forward_count, reverse_count) - ) - sys.exit(7) - - for taxon in out_handles_1: - out_handles_1[taxon].close() - for taxon in out_handles_2: - out_handles_2[taxon].close() - - return (out_counts, quals, lens, names) def extract_reads( - read_map, entries, reads1, reads2, prefix, taxids, exclude, include_unclassified + read_map, taxon_id_map, entries, reads1, reads2, prefix, taxon_ids, exclude, include_unclassified ): - # open read files + # check read files filetype, zipped = check_read_files(reads1) - if exclude: - out_counts, quals, lens, names = fastq_iterator_inverse( - prefix, filetype, taxids, read_map, reads1, reads2 - ) - else: - out_counts, quals, lens, names = fastq_iterator( - prefix, filetype, include_unclassified, entries, read_map, reads1, reads2 - ) + prefixes = setup_prefixes(taxon_ids, entries, prefix, inverse=exclude, include_unclassified=include_unclassified) + out_counts, quals, lens, filenames = process_read_files(prefixes, filetype, read_map, taxon_id_map, reads1, reads2, inverse=exclude, get_handles=True) + + generate_summary(taxon_ids, entries, prefix, out_counts, quals, lens, filenames, include_unclassified=(include_unclassified != exclude), short=True) - sys.stderr.write("Write summary\n") - summary = [] - for taxon in names: - if reads2: - summary.append( - { - "filenames": names[taxon], - "qc_metrics": { - "num_reads": out_counts[taxon], - "avg_qual": mean(quals[taxon]), - "mean_len": mean(lens[taxon]), - }, - "includes_unclassified": (include_unclassified != exclude) # this is xor - } - ) - else: - summary.append( - { - "filenames": names[taxon], - 
"qc_metrics": { - "num_reads": out_counts[taxon], - "avg_qual": mean(quals[taxon]), - "mean_len": mean(lens[taxon]), - }, - "includes_unclassified": (include_unclassified != exclude) # this is xor - } - ) - with open("%s_summary.json" % prefix, "w") as f: - json.dump(summary, f) return out_counts @@ -346,7 +81,6 @@ def main(): "--prefix", dest="prefix", required=True, - default="taxid", help="Prefix for output files", ) @@ -382,12 +116,18 @@ def main(): time = now.strftime("%m/%d/%Y, %H:%M:%S") sys.stderr.write("PROGRAM START TIME: " + time + "\n") - taxid_map, entries, parent = load_from_taxonomy(args.taxonomy, args.taxid, args.include_unclassified) - read_map = parse_kraken_assignment_file(args.kraken_assignment_file, taxid_map, parent) + loaded_taxonomy = Taxonomy(args.taxonomy) + taxon_id_map = loaded_taxonomy.get_taxon_id_map(args.taxid, args.include_unclassified) + loaded_taxonomy.load_entries(args.taxonomy, taxon_id_map.keys()) + + # Initialize kraken assignment file + kraken_assignment = KrakenAssignments(args.kraken_assignment_file) + read_map = kraken_assignment.parse_kraken_assignment_file(taxon_id_map) out_counts = extract_reads( read_map, - entries, + taxon_id_map, + loaded_taxonomy.entries, args.reads1, args.reads2, args.prefix, diff --git a/bin/extract_taxa_from_reads.py b/bin/extract_taxa_from_reads.py index 86b9171..fa78c67 100755 --- a/bin/extract_taxa_from_reads.py +++ b/bin/extract_taxa_from_reads.py @@ -17,138 +17,14 @@ from collections import defaultdict from pathlib import Path -from extract_utils import mean,median,check_read_files,parse_kraken_assignment_line,parse_kraken_assignment_file,trim_read_id - - -def load_from_taxonomy(taxonomy_dir, parents, children): - taxonomy = os.path.join(taxonomy_dir, "nodes.dmp") - try: - with open(taxonomy, "r") as f: - for line in f: - fields = line.split("\t|\t") - tax_id, parent_tax_id = fields[0], fields[1] - parents[tax_id] = parent_tax_id - children[parent_tax_id].add(tax_id) - except: - sys.stderr.write( - "ERROR: Could not find taxonomy nodes.dmp file in %s" % taxonomy_dir - ) - sys.exit(4) - return parents, children - -def parse_depth(name): - parse_name = name.split(" ") - depth = 0 - for i in parse_name: - if i != "": - break - depth += 1 - depth = int(depth / 2) - return depth - - -def infer_hierarchy(report_file, parents, children): - hierarchy = [] - with open(report_file, "r") as f: - for line in f: - if line.startswith("% of Seqs"): - continue - try: - ( - percentage, - num_clade_root, - num_direct, - raw_rank, - ncbi, - name, - ) = line.strip().split("\t") - except: - ( - percentage, - num_clade_root, - num_direct, - ignore1, - ignore2, - raw_rank, - ncbi, - name, - ) = line.strip().split("\t") - depth = parse_depth(name) - hierarchy = hierarchy[: depth - 1] - hierarchy.append(ncbi) - - if len(hierarchy) > 1: - parent = hierarchy[-2] - if ncbi not in parents: - parents[ncbi] = parent - children[parent].add(ncbi) - return parents, children - - -def load_report_file(report_file, max_human=None): - entries = {} - # parses a kraken or bracken file - with open(report_file, "r") as f: - for line in f: - if line.startswith("% of Seqs"): - continue - try: - ( - percentage, - num_clade_root, - num_direct, - raw_rank, - ncbi, - name, - ) = line.strip().split("\t") - except: - ( - percentage, - num_clade_root, - num_direct, - ignore1, - ignore2, - raw_rank, - ncbi, - name, - ) = line.strip().split("\t") - percentage = float(percentage) - num_clade_root = int(num_clade_root) - num_direct = int(num_direct) - if num_direct > 
num_clade_root: - num_direct, num_clade_root = num_clade_root, num_direct - name = name.strip() - rank = raw_rank[0] - - if name in ["Homo sapiens"]: - if max_human and name == "Homo sapiens" and num_direct > max_human: - sys.stderr.write( - "ERROR: found %i human reads, max allowed is %i\n" - % (num_direct, max_human) - ) - sys.exit(2) - continue - - if name in ["unclassified", "root"]: - continue - - entries[ncbi] = { - "percentage": percentage, - "count": num_direct, - "count_descendants": num_clade_root, - "raw_rank": raw_rank, - "rank": rank, - "name": name, - } - - sys.stderr.write("FOUND %i TAXA IN KRAKEN REPORT\n" % len(entries)) - return entries - +from extract_utils import * +from report import KrakenReport +from assignment import KrakenAssignments +from taxonomy import Taxonomy def get_taxon_id_lists( - report_entries, - parents, - children, + kraken_report, + loaded_taxonomy, names=[], target_ranks=[], min_count=None, @@ -159,39 +35,45 @@ def get_taxon_id_lists( include_children=False ): lists_to_extract = defaultdict(set) - for taxon in report_entries: - entry = report_entries[taxon] - if len(target_ranks) > 0 and entry["rank"] not in target_ranks: + for taxon in kraken_report.entries: + entry = kraken_report.entries[taxon] + if len(target_ranks) > 0 and entry.rank not in target_ranks: continue - if min_count and entry["count"] < min_count: + if min_count and entry.ucount < min_count: continue - if min_count_descendants and entry["count_descendants"] < min_count_descendants: + if min_count_descendants and entry.count < min_count_descendants: continue - if min_percent and entry["percentage"] < min_percent: + if min_percent and kraken_report.get_percentage(taxon) < min_percent: continue - if len(names) > 0 and entry["name"] not in names and taxon not in names: + if len(names) > 0 and entry.name not in names and taxon not in names: continue lists_to_extract[taxon].add(taxon) if include_parents: - lookup = taxon - while lookup in parents and lookup != "1": - lookup = parents[lookup] - if lookup != "1": - lists_to_extract[taxon].add(lookup) + lookup = [taxon] + while len(lookup) > 0: + parent = lookup.pop() + if parent in loaded_taxonomy.parents and parent != "1": + lookup.append(loaded_taxonomy.parents[lookup]) + if parent in kraken_report.entries and parent != "1": + lookup.append(kraken_report.entries[parent].parent) + if parent != "1": + lists_to_extract[taxon].add(parent) if include_children: lookup = [taxon] while len(lookup) > 0: child = lookup.pop() lists_to_extract[taxon].add(child) - if child in children: - lookup.extend(children[child]) + if child in loaded_taxonomy.children: + lookup.extend(loaded_taxonomy.children[child]) + if child in kraken_report.entries: + lookup.extend(kraken_report.entries[child].children) sys.stderr.write("SELECTED %i TAXA TO EXTRACT\n" % len(lists_to_extract)) if top_n and len(lists_to_extract) > top_n: X = list(lists_to_extract.keys()) - Y = [report_entries[x]["percentage"] for x in X] + Y = [kraken_report.get_percentage(x) for x in X] ordered = [x for _, x in sorted(zip(Y, X))] to_delete = ordered[top_n:] for taxon in to_delete: @@ -200,101 +82,20 @@ def get_taxon_id_lists( return lists_to_extract -def fastq_iterator( - prefix: str, - filetype: str, - read_map: dict, - subtaxa_map: dict, - fastq_1: Path, - fastq_2: Path = None, -) -> tuple[dict, dict, dict]: - """Func to iterate over fastq files and extract reads of interest - - Args: - prefix (str): Outfile prefix - filetype (str): output filetype (only affects naming) - read_map (dict): 
dict of read_id: taxon_list (from kraken report) - subtaxa_map (dict): dict of subtaxa: output taxon (from lists_to_extract) - fastq_1 (Path): Path to fastq _1 file - fastq_2 (Path, optional): Path to fastq _2 file if input is paired data. Defaults to None. - - Returns: - tuple[dict, dict, dict]: number of reads written by taxa, quality scores by taxa, sequence length by taxa - """ - reads_of_interest = set(read_map.keys()) - - out_counts = defaultdict(int) - quals = defaultdict(list) - lens = defaultdict(list) - - out_records_1 = defaultdict(list) - out_records_2 = defaultdict(list) - forward_count = 0 - reverse_count = 0 - - sys.stderr.write("Reading in\n") - for record in pyfastx.Fastq(fastq_1, build_index=False): - name, seq, qual = record - trimmed_name = trim_read_id(name) - if trimmed_name not in reads_of_interest: - continue - - for k2_taxon in read_map[trimmed_name]: - for taxon in subtaxa_map[k2_taxon]: - out_counts[taxon] += 1 - forward_count += 1 - quals[taxon].append(median([ord(x) - 33 for x in qual])) - lens[taxon].append(len(seq)) - - out_records_1[taxon].append(record) - - sys.stderr.write("Writing records\n") - for taxon, records in out_records_1.items(): - if fastq_2: - with open(f"{taxon}_1.{filetype}", "w") as f: - for record in records: - name, seq, qual = record - f.write(f"@{name}\n{seq}\n+\n{qual}\n") - else: - with open(f"{taxon}.{filetype}", "w") as f: - for record in records: - name, seq, qual = record - f.write(f"@{name}\n{seq}\n+\n{qual}\n") - del out_records_1 - - if fastq_2: - sys.stderr.write("Reading in second file of pair\n") - for record in pyfastx.Fastq(fastq_2, build_index=False): - name, seq, qual = record - trimmed_name = trim_read_id(name) - if trimmed_name not in reads_of_interest: - continue - - for k2_taxon in read_map[trimmed_name]: - for taxon in subtaxa_map[k2_taxon]: - out_counts[taxon] += 1 - reverse_count += 1 - quals[taxon].append(median([ord(x) - 33 for x in qual])) - lens[taxon].append(len(seq)) - - out_records_2[taxon].append(record) - - sys.stderr.write("Writing records for second file in pair\n") - for taxon, records in out_records_2.items(): - with open(f"{taxon}_2.{filetype}", "w") as f: - for record in records: - name, seq, qual = record - f.write(f"@{name}\n{seq}\n+\n{qual}\n") - if forward_count != reverse_count and (forward_count == 0 or reverse_count == 0): - sys.stderr.write( - "ERROR: No reads found for one of the file pair: extracted %i an %i reads respectively" % (forward_count, reverse_count) - ) - sys.exit(7) - return (out_counts, quals, lens) +def setup_prefixes(lists_to_extract, prefix=None): + outprefix = {} + #if prefix: + # for taxid in lists_to_extract: + # outprefix[taxid] = f"{prefix}_{taxid}" + #else: + for taxid in lists_to_extract: + outprefix[taxid] = f"{taxid}" + return outprefix + def extract_taxa( - report_entries, lists_to_extract, kraken_assignment_file, reads1, reads2, prefix + kraken_report, lists_to_extract, kraken_assignment, reads1, reads2, prefix ): # open read files filetype, zipped = check_read_files(reads1) @@ -304,61 +105,27 @@ def extract_taxa( for taxon, subtaxons in lists_to_extract.items(): for subtaxa in subtaxons: subtaxa_map[subtaxa].add(taxon) - # sys.stderr.write( # "INCLUDING PARENTS/CHILDREN, HAVE %i TAXA TO INCLUDE IN READ FILES for %s\n" # % (len(lists_to_extract[taxon]), taxon) # ) - read_map = parse_kraken_assignment_file(kraken_assignment_file, subtaxa_map) + read_map = kraken_assignment.parse_kraken_assignment_file(subtaxa_map) - sys.stderr.write("Iterating through read file\n") 
- out_counts, quals, lens = fastq_iterator( - prefix, filetype, read_map, subtaxa_map, reads1, reads2 + prefixes = setup_prefixes(lists_to_extract, prefix) + out_counts, quals, lens, filenames = process_read_files( + prefixes, filetype, read_map, subtaxa_map, reads1, reads2, inverse=False, get_handles=False ) - sys.stderr.write("Write summary\n") - summary = [] - for taxon in lists_to_extract: - if out_counts[taxon] == 0: - sys.stderr.write("No reads extracted for taxid %s\n" %taxon) - continue - if reads2: - summary.append( - { - "human_readable": report_entries[taxon]["name"], - "taxon": taxon, - "tax_level": report_entries[taxon]["rank"], - "filenames": [ - "%s_1.%s" % (taxon, filetype), - "%s_2.%s" % (taxon, filetype), - ], - "qc_metrics": { - "num_reads": out_counts[taxon], - "avg_qual": mean(quals[taxon]), - "mean_len": mean(lens[taxon]), - } - } - ) - else: - summary.append( - { - "human_readable": report_entries[taxon]["name"], - "taxon": taxon, - "tax_level": report_entries[taxon]["rank"], - "filenames": [ - "%s.%s" % (taxon, filetype), - ], - "qc_metrics": { - "num_reads": out_counts[taxon], - "avg_qual": mean(quals[taxon]), - "mean_len": mean(lens[taxon]), - } - } - ) - with open("%s_summary.json" % prefix, "w") as f: - json.dump(summary, f) + generate_summary(lists_to_extract, kraken_report.entries, prefix, out_counts, quals, lens, filenames, short=False) return out_counts +def check_out_counts(out_counts, kraken_report): + for taxon_id in out_counts: + if out_counts[taxon_id] != kraken_report.entries[taxon_id].count: + sys.stderr.write( + f"Did not correctly extract all reads for {taxon_id}, found {out_counts[taxon_id]} whilst the kraken report contains {kraken_report.entries[taxon_id].count}" + ) + sys.exit(2) # Main method def main(): @@ -402,8 +169,7 @@ def main(): "-p", "--prefix", dest="prefix", - required=True, - default="taxid", + required=False, help="Prefix for output files", ) @@ -501,22 +267,24 @@ def main(): else: target_ranks = [] - sys.stderr.write("Loading hierarchy\n") - parent = {} - children = defaultdict(set) + loaded_taxonomy = None if args.taxonomy: - parent, children = load_from_taxonomy(args.taxonomy, parent, children) - parent, children = infer_hierarchy(args.report_file, parent, children) + sys.stderr.write("Loading taxonomy\n") + loaded_taxonomy = Taxonomy(args.taxonomy) + + # Load kraken report entries + sys.stderr.write("Loading kraken report\n") + kraken_report = KrakenReport(args.report_file) + kraken_report.check_host({9606:args.max_human}) - # get taxids to extract - sys.stderr.write("Loading kreport\n") - report_entries = load_report_file(args.report_file, args.max_human) + # Initialize kraken assignment file + sys.stderr.write("Loading kraken assignments\n") + kraken_assignment = KrakenAssignments(args.kraken_assignment_file) - sys.stderr.write("Checking for lists to extract\n") + sys.stderr.write("Identifying lists to extract\n") lists_to_extract = get_taxon_id_lists( - report_entries, - parent, - children, + kraken_report, + loaded_taxonomy, names=args.taxid, target_ranks=target_ranks, min_count=args.min_count, @@ -527,11 +295,11 @@ def main(): include_children=args.include_children ) - sys.stderr.write("Performing extractions\n") + sys.stderr.write("Extracting reads from file\n") out_counts = extract_taxa( - report_entries, + kraken_report, lists_to_extract, - args.kraken_assignment_file, + kraken_assignment, args.reads1, args.reads2, args.prefix diff --git a/bin/extract_utils.py b/bin/extract_utils.py index df66fa5..003c92e 100755 --- 
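# Sketch of the refactored flow wired together in main() above (file names and
# thresholds are hypothetical): load the taxonomy and kraken report, choose taxa,
# then pass the KrakenAssignments object straight to extract_taxa.
loaded_taxonomy = Taxonomy("taxdump/")
kraken_report = KrakenReport("sample.kreport.txt")
kraken_assignment = KrakenAssignments("sample.kraken_assignments.tsv")
lists_to_extract = get_taxon_id_lists(kraken_report, loaded_taxonomy, target_ranks=["S"], min_count=10)
out_counts = extract_taxa(kraken_report, lists_to_extract, kraken_assignment,
                          "sample_1.fastq.gz", "sample_2.fastq.gz", "out")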
a/bin/extract_utils.py +++ b/bin/extract_utils.py @@ -9,27 +9,54 @@ from datetime import datetime from collections import defaultdict from pathlib import Path +import statistics as stats + +from assignment import trim_read_id def mean(l): + """ + Takes a list of numbers and returns the mean. + + Args: + l (list): A list of numbers. + + Returns: + 0: If l had zero length. + float: The mean of items in the list. + """ if len(l) == 0: return 0 - return sum(l) / len(l) + return stats.fmean(l) def median(l): + """ + Takes a list of numbers and returns the median. + + Args: + l (list): A list of numbers. + + Returns: + 0: If l had zero length. + float: The median of items in the list. + """ if len(l) == 0: return 0 - if len(l) % 2 == 0: - i = (len(l)) / 2 - else: - i = (len(l) + 1) / 2 - i = int(i) - 1 - l = sorted(l) - return l[i] + return stats.median(l) def check_read_files(reads): + """ + Takes a read filename and checks if the file is zipped, a FASTA or FASTQ format. + + Args: + reads (str): The reads filename. + + Returns: + filetype (str): Either "fasta" or "fastq". + zipped (bool): Whether the file is gzipped or not. + """ if reads[-3:] == ".gz": read_file = gzip.open(reads, "rt") zipped = True @@ -49,70 +76,282 @@ def check_read_files(reads): sys.exit(5) return filetype, zipped +def setup_outfiles(fastq_2, prefixes, filetype, get_handles=True): + """ + Sets up the output filenames, optionally opening them and returning handles. -def parse_kraken_assignment_line(line): - line_vals = line.strip().split("\t") - if len(line_vals) < 5: - return -1, "" - if "taxid" in line_vals[2]: - temp = line_vals[2].split("taxid ")[-1] - tax_id = temp[:-1] - else: - tax_id = line_vals[2] + Args: + fastq_2 (bool): True or a string if fastq_2 exists, False or None if reads were not paired. + prefixes (dict): A dictionary from the key (usually taxon_id) to the prefix of the output file for that key. + filetype (str): "fasta" or "fastq" depending on input filetype. + get_handles (bool): Whether to open file handles for the output files. - read_id = trim_read_id(line_vals[1]) + Returns: + filenames (dict): Dictionary with keys from the prefixes, to a list of output filenames. 
+ out_handles_1 (dict): Key (matching filenames) to out_handle (if opened) + out_handles_2 (dict): Key (matching filenames) to out_handle (if opened and paired reads) + """ - if tax_id == "A": - tax_id = 81077 - else: - tax_id = tax_id - return tax_id, read_id + out_handles_1 = {} + out_handles_2 = {} + filenames = defaultdict(list) + for key, prefix in prefixes.items(): + if fastq_2: + filenames[key].append(f"{prefix}_1.{filetype}") + filenames[key].append(f"{prefix}_2.{filetype}") + if get_handles: + if key in out_handles_1: + print("already open") + else: + out_handles_1[key] = open(f"{prefix}_1.{filetype}", "w") + out_handles_2[key] = open(f"{prefix}_2.{filetype}", "w") -def check_read_files(reads): - if reads[-3:] == ".gz": - read_file = gzip.open(reads, "rt") - zipped = True - else: - read_file = open(reads, "rt") - zipped = False - first = read_file.readline() - if len(first) == 0: - sys.stderr.write("ERROR: sequence file's first line is blank\n") - sys.exit(5) - if first[0] == ">": - filetype = "fasta" - elif first[0] == "@": - filetype = "fastq" + else: + filenames[key].append(f"{prefix}.{filetype}") + if get_handles: + if key in out_handles_1: + print("already open") + else: + out_handles_1[key] = open(f"{prefix}.{filetype}", "w") + return filenames, out_handles_1, out_handles_2 + +def close_outfiles(out_handles_1, out_handles_2): + """ + Checks each handle in the dictionary and closes if it is open. + + Args: + out_handles_1 (dict): Key to out_handle + out_handles_1 (dict): Key to out_handle (if paired reads) + """ + for handle in out_handles_1: + out_handles_1[handle].close() + for handle in out_handles_2: + out_handles_2[handle].close() + +def update_summary_with_record(taxon_id, record, out_counts, quals, lens): + """ + Updates the summary dictionaries for counts, qualities and lengths of reads from the new record Dictionaries indexed by taxon. + + Args: + taxon_id (str): Key for dictionaries + record (SeqRecord): A read parsed by Bio.SeqIO + out_counts (dict): Taxon ID to read count for that taxon. + quals (dict): Taxon ID to list of quality scores (each averaged over the read) for that taxon. + lens (dict): Taxon ID to list of read lengths for that taxon. + """ + name, seq, qual = record + out_counts[taxon_id] += 1 + quals[taxon_id].append(median([ord(x) - 33 for x in qual])) + lens[taxon_id].append(len(seq)) + +def add_record(taxon_id, record, out_counts, quals, lens, filenames, file_index, out_handles={}, out_records={}, buffer_record_size=None): + """ + Add the record to the required out file and update the summary statistics with it. If out_handles has open + out_handles, reads will be written directly to the handle. If not it will be stored in memory in the out_records. If + a buffer size is set and the out_records structure has at least this number of reads, the out_records for this taxon + ID will be appended to the relevant file and cleared. + + Args: + taxon_id (str): Key for dictionaries + record (SeqRecord): A read parsed by Bio.SeqIO + out_counts (dict): Taxon ID to read count for that taxon. + quals (dict): Taxon ID to list of quality scores (each averaged over the read) for that taxon. + lens (dict): Taxon ID to list of read lengths for that taxon. + filenames (dict): Taxon ID to list of output filenames. + file_index (int): 0 or 1, depending if processing forward or reverse read, index for the filenames list. + out_handles (dict): {} if handles closed, Taxon ID to open handle if open. 
+ out_records (dict): {} if handles open, Taxon ID to list of records if not. + buffer_record_size (int): Number of records per taxon ID before they should be output to reduce memory use. + """ + + name, seq, qual = record + update_summary_with_record(taxon_id, record, out_counts, quals, lens) + + if out_handles != {} and taxon_id in out_handles: + out_handles[taxon_id].write(f"@{name}\n{seq}\n+\n{qual}\n") else: - sys.stderr.write("ERROR: sequence file must be FASTA or FASTQ\n") - sys.exit(5) - return filetype, zipped + if not buffer_record_size or len(out_records[taxon_id]) < buffer_record_size: + out_records[taxon_id].append(record) + else: + with open(filenames[taxon_id][file_index], "a") as f: + for existing_record in out_records[taxon_id]: + f.write(f"@{name}\n{seq}\n+\n{qual}\n") + del out_records[taxon_id] + out_records[taxon_id].append(record) + +def save_records_to_file(out_records, filenames, file_index): + """ + Save the records to file. + + Args: + out_records (dict): Taxon ID to list of records + filenames (dict): Taxon ID to list of output filenames + file_index (int): 0 or 1, depending if processing forward or reverse read, index for the filenames list. + """ + sys.stderr.write(f"Writing records for file {file_index+1}\n") + for taxon, records in out_records.items(): + with open(filenames[taxon][file_index], "a") as f: + for record in records: + name, seq, qual = record + f.write(f"@{name}\n{seq}\n+\n{qual}\n") + del out_records + +def file_iterator(read_file, read_map, subtaxa_map, inverse, out_counts, quals, lens, filenames, file_index, out_handles, low_memory=False): + """ + Iterate through the read_file file and add the read to the appropriate file or handle. + + Args: + read_file (str): Name of read file. + read_map (dict): Dictionary from read ID to list of taxon ID associated with it. + subtaxa_map (dict): Dictionary from taxon ID (output by read map) to list of taxon ID associated with + out files to be updated + inverse (bool): If True, exclude all reads which match the taxon IDs in the read_map. If False, select them. + out_counts (dict): Taxon ID to read count for that taxon. + quals (dict): Taxon ID to list of quality scores (each averaged over the read) for that taxon. + lens (dict): Taxon ID to list of read lengths for that taxon. + filenames (dict): Taxon ID to list of output filenames + file_index (int): 0 or 1, depending if processing forward or reverse read, index for the filenames list. + out_handles (dict): {} if handles closed, Taxon ID to open handle if open. + low_memory (bool): If True we expect to write directly to out handles, if False store records then write to file. + Returns: + int: Count of reads written to file. 
+ """ + reads_of_interest = set(read_map.keys()) + count = 0 + out_records = None + if not low_memory: + out_records = defaultdict(list) + + sys.stderr.write(f"Reading in {read_file}\n") + for record in pyfastx.Fastq(read_file, build_index=False): + name, seq, qual = record + trimmed_name = trim_read_id(name) + if inverse: + if trimmed_name in reads_of_interest: + for taxon in read_map[trimmed_name]: + update_summary_with_record(taxon, record, out_counts, quals, lens) + continue + else: + count += 1 + add_record("other", record, out_counts, quals, lens, filenames, file_index, out_handles=out_handles, out_records=out_records) + + elif not inverse: + if trimmed_name not in reads_of_interest: + continue + for k2_taxon in read_map[trimmed_name]: + for taxon in subtaxa_map[k2_taxon]: + count += 1 + add_record(taxon, record, out_counts, quals, lens, filenames, file_index, out_handles=out_handles, out_records=out_records) + + if not low_memory: + save_records_to_file(out_records, filenames, file_index) + + return count + +def process_read_files( + prefixes: str, + filetype: str, + read_map: dict, + subtaxa_map: dict, + read_file_1: Path, + read_file_2: Path = None, + inverse: bool = False, + get_handles: bool = False +) -> tuple[dict, dict, dict, dict]: + """ + Iterate through (paired/unpaired) read_files and save the relevant reads, collecting summary statistics for these reads. + + Args: + prefixes (dict): A dictionary from the key (usually taxon_id) to the prefix of the output file for that key. + filetype (str): "fasta" or "fastq" depending on input filetype. + read_map (dict): Dictionary from read ID to list of taxon ID associated with it. + subtaxa_map (dict): Dictionary from taxon ID (output by read map) to list of taxon ID associated with + out files to be updated. + read_file_1 (Path): Name of read file. + read_file_2 (Path): Name of read file pair (if it exists). + inverse (bool): If True, exclude all reads which match the taxon IDs in the read_map. If False, select them. + get_handles (bool): If True we expect to write directly to out handles, if False store records then write to file. + + Returns: + out_counts (dict): Taxon ID to read count for that taxon. + quals (dict): Taxon ID to list of quality scores (each averaged over the read) for that taxon. + lens (dict): Taxon ID to list of read lengths for that taxon. + filenames (dict): Taxon ID to list of output filenames. 
+ """ + out_counts = defaultdict(int) + quals = defaultdict(list) + lens = defaultdict(list) + + filenames, out_handles_1, out_handles_2 = setup_outfiles(read_file_2, prefixes, filetype, get_handles=get_handles) + + forward_count = file_iterator(read_file_1, read_map, subtaxa_map, inverse, out_counts, quals, lens, filenames, 0, out_handles_1, low_memory=get_handles) + + reverse_count = 0 + if read_file_2: + reverse_count = file_iterator(read_file_2, read_map, subtaxa_map, inverse, out_counts, quals, lens, filenames, 1, out_handles_2, low_memory=get_handles) + + if forward_count != reverse_count and (forward_count == 0 or reverse_count == 0): + sys.stderr.write( + "ERROR: No reads found for one of the file pair: extracted %i an %i reads respectively" % (forward_count, reverse_count) + ) + sys.exit(7) + close_outfiles(out_handles_1, out_handles_2) + return (out_counts, quals, lens, filenames) + + +def generate_summary(lists_to_extract, entries, prefix, out_counts, quals, lens, filenames, include_unclassified=False, short=False): + """ + Generate a summary JSON file, including information about each taxon ID and corresponding read statistics. + + Args: + lists_to_extract (list): A list of taxon ID to include in the summary. + entries (dict): A dictionary containing information about the name/rank associated with each taxon ID, either + inferred from the Kraken report or NCBI taxonomy. + prefix (str): Prefix for the summary file. + out_counts (dict): Taxon ID to read count for that taxon. + quals (dict): Taxon ID to list of quality scores (each averaged over the read) for that taxon. + lens (dict): Taxon ID to list of read lengths for that taxon. + filenames (dict): Taxon ID to list of output filenames. + include_unclassified (bool): True if each output file includes unclassified reads as well as those associated + with the taxon ID. + short (bool): Output short form summary, excluding taxon information. + filenames (dict): Taxon ID to list of output filenames. 
+ """ + sys.stderr.write("Write summary\n") + summary = [] + for taxon_id in out_counts: + if out_counts[taxon_id] == 0: + sys.stderr.write(f"No reads extracted for taxon_id {taxon_id}\n") + continue + if short: + summary.append( + { + "filenames": filenames[taxon_id], + "qc_metrics": { + "num_reads": out_counts[taxon_id], + "avg_qual": mean(quals[taxon_id]), + "mean_len": mean(lens[taxon_id]), + }, + "includes_unclassified": include_unclassified + } + ) + else: + summary.append( + { + "human_readable": entries[taxon_id].name, + "taxon_id": taxon_id, + "tax_level": entries[taxon_id].rank, + "filenames": filenames[taxon_id], + "qc_metrics": { + "num_reads": out_counts[taxon_id], + "avg_qual": mean(quals[taxon_id]), + "mean_len": mean(lens[taxon_id]), + }, + "includes_unclassified": include_unclassified + } + ) -def parse_kraken_assignment_file(kraken_assignment_file, taxid_map, parent=None): - sys.stderr.write("Loading read assignments\n") - read_map = defaultdict(set) - with open(kraken_assignment_file, "r") as kfile: - for line in kfile: - taxid, read_id = parse_kraken_assignment_line(line) - if taxid in taxid_map: - read_map[read_id].update(taxid_map[taxid]) - elif parent: - # handle case where taxid has changed - current = taxid - while current in parent and current not in taxid_map and current != "1": - current = parent[current] - if current in taxid_map: - print(f"Add {taxid} to {current} list") - read_map[read_id].update(taxid_map[current]) - return read_map - - -def trim_read_id(read_id): - if read_id.endswith("/1") or read_id.endswith("/2"): - read_id = read_id[:-2] - elif read_id.endswith(".1") or read_id.endswith(".2"): - read_id = read_id[:-2] - - return read_id + with open(f"{prefix}_summary.json", "w") as f: + json.dump(summary, f) diff --git a/bin/handle_spike_ins.py b/bin/handle_spike_ins.py index 2650a26..c66c67d 100755 --- a/bin/handle_spike_ins.py +++ b/bin/handle_spike_ins.py @@ -360,8 +360,6 @@ def main(): spike_kraken_entries = parse_report_file(args.report_file, spike_taxids, args.save_json) - if args.illumina: - preset = "sr" if len(spike_taxids) > 0 or len(spike_refs) > 0: map_counts, map_ids = identify_spike_map_counts(args.fastq_file, spike_refs, preset) diff --git a/bin/report.py b/bin/report.py new file mode 100644 index 0000000..05bf6da --- /dev/null +++ b/bin/report.py @@ -0,0 +1,395 @@ +from collections import defaultdict +import csv +import sys + + +class KrakenEntry: + """ + A class representing a line in a kraken report. + + Attributes: + taxon_id (str): The NCBI taxon identifier. + name (string): The scientific name associated with this taxon. + rank (str): A letter coding the rank of this taxon. + depth (int): The number of indentations this entry had in the kraken file (related to hierarchy). + count (int): The count of reads assigned to this taxon and its descendants. + ucount (int): The count of reads assigned specifically to this taxon. + domain (str): The domain this taxon is a member of. + parent (str): The taxon id associated with the taxonomic parent. + children (set): A set of taxon ids associated with the direct taxonomic children. + sibling_rank (int): An integer representing the ranking among direct siblings (share the parent) based on count. + hierarchy (list): An ordered list of taxon ids representing the parents to taxonomic root. + """ + def __init__(self, row=None, domain=None, hierarchy=[]): + """ + Initializes an KrakenEntry object. 
+ + Parameters: + row (str): A row from a kraken file + domain (str): The taxonomic domain this entry is associated with. + hierarchy (list): A list of taxon_ids tracing the ancestry from this taxon to the root. + """ + self.taxon_id = "0" + self.name = "unclassified" + self.rank = "U" + self.depth = 0 + self.count = 0 # inclusive count + self.ucount = 0 # unique_count + self.domain = domain + self.parent = None + self.children = set() + self.sibling_rank = 0 + self.hierarchy = hierarchy + if row is not None: + self.add_row(row) + + def print(self): + """ + Print the attributes of KrakenEntry as a string + """ + print( + f"{self.taxon_id},{self.name},{self.rank},{self.depth},{self.count},{self.ucount},{self.domain},{self.parent},{self.children},{self.sibling_rank},{self.hierarchy}") + + def parse_depth(self, name): + """ + Parse the number of indentations from the kraken name string + + Args: + name (str): The "Scientific Name" column from a kraken report line + + Returns: + depth (int): The relative depth within the name string. + """ + parse_name = name.split(" ") + depth = 0 + for i in parse_name: + if i != "": + break + depth += 1 + depth = int(depth / 2) + return depth + + def add_row(self, row): + """ + Parse information from kraken report row to update this entry. + + Args: + row (str): A line from a kraken report. + """ + self.taxon_id = row["Taxonomy ID"] + self.name = row["Scientific Name"].strip() + self.depth = self.parse_depth(row["Scientific Name"]) + self.rank = row["Rank"] + self.count = int(row["Clades"]) # inclusive count + self.ucount = int(row["Taxonomies"]) # unique_count + if self.count < self.ucount: + self.count, self.ucount = self.ucount, self.count + self.hierarchy = self.hierarchy[:self.depth] + + def add_parent(self, parent): + self.parent = parent + + def add_child(self, child): + self.children.add(child) + + def set_sibling_rank(self, rank): + self.sibling_rank = rank + + +class KrakenReport: + """ + A class representing a kraken report. + + Attributes: + entries (dict): A dict with keys for taxon ids and values for KrakenEntry. + total (int): Total number of reads in report. + unclassified (int): Number of unclassified reads. + classified (int): Number of classified reads. + domains (int): A dict with keys for names of domains and values for associated taxon id. + """ + def __init__(self, file_name=None): + """ + Initializes an KrakenReport object. + + Parameters: + file_name (Path): Name of kraken report file. + """ + self.entries = defaultdict(KrakenEntry) + self.total = 0 + self.unclassified = 0 + self.classified = 0 + self.domains = defaultdict(int) + if file_name: + self.load_df(file_name) + self.unclassified = self.entries[0].count + self.classified = self.entries[1].count + self.total = self.classified + self.unclassified + + def print(self): + """ + Print the attributes of KrakenEntry as a string + """ + print(f"Report has {len(self.entries)} taxon entries corresponding to {self.classified} classified and {self.unclassified} unclassified reads.") + + def add_parent_child(self, parent_id, child_id): + """ + Add parent-child relationship to each KrakenEntry. + + Parameters: + parent_id (str): taxon_id of parent. + child_id (str): taxon_id of child. + """ + + self.entries[child_id].add_parent(parent_id) + self.entries[parent_id].add_child(child_id) + + def set_sibling_ranks(self): + """ + Rank siblings (share common parent) based on the number of classified reads (count including descendants). Lower + rank means higher read count. 
Rank starts at 1, 2, 3, ... + + Parameters: + parent_id (str): taxon_id of parent. + child_id (str): taxon_id of child. + """ + for entry_id, entry in self.entries.items(): + if entry.sibling_rank > 0 or entry.parent in [None, 1, 131567]: + continue + if entry.rank in ["D", "R", "R1"]: + entry.set_sibling_rank(1) + elif len(self.entries[entry.parent].children) == 1: + entry.set_sibling_rank(1) + else: + sibling_dict = {i: self.entries[i].count for i in self.entries[entry.parent].children} + sorted_counts = sorted(sibling_dict.values(), reverse=True) + for i, c in sibling_dict.items(): + rank = sorted_counts.index(c) + 1 + self.entries[i].set_sibling_rank(rank) + + def check_sibling_ranks(self): + """ + Check every entry has been set a sibling_rank. 0 means not set. + """ + for entry_id in self.entries: + if self.entries[entry_id].sibling_rank == 0: + print(entry_id) + + def check_report(self, file_name): + """ + Check every line in the kraken report has the appropriate number of tab separated fields + + Parameters: + file_name (Path): Name of kraken report file + + Returns: + report_has_header (bool): True if report includes the header line + num_fields (int) number of fields in a line [6,8] + """ + with open(file_name, 'r') as handle: + line = handle.readline() + num_fields = len(line.split("\t")) + if num_fields not in [6,8]: + sys.stderr.write( + f"Kraken report file {file_name} badly formatted - must have 6 or 8 columns" + ) + sys.exit(9) + if line.startswith("%"): + return True, num_fields + else: + return False, num_fields + + def load_df(self, file_name): + """ + Check every line in the kraken report has the appropriate number of tab separated fields + + Parameters: + file_name (Path): Name of kraken report file + + Returns: + report_has_header (bool): True if report includes the header line + num_fields (int) number of fields in a line [6,8] + """ + csvfile = open(file_name, newline='') + df = {} + report_has_header, num_fields = self.check_report((file_name)) + if report_has_header: + df = csv.DictReader(csvfile, delimiter="\t") + elif num_fields == 6: + df = csv.DictReader(csvfile, delimiter="\t", fieldnames=["% of Seqs","Clades","Taxonomies","Rank","Taxonomy ID","Scientific Name"]) + elif num_fields == 8: + df = csv.DictReader(csvfile, delimiter="\t", fieldnames=["% of Seqs","Clades","Taxonomies","Read Minimizers", "Taxon Minimizers", "Rank","Taxonomy ID","Scientific Name"]) + + hierarchy = [] + domain = None + for row in df: + try: + if row["Rank"] == "D": + domain = row["Scientific Name"].strip() + self.domains[domain] = row["Taxonomy ID"] + entry = KrakenEntry(row=row, domain=domain, hierarchy=hierarchy) + + except: + print(f"Found badly formatted row:\n{row}") + print(f"Quitting load of {file_name}") + break + + self.entries[entry.taxon_id] = entry + hierarchy = entry.hierarchy.copy() + if len(hierarchy) > 0: + self.add_parent_child(hierarchy[-1], entry.taxon_id) + if entry.taxon_id != "0": + hierarchy.append(entry.taxon_id) + csvfile.close() + self.set_sibling_ranks() + # self.check_sibling_ranks() + + def get_domains(self): + """ + Get a list of taxonomic domains found in the report + + Returns: + list: List of domains + """ + domains = [] + for entry_id, entry in self.entries.items(): + if entry.rank == "D": + domains.append(entry) + entry.print() + return domains + + def get_tips(self): + """ + Get a list of terminating taxa (ie have no children) + + Returns: + list: List of KrakenEntry + """ + tips = [] + for entry_id, entry in self.entries.items(): + if 
len(entry.children) == 0: + tips.append(entry) + entry.print() + return tips + + def get_rank_entries(self, rank): + """ + Get a list of taxa at a given rank + + Parameters: + rank (str): a taxonomic rank + Returns: + list: List of KrakenEntry + """ + subset = [] + for entry_id, entry in self.entries.items(): + if entry.rank == rank: + subset.append(entry) + entry.print() + return subset + + def get_percentage(self, taxon_id, domain=None): + """ + For a taxon_id, what percentage of the dataset counts (or the domain counts) is it? + + Parameters: + taxon_id (str): a taxon ID + domain (str): Optional name of a domain + Returns: + float: Percentage of the dataset corresponding to counts of this taxon and descendants + """ + if domain and self.entries[taxon_id].domain != domain: + return 0.0 + + denominator = self.classified + if domain: + denominator = self.entries[self.domains[domain]].count + + count = self.entries[taxon_id].count + return float(count) / denominator + + def to_source_target_df(self, max_rank=None, domain=None): + """ + Convert this KrakenReport to a CSV with "source", "target", "value", "percentage" columns. If max_rank given, + the result is filtered to including only this number of top-ranking children for each parent. If domain is + given, includes only those as part of this domain, and scales percentage accordingly. Output file written to + "source_target.csv". + + Parameters: + max_rank (str): (optional) maximum rank among siblings to be included + domain (str): (optional) name of a domain + Returns: + list: a list of row-entry dictionaries + """ + records = [] + ignore = set() + skip = set() + for entry_id, entry in self.entries.items(): + + # we don't want the connections to cellular organisms, root etc + if entry.sibling_rank == 0: + continue + + # filter by domain where required + if domain and entry.domain != domain: + continue + + # filter by rank when specified + if max_rank: + if entry.sibling_rank > max_rank: + ignore.add(entry_id) + continue + elif entry.parent in ignore: + ignore.add(entry_id) + continue + + # filter if an intermediate rank + if entry.rank not in ["K", "D", "D1", "D2", "P", "C", "O", "F", "G", "S", "S1", "S2"]: + skip.add(entry_id) + continue + + index = 1 + while index < len(entry.hierarchy) and entry.hierarchy[-index] in skip: + index += 1 + source_id = entry.hierarchy[-index] + records.append({"source": self.entries[source_id].name, "target": entry.name, "value": entry.count, + "percentage": self.get_percentage(entry_id, domain=domain)}) + + with open("source_target.csv", 'w', newline='') as csvfile: + fieldnames = ["source", "target", "value", "percentage"] + writer = csv.DictWriter(csvfile, fieldnames=fieldnames) + writer.writeheader() + for row in records: + writer.writerow(row) + + print(len(records), len(ignore), len(skip)) + + return records + + def to_df(self, sample_id="sample_id", ranks=[]): + """ + Create a pandas dataframe from KrakenReport. If ranks are specified, include only entries at those ranks. + + Parameters: + sample_id (str): (optional) A name to give this report in the dataframe. + ranks (list): (optional) A list of strings corresponding to ranks to include. 
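# Minimal sketch of interrogating a loaded KrakenReport (report name and taxon ID
# are hypothetical).
report = KrakenReport("sample.kreport.txt")
report.print()                                    # classified/unclassified totals
species_entries = report.get_rank_entries("S")    # species-level KrakenEntry objects
fraction = report.get_percentage("2697049")       # fraction of classified reads in this clade
records = report.to_source_target_df(max_rank=5)  # also writes source_target.csv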
+ """ + if not ranks or len(ranks) == 0: + taxon_ids = [e for e in self.entries.keys()] + else: + taxon_ids = [e for e in self.entries.keys() if self.entries[e].rank in ranks] + return pd.DataFrame({sample_id: [self.entries[e].count for e in taxon_ids]}, index=taxon_ids) + + def check_host(self, host_dict): + """ + Check if any host taxon_id exceeds the max number of reads permitted. + + Parameters: + host_dict (dict): A dictionary with key a taxon_id, value a max number of reads allowed. + """ + + for host_id, max_host_count in host_dict.items(): + if host_id in self.entries and self.entries[host_id].count > max_host_count: + sys.stderr.write( + f"ERROR: found {self.entries[host_id].count} reads corresponding to host {self.entries[host_id].name} with taxon_id {host_id}, max allowed is {max_host_count}\n" + ) + sys.exit(2) \ No newline at end of file diff --git a/bin/taxonomy.py b/bin/taxonomy.py new file mode 100644 index 0000000..a01df88 --- /dev/null +++ b/bin/taxonomy.py @@ -0,0 +1,176 @@ +import os +import sys +from collections import defaultdict + +class TaxonEntry: + """ + A class representing a line in a kraken report. + + Attributes: + taxon_id (str): The NCBI taxon identifier. + name (string): The scientific name associated with this taxon. + rank (str): A letter coding the rank of this taxon. + """ + def __init__(self, taxon_id="0", name="unclassified", rank="U"): + """ + Initializes an TaxonEntry object. + + Parameters: + taxon_id (str): The NCBI taxon identifier. + name (str): The scientific name associated with this taxon. + rank (str): A letter coding the rank of this taxon. + """ + self.taxon_id = taxon_id + self.name = name + self.rank = rank + + def print(self): + """ + Print the attributes of TaxonEntry as a string + """ + print( + f"{self.taxon_id},{self.name},{self.rank}") + + +class Taxonomy: + """ + A class representing taxonomic information. + + Attributes: + parents (dict): A dict with keys for taxon_ids and values for parent taxon id. + children (dict): A dict with keys for taxon_ids and values for sets of direct child taxon id. + entries (dict): A dict with keys for taxon_ids and values for TaxonEntry representing that taxon. + """ + def __init__(self, taxonomy_dir=None, taxon_ids=None): + self.parents = defaultdict(str) + self.children = defaultdict(set) + self.entries = defaultdict(TaxonEntry) + + if taxonomy_dir: + self.load_parents_and_children(taxonomy_dir) + if taxonomy_dir and taxon_ids: + self.load_entries(taxonomy_dir, taxon_ids) + + def load_parents_and_children(self, taxonomy_dir): + """ + Loads the parent child relationships from the "nodes.dmp" file in the taxonomy directory. + Updates the class members `parents` and `children` + + Parameters: + taxonomy_dir (str): The unzipped directory downloaded from NCBI taxonomy. + """ + taxonomy = os.path.join(taxonomy_dir, "nodes.dmp") + try: + with open(taxonomy, "r") as f: + for line in f: + fields = line.split("\t|\t") + taxon_id, parent_taxon_id = fields[0], fields[1] + self.parents[taxon_id] = parent_taxon_id + self.children[parent_taxon_id].add(taxon_id) + except: + sys.stderr.write( + f"ERROR: Could not find taxonomy nodes.dmp file in {taxonomy_dir}" + ) + sys.exit(4) + + def load_entries_from_nodes(self, taxonomy_dir, taxon_ids): + """ + Updates taxon_id and rank information for specified taxon_ids from the "nodes.dmp" file in the taxonomy + directory to the entries structure. + + Parameters: + taxonomy_dir (str): The unzipped directory downloaded from NCBI taxonomy. 
+            taxon_ids (list): List of taxon identifiers.
+        """
+        if len(taxon_ids) == 0:
+            return
+
+        taxonomy = os.path.join(taxonomy_dir, "nodes.dmp")
+        if not os.path.exists(taxonomy):
+            sys.stderr.write(
+                f"ERROR: Could not find taxonomy nodes.dmp file in {taxonomy_dir}"
+            )
+            sys.exit(4)
+        try:
+            with open(taxonomy, "r") as f:
+                for line in f:
+                    fields = line.split("\t|\t")
+                    taxon_id, rank = fields[0], fields[2]
+                    if taxon_id in taxon_ids:  # only store entries for the requested taxon_ids, as in load_entries_from_names
+                        self.entries[taxon_id].taxon_id = taxon_id
+                        self.entries[taxon_id].rank = rank
+        except:
+            sys.stderr.write(
+                f"ERROR: Badly formatted nodes.dmp file in {taxonomy}"
+            )
+            sys.exit(4)
+
+    def load_entries_from_names(self, taxonomy_dir, taxon_ids):
+        """
+        Updates name information for specified taxon_ids from the "names.dmp" file in the taxonomy
+        directory, storing it in the entries structure.
+
+        Parameters:
+            taxonomy_dir (str): The unzipped directory downloaded from NCBI taxonomy.
+            taxon_ids (list): List of taxon identifiers.
+        """
+        if len(taxon_ids) == 0:
+            return
+
+        taxonomy = os.path.join(taxonomy_dir, "names.dmp")
+        if not os.path.exists(taxonomy):
+            sys.stderr.write(
+                f"ERROR: Could not find taxonomy names.dmp file in {taxonomy_dir}"
+            )
+            sys.exit(4)
+        try:
+            with open(taxonomy, "r") as f:
+                for line in f:
+                    fields = [i.lstrip() for i in line.split("\t|")]
+                    taxon_id, name, name_type = fields[0], fields[1], fields[3]
+                    if taxon_id in taxon_ids and ("scientific name" in name_type or self.entries[taxon_id].name == "unclassified"):
+                        self.entries[taxon_id].name = name
+        except:
+            sys.stderr.write(
+                f"ERROR: Badly formatted names.dmp file in {taxonomy}"
+            )
+            sys.exit(4)
+
+    def load_entries(self, taxonomy_dir, taxon_ids):
+        """
+        Generates information for specified taxon_ids in the self.entries dictionary (taxon_id, name and rank).
+
+        Parameters:
+            taxonomy_dir (str): The unzipped directory downloaded from NCBI taxonomy.
+            taxon_ids (list): List of taxon identifiers.
+        """
+        self.load_entries_from_nodes(taxonomy_dir, taxon_ids)
+        self.load_entries_from_names(taxonomy_dir, taxon_ids)
+
+    def get_taxon_id_map(self, taxon_ids=[], include_unclassified=False):
+        """
+        Generates a map from a specified list of taxon_ids and their descendants to sets of taxon_ids.
+        Any descendant of a specified taxon_id will include this taxon_id in its set.
+        If include_unclassified is specified, there will be an entry from "0" to the input taxon_ids.
+
+        Parameters:
+            taxon_ids (list/set): An iterable of taxon_ids to consider.
+            include_unclassified (bool): Should the "unclassified" key be included with each taxon?
+        Returns:
+            dict: A map from the specified taxon_ids and their descendants to the subset of the original taxon_ids
+                ancestral to the key.
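+
+        Example (illustrative sketch; the taxon_ids shown are hypothetical):
+            If taxon "2" has children "561" and "562", then get_taxon_id_map(["2"]) returns
+            {"2": {"2"}, "561": {"2"}, "562": {"2"}}.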
+ """ + taxon_id_map = defaultdict(set) + + for key in taxon_ids: + taxon_id_map[key].add(key) + if include_unclassified: + taxon_id_map["0"].update(taxon_ids) + + check = list(taxon_ids) + while len(check) > 0: + current = check.pop() + check.extend(self.children[current]) + for child in self.children[current]: + taxon_id_map[child].update(taxon_id_map[current]) + + return taxon_id_map \ No newline at end of file diff --git a/conf/base.config b/conf/base.config index 441c2d0..d811f9b 100644 --- a/conf/base.config +++ b/conf/base.config @@ -49,6 +49,9 @@ process { withLabel:process_long { time = { check_max( 20.h * task.attempt, 'time' ) } } + withLabel:process_more_memory { + memory = { check_max( 32.GB * task.attempt, 'memory' ) } + } withLabel:process_high_memory { memory = { check_max( 100.GB * task.attempt, 'memory' ) } } diff --git a/conf/params.config b/conf/params.config index 3cd5138..0e6a8dd 100644 --- a/conf/params.config +++ b/conf/params.config @@ -7,6 +7,8 @@ params { // Boilerplate options outdir = "output" + output = false + out_dir = false tracedir = "${params.outdir}/pipeline_info/${trace_timestamp}" publish_dir_mode = 'copy' email = null @@ -90,6 +92,7 @@ params { kraken_report = null bracken_report = null kraken_assignments = null + kraken_confidence = 0.05 run_bracken = false additional_bracken_jsons = null default_taxonomy = 'https://ftp.ncbi.nlm.nih.gov/pub/taxonomy/taxdump.tar.gz' @@ -145,6 +148,7 @@ params { kraken_clients = 2 k2_port = 8080 k2_host = 'localhost' + kraken_confidence = 0.05 process_label = "scylla" monochrome_logs = false @@ -162,6 +166,6 @@ params { ] agent = null container = "biowilko/scylla" - container_version = "1.1.3" + container_version = "1.2.1" } -} \ No newline at end of file +} diff --git a/docs/errorcodes.txt b/docs/errorcodes.txt index 3f51015..092bf7e 100644 --- a/docs/errorcodes.txt +++ b/docs/errorcodes.txt @@ -6,4 +6,5 @@ 5 FASTA/Q file formatting error 6 Spike in not understood/not in reference JSON 7 No output files for one half of a read-pair -8 Mismatching paired read FASTQ headers \ No newline at end of file +8 Mismatching paired read FASTQ headers +9 Kraken report formatting error \ No newline at end of file diff --git a/environment.yml b/environment.yml index d0fddf4..980cafe 100644 --- a/environment.yml +++ b/environment.yml @@ -7,7 +7,7 @@ channels: dependencies: - python<=3.10 - pysam - - nextflow + - nextflow=23.10 - pip - kraken2 - kraken2-server # not on mac diff --git a/main.nf b/main.nf index 7eb79df..c56495a 100644 --- a/main.nf +++ b/main.nf @@ -3,6 +3,11 @@ include { classify_novel_viruses } from './modules/classify_novel_viruses' include { process_run } from './subworkflows/process_run' workflow { + if (params.output) + exit 1, "Please specify outdir with --outdir -- aborting" + if (params.out_dir) + exit 1, "Please specify outdir with --outdir -- aborting" + unique_id = "${params.unique_id}" if (unique_id == "null") { @@ -30,4 +35,4 @@ workflow { classify_novel_viruses(unique_id) else ingest(unique_id) -} \ No newline at end of file +} diff --git a/modules/check_hcid_status.nf b/modules/check_hcid_status.nf index 2ff4dcb..c0df3de 100644 --- a/modules/check_hcid_status.nf +++ b/modules/check_hcid_status.nf @@ -18,6 +18,12 @@ process check_hcid { tuple val(unique_id), path("*.warning.json"), emit: warnings, optional: true tuple val(unique_id), path("hcid.counts.csv"), emit: counts script: + preset = "" + if ( params.read_type == "illumina") { + preset = "--illumina" + } else if ( params.paired ) { + preset = 
"--illumina" + } """ check_hcid.py \ -k ${kreport} \ @@ -25,7 +31,7 @@ process check_hcid { -t ${taxonomy} \ -i ${hcid_defs} \ -d ${hcid_refs} \ - -p "hcid" + -p "hcid" ${preset} """ } diff --git a/modules/extract_all.nf b/modules/extract_all.nf index fb5a78f..d4aac0e 100644 --- a/modules/extract_all.nf +++ b/modules/extract_all.nf @@ -34,9 +34,10 @@ process split_kreport { process extract_taxa_paired_reads { label "process_single" - label "process_high_memory" + label "process_more_memory" - errorStrategy {task.exitStatus in 2..3 ? "ignore" : "terminate"} + errorStrategy {task.exitStatus in 2..3 ? "ignore" : "retry"} + maxRetries 3 conda "bioconda::pyfastx=2.01" container "biocontainers/pyfastx:2.0.1--py39h3d4b85c_0" @@ -76,9 +77,10 @@ process extract_taxa_paired_reads { process extract_taxa_reads { label "process_single" - label "process_high_memory" + label "process_more_memory" - errorStrategy {task.exitStatus in 2..3 ? "ignore" : "terminate"} + errorStrategy {task.exitStatus in 2..3 ? "ignore" : "retry"} + maxRetries 3 conda "bioconda::pyfastx=2.01" container "biocontainers/pyfastx:2.0.1--py39h3d4b85c_0" @@ -117,9 +119,10 @@ process extract_taxa_reads { process extract_paired_virus_and_unclassified { label "process_single" - label "process_high_memory" + label "process_more_memory" - errorStrategy {task.exitStatus in 2..3 ? "ignore" : "terminate"} + errorStrategy {task.exitStatus in 2..3 ? "ignore" : "retry"} + maxRetries 3 conda "bioconda::pyfastx=2.01" container "biocontainers/pyfastx:2.0.1--py39h3d4b85c_0" @@ -146,9 +149,10 @@ process extract_paired_virus_and_unclassified { process extract_virus_and_unclassified { label "process_single" - label "process_high_memory" + label "process_more_memory" - errorStrategy {task.exitStatus in 2..3 ? "ignore" : "terminate"} + errorStrategy {task.exitStatus in 2..3 ? "ignore" : "retry"} + maxRetries 3 conda "bioconda::pyfastx=2.01" container "biocontainers/pyfastx:2.0.1--py39h3d4b85c_0" @@ -175,9 +179,10 @@ process extract_virus_and_unclassified { process extract_paired_virus { label 'process_single' - label 'process_high_memory' + label 'process_more_memory' - errorStrategy {task.exitStatus in 2..3 ? 'ignore' : 'terminate'} + errorStrategy {task.exitStatus in 2..3 ? 'ignore' : 'retry'} + maxRetries 3 conda 'bioconda::biopython=1.78 bioconda::tabix=1.11' container "biocontainers/pyfastx:2.0.1--py39h3d4b85c_0" @@ -203,9 +208,10 @@ process extract_paired_virus { process extract_virus { label 'process_single' - label 'process_high_memory' + label 'process_more_memory' - errorStrategy {task.exitStatus in 2..3 ? 'ignore' : 'terminate'} + errorStrategy {task.exitStatus in 2..3 ? 'ignore' : 'retry'} + maxRetries 3 conda 'bioconda::biopython=1.78 bioconda::tabix=1.11' container "biocontainers/pyfastx:2.0.1--py39h3d4b85c_0" @@ -231,9 +237,10 @@ process extract_virus { process extract_paired_dehumanised { label "process_single" - label "process_high_memory" + label "process_more_memory" - errorStrategy {task.exitStatus in 2..3 ? "ignore" : "terminate"} + errorStrategy {task.exitStatus in 2..3 ? "ignore" : "retry"} + maxRetries 3 conda "bioconda::pyfastx=2.01" container "biocontainers/pyfastx:2.0.1--py39h3d4b85c_0" @@ -260,9 +267,10 @@ process extract_paired_dehumanised { process extract_dehumanised { label "process_single" - label "process_high_memory" + label "process_more_memory" - errorStrategy {task.exitStatus in 2..3 ? "ignore" : "terminate"} + errorStrategy {task.exitStatus in 2..3 ? 
"ignore" : "retry"} + maxRetries 3 conda "bioconda::pyfastx=2.01" container "biocontainers/pyfastx:2.0.1--py39h3d4b85c_0" diff --git a/modules/generate_report.nf b/modules/generate_report.nf index 36547ac..e1960c4 100644 --- a/modules/generate_report.nf +++ b/modules/generate_report.nf @@ -2,7 +2,10 @@ process make_report { label "process_single" - label "process_high_memory" + label "process_more_memory" + + errorStrategy "retry" + maxRetries 3 publishDir "${params.outdir}/${unique_id}/", mode: 'copy' diff --git a/modules/kraken_server.nf b/modules/kraken_server.nf index e436a15..8912758 100644 --- a/modules/kraken_server.nf +++ b/modules/kraken_server.nf @@ -99,7 +99,8 @@ process kraken_server { --max-requests ${kraken_compute + 1} --thread-pool ${task.cpus}\ --port ${params.k2_port} \ --host-ip ${params.k2_host} \ - --db ./${database}/ + --db ./${database}/ \ + --confidence ${params.kraken_confidence} """ } diff --git a/nextflow.config b/nextflow.config index a33d322..1b9a68b 100644 --- a/nextflow.config +++ b/nextflow.config @@ -32,7 +32,7 @@ manifest { description = 'Classify metagenomic sequence data from human respiratory infections.' mainScript = 'main.nf' nextflowVersion = '>=20.10.0' - version = 'v1.2.1' + version = 'v1.3.0' } profiles { @@ -64,6 +64,7 @@ profiles { docker.enabled = true docker.registry = 'quay.io' docker.userEmulation = true + docker.runOptions = "--user \$(id -u):\$(id -g) --group-add 100" conda.enabled = false singularity.enabled = false podman.enabled = false diff --git a/subworkflows/process_run.nf b/subworkflows/process_run.nf index ae1c090..e1ea5f7 100644 --- a/subworkflows/process_run.nf +++ b/subworkflows/process_run.nf @@ -48,10 +48,17 @@ process move_or_compress { workflow process_barcode { take: - barcode_ch + input_ch main: - classify_and_report(barcode_ch, barcode_ch, null) - extract_all(barcode_ch, classify_and_report.out.assignments, classify_and_report.out.kreport, classify_and_report.out.taxonomy) + move_or_compress(input_ch).set{ barcode_ch } + + if (params.paired) + input_ch.map{barcode_id, barcode_files -> [barcode_id, barcode_files[0], barcode_files[1]]}.set{ raw_ch } + else + raw_ch = barcode_ch + + classify_and_report(raw_ch, barcode_ch, null) + extract_all(raw_ch, classify_and_report.out.assignments, classify_and_report.out.kreport, classify_and_report.out.taxonomy) if ( params.classify_novel_viruses ){ classify_virus_fastq(extract_all.out.virus) } @@ -66,13 +73,15 @@ workflow process_run { get_params_and_versions(unique_id) run_dir = file("${params.run_dir}", type: "dir", checkIfExists:true) - barcode_input = Channel.fromPath("${run_dir}/*", type: "dir", checkIfExists:true, maxDepth:1).map { [it.baseName, get_fq_files_in_dir(it)]} - move_or_compress(barcode_input) - + if (params.paired) + barcode_input = Channel.fromFilePairs("${run_dir}/*_R{1,2}*.f*q*", type: "file", checkIfExists:true) + else + barcode_input = Channel.fromPath("${run_dir}/*", type: "dir", checkIfExists:true, maxDepth:1).map { [it.baseName, get_fq_files_in_dir(it)]} + barcode_input.view() if (params.raise_server) kraken_setup(params.raise_server) - process_barcode(move_or_compress.out) + process_barcode(barcode_input) process_barcode.out.report .map{ barcode_id, barcode_report -> "barcode,filepath,sample_report\n${barcode_id},${barcode_id}/classifications/${params.database_set}.kraken_report.txt,${barcode_id}/${barcode_id}_report.html\n" }.collectFile(name: "${params.outdir}/${unique_id}/samples.csv", sort:true, keepHeader:true, skip:1) diff --git 
a/test/test_data/illumina/barcode02.1.fq.gz b/test/test_data/illumina/barcode02.1.fq.gz deleted file mode 100644 index b8b3767..0000000 Binary files a/test/test_data/illumina/barcode02.1.fq.gz and /dev/null differ diff --git a/test/test_data/illumina/barcode02.2.fq.gz b/test/test_data/illumina/barcode02.2.fq.gz deleted file mode 100644 index 91e0b93..0000000 Binary files a/test/test_data/illumina/barcode02.2.fq.gz and /dev/null differ diff --git a/test/test_data/illumina/barcode02_R1.fq.gz b/test/test_data/illumina/barcode02_R1.fq.gz new file mode 100644 index 0000000..4d26b8b Binary files /dev/null and b/test/test_data/illumina/barcode02_R1.fq.gz differ diff --git a/test/test_data/illumina/barcode02_R2.fq.gz b/test/test_data/illumina/barcode02_R2.fq.gz new file mode 100644 index 0000000..078448d Binary files /dev/null and b/test/test_data/illumina/barcode02_R2.fq.gz differ