From 364a9dcb6918087d68a2173374c6cba879ed02d1 Mon Sep 17 00:00:00 2001 From: Matt Stone Date: Wed, 16 Oct 2024 20:44:43 -0400 Subject: [PATCH 1/5] fix: Check against `HN` tag instead of `is_unmapped` property (#69) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Closes #68 `bwa aln` produces inconsistent behavior when recording the mapped status of multiply mapping reads. In most cases, such reads are flagged as mapped with a mapq of 0. However, we have discovered some cases where these reads are flagged as unmapped. `prymer` currently relies on the unmapped flag to identify reads with a hit count of zero. This has been yielding errors when attempting to parse reads in the latter category. This information can instead be obtained directly from the `HN` tag. --- NB: Thanks to detective work by @tfenne > This happens because internally bwa concatenates all the sequences in the FASTA file, so it can discover hits that extend across contig boundaries. Unfortunately when it discovers it’s done this on the mapping selected as the primary mapping, it just marks it as unmapped instead of removing the mapping entirely (and selecting a different primary mapping if there are secondary hits). This PR additionally discards any hits that extend beyond the end of the contig to which they are mapped. --------- Co-authored-by: Tim Fennell --- prymer/offtarget/bwa.py | 40 +++++++++++++++++++++++++------------ tests/offtarget/test_bwa.py | 18 ++++++++++++----- 2 files changed, 40 insertions(+), 18 deletions(-) diff --git a/prymer/offtarget/bwa.py b/prymer/offtarget/bwa.py index 54f74ae..d84e7a8 100644 --- a/prymer/offtarget/bwa.py +++ b/prymer/offtarget/bwa.py @@ -165,8 +165,10 @@ def __str__(self) -> str: @dataclass(init=True, frozen=True) class BwaResult: - """Represents zero or more hits or alignments found by BWA for the given query. The number of - hits found may be more than the number of hits listed in the `hits` attribute. + """ + Represents zero or more hits or alignments found by BWA for the given query. + + The number of hits found may be more than the number of hits listed in the `hits` attribute. Attributes: query: the query (as given, no RC'ing) @@ -359,7 +361,8 @@ def map_all(self, queries: list[Query]) -> list[BwaResult]: return results def _to_result(self, query: Query, rec: pysam.AlignedSegment) -> BwaResult: - """Converts the query and alignment to a result. + """ + Convert the query and alignment to a result. Args: query: the original query @@ -370,17 +373,20 @@ def _to_result(self, query: Query, rec: pysam.AlignedSegment) -> BwaResult: "Query and Results are out of order" f"Query=${query.id}, Result=${rec.query_name}" ) - num_hits: Optional[int] = int(rec.get_tag("HN")) if rec.has_tag("HN") else None - if rec.is_unmapped: - if num_hits is not None and num_hits > 0: - raise ValueError(f"Read was unmapped but num_hits > 0: {rec}") - return BwaResult(query=query, hit_count=0, hits=[]) - elif num_hits > self.max_hits: - return BwaResult(query=query, hit_count=num_hits, hits=[]) - else: + num_hits: int = int(rec.get_tag("HN")) if rec.has_tag("HN") else 0 + # `to_hits()` removes artifactual hits which span the boundary between concatenated + # reference sequences. If we are reporting a list of hits, the number of hits should match + # the size of this list. Otherwise, we either have zero hits, or more than the maximum + # number of hits. In both of the latter cases, we have to rely on the count reported in the + # `HN` tag. + hits: list[BwaHit] + if 0 < num_hits <= self.max_hits: hits = self.to_hits(rec=rec) - hit_count = num_hits if len(hits) == 0 else len(hits) - return BwaResult(query=query, hit_count=hit_count, hits=hits) + num_hits = len(hits) + else: + hits = [] + + return BwaResult(query=query, hit_count=num_hits, hits=hits) def to_hits(self, rec: AlignedSegment) -> list[BwaHit]: """Extracts the hits from the given alignment. Beyond the current alignment @@ -425,4 +431,12 @@ def to_hits(self, rec: AlignedSegment) -> list[BwaHit]: if not self.include_alt_hits: hits = [hit for hit in hits if not hit.refname.endswith("_alt")] + # Remove hits that extend beyond the end of a contig - these are artifacts of `bwa aln`'s + # alignment process, which concatenates all reference sequences and reports hits which span + # across contigs. + # NB: the type ignore is necessary because pysam's type hint for `get_reference_length` is + # incorrect. + # https://github.com/pysam-developers/pysam/pull/1313 + hits = [hit for hit in hits if hit.end <= self.header.get_reference_length(hit.refname)] # type: ignore[arg-type] # noqa: E501 + return hits diff --git a/tests/offtarget/test_bwa.py b/tests/offtarget/test_bwa.py index b914364..6f0cea6 100644 --- a/tests/offtarget/test_bwa.py +++ b/tests/offtarget/test_bwa.py @@ -262,13 +262,21 @@ def test_to_result_out_of_order(ref_fasta: Path) -> None: def test_to_result_num_hits_on_unmapped(ref_fasta: Path) -> None: with BwaAlnInteractive(ref=ref_fasta, max_hits=1) as bwa: query = Query(bases="GATTACA", id="foo") - # Exception: HN cannot be non-zero + + # HN *can* be non-zero rec = SamBuilder().add_single(name=query.id, attrs={"HN": 42}) - with pytest.raises(ValueError, match="Read was unmapped but num_hits > 0"): - bwa._to_result(query=query, rec=rec) + result = bwa._to_result(query=query, rec=rec) + assert result.hit_count == 42 + assert result.hits == [] + # OK: HN tag is zero rec = SamBuilder().add_single(name=query.id, attrs={"HN": 0}) - bwa._to_result(query=query, rec=rec) + result = bwa._to_result(query=query, rec=rec) + assert result.hit_count == 0 + assert result.hits == [] + # OK: no HN tag rec = SamBuilder().add_single(name=query.id) - bwa._to_result(query=query, rec=rec) + result = bwa._to_result(query=query, rec=rec) + assert result.hit_count == 0 + assert result.hits == [] From 5d79a52064fbc331e77ed2c626eaf15a85c3d524 Mon Sep 17 00:00:00 2001 From: Clint Valentine Date: Thu, 17 Oct 2024 15:57:12 -0700 Subject: [PATCH 2/5] feat: redirect BwaAlnInteractive stderr to debug logging (#74) --- prymer/offtarget/bwa.py | 31 ++++++++++++++++++++++++++++++- prymer/util/executable_runner.py | 26 +++++++++++++++++++++++--- tests/offtarget/test_bwa.py | 20 ++++++++++++++++++++ 3 files changed, 73 insertions(+), 4 deletions(-) diff --git a/prymer/offtarget/bwa.py b/prymer/offtarget/bwa.py index d84e7a8..868db22 100644 --- a/prymer/offtarget/bwa.py +++ b/prymer/offtarget/bwa.py @@ -44,9 +44,12 @@ ``` """ # noqa: E501 +import logging import os +import subprocess from dataclasses import dataclass from pathlib import Path +from threading import Thread from typing import ClassVar from typing import Optional from typing import cast @@ -56,6 +59,7 @@ from fgpyo.sam import Cigar from pysam import AlignedSegment from pysam import AlignmentHeader +from typing_extensions import override from prymer.api import coordmath from prymer.util.executable_runner import ExecutableRunner @@ -298,7 +302,7 @@ def __init__( "/dev/stdin", ] - super().__init__(command=command) + super().__init__(command=command, stderr=subprocess.PIPE) header = [] for line in self._subprocess.stdout: @@ -309,6 +313,18 @@ def __init__( self.header = AlignmentHeader.from_text("".join(header)) + # NB: ExecutableRunner will default to redirecting stderr to /dev/null. However, we would + # like to preserve stderr messages from bwa for potential debugging. To do this, we create + # a single thread to continuously read from stderr and redirect text lines to a debug + # logger. The close() method of this class will additionally join the stderr logging thread. + self._logger = logging.getLogger(self.__class__.__qualname__) + self._stderr_thread = Thread( + daemon=True, + target=self._stream_to_sink, + args=(self._subprocess.stderr, self._logger.debug), + ) + self._stderr_thread.start() + def __signal_bwa(self) -> None: """Signals BWA to process the queries.""" self._subprocess.stdin.flush() @@ -317,6 +333,19 @@ def __signal_bwa(self) -> None: self._subprocess.stdin.write("\n" * 16) self._subprocess.stdin.flush() + @override + def close(self) -> bool: + """ + Gracefully terminates the underlying subprocess if it is still running. + + Returns: + True: if the subprocess was terminated successfully + False: if the subprocess failed to terminate or was not already running + """ + safely_closed: bool = super().close() + self._stderr_thread.join() + return safely_closed + def map_one(self, query: str, id: str = "unknown") -> BwaResult: """Maps a single query to the genome and returns the result. diff --git a/prymer/util/executable_runner.py b/prymer/util/executable_runner.py index e69500c..26da723 100644 --- a/prymer/util/executable_runner.py +++ b/prymer/util/executable_runner.py @@ -14,8 +14,10 @@ from contextlib import AbstractContextManager from pathlib import Path from types import TracebackType +from typing import Callable from typing import Optional from typing import Self +from typing import TextIO class ExecutableRunner(AbstractContextManager): @@ -30,6 +32,12 @@ class ExecutableRunner(AbstractContextManager): Subclasses of [`ExecutableRunner`][prymer.util.executable_runner.ExecutableRunner] provide additional type checking of inputs and orchestrate parsing output data from specific command-line tools. + + Warning: + Users of this class must be acutely aware of deadlocks that can exist when manually + writing and reading to subprocess pipes. The Python documentation for subprocess and PIPE + has warnings to this effect as well as recommended workarounds and alternatives. + https://docs.python.org/3/library/subprocess.html """ __slots__ = ("_command", "_subprocess", "_name") @@ -40,9 +48,13 @@ class ExecutableRunner(AbstractContextManager): def __init__( self, command: list[str], + # NB: users of this class must be acutely aware of deadlocks that can exist when manually + # writing and reading to subprocess pipes. The Python documentation for subprocess and PIPE + # has warnings to this effect as well as recommended workarounds and alternatives. + # https://docs.python.org/3/library/subprocess.html stdin: int = subprocess.PIPE, stdout: int = subprocess.PIPE, - stderr: int = subprocess.PIPE, + stderr: int = subprocess.DEVNULL, ) -> None: if len(command) == 0: raise ValueError(f"Invocation must not be empty, received {command}") @@ -71,6 +83,15 @@ def __exit__( super().__exit__(exc_type, exc_value, traceback) self.close() + @staticmethod + def _stream_to_sink(stream: TextIO, sink: Callable[[str], None]) -> None: + """Redirect a text IO stream to a text sink.""" + while True: + if line := stream.readline(): + sink(line.rstrip()) + else: + break + @classmethod def validate_executable_path(cls, executable: str | Path) -> Path: """Validates user-provided path to an executable. @@ -115,8 +136,7 @@ def is_alive(self) -> bool: def close(self) -> bool: """ - Gracefully terminates the underlying subprocess if it is still - running. + Gracefully terminates the underlying subprocess if it is still running. Returns: True: if the subprocess was terminated successfully diff --git a/tests/offtarget/test_bwa.py b/tests/offtarget/test_bwa.py index 6f0cea6..24be7ec 100644 --- a/tests/offtarget/test_bwa.py +++ b/tests/offtarget/test_bwa.py @@ -1,3 +1,4 @@ +import logging import shutil from dataclasses import replace from pathlib import Path @@ -99,6 +100,25 @@ def test_map_one_uniquely_mapped(ref_fasta: Path) -> None: assert result.query == query +def test_stderr_redirected_to_logger(ref_fasta: Path, caplog: pytest.LogCaptureFixture) -> None: + """Tests that we redirect the stderr of the bwa executable to a logger..""" + caplog.set_level(logging.DEBUG) + query = Query(bases="TCTACTAAAAATACAAAAAATTAGCTGGGCATGATGGCATGCACCTGTAATCCCGCTACT", id="NA") + with BwaAlnInteractive(ref=ref_fasta, max_hits=1) as bwa: + result = bwa.map_one(query=query.bases, id=query.id) + assert result.hit_count == 1 + assert result.hits[0].refname == "chr1" + assert result.hits[0].start == 61 + assert result.hits[0].negative is False + assert f"{result.hits[0].cigar}" == "60M" + assert result.query == query + assert "[bwa_aln_core] calculate SA coordinate..." in caplog.text + assert "[bwa_aln_core] convert to sequence coordinate..." in caplog.text + assert "[bwa_aln_core] refine gapped alignments..." in caplog.text + assert "[bwa_aln_core] print alignments..." in caplog.text + assert "[bwa_aln_core] 1 sequences have been processed" in caplog.text + + def test_map_one_unmapped(ref_fasta: Path) -> None: """Tests that bwa returns an unmapped alignment. The hit count should be zero and the list of hits empty.""" From 9f51325a6300279498d3e23820fd2a55037f10ee Mon Sep 17 00:00:00 2001 From: Erin McAuley Date: Fri, 8 Nov 2024 11:37:15 -0500 Subject: [PATCH 3/5] feat: add thermodynamic-related weights and parameters for `PickHybProbeOnly` task (#60) Adds thermodynamic-related weights and parameters to the `ProbeWeights` and `ProbeParameters`, `PrimerAndAmpliconWeights` and `PrimerAndAmpliconParameters` classes. --- prymer/api/oligo.py | 35 +++++++++++-- prymer/api/primer_pair.py | 3 +- prymer/primer3/primer3.py | 36 ++++++++++--- prymer/primer3/primer3_input.py | 6 +++ prymer/primer3/primer3_parameters.py | 65 ++++++++++++++++++++++++ prymer/primer3/primer3_weights.py | 44 +++++++++++++--- tests/primer3/test_primer3.py | 4 ++ tests/primer3/test_primer3_input.py | 6 +-- tests/primer3/test_primer3_parameters.py | 7 ++- tests/primer3/test_primer3_weights.py | 15 +++--- 10 files changed, 188 insertions(+), 33 deletions(-) diff --git a/prymer/api/oligo.py b/prymer/api/oligo.py index 4a003de..b8ea01c 100644 --- a/prymer/api/oligo.py +++ b/prymer/api/oligo.py @@ -83,24 +83,49 @@ class Oligo(OligoLike, Metric["Oligo"]): """Stores the properties of the designed oligo. - Oligos can include both single primer and internal probe designs. The penalty score of the - design is emitted by Primer3 and controlled by the corresponding design parameters. - The penalty for a primer is set by the combination of `PrimerAndAmpliconParameters` and - `PrimerWeights`, whereas a probe penalty is set by `ProbeParameters` and `ProbeWeights`. + Oligos can include both single primer and internal probe designs. Primer3 emits information + about the specific properties of a primer and/or probe, including the base sequence, + melting temperature (tm), and score. + + The penalty score of the design is emitted by Primer3 and controlled by the corresponding + design parameters. The penalty for a primer is set by the combination of + `PrimerAndAmpliconParameters` and `PrimerWeights`. + A probe penalty is set by `ProbeParameters` and `ProbeWeights`. + + The span of an `Oligo` object represents the mapping of the oligo + to the genome. `Oligo` objects can optionally have a `tail` sequence to prepend to the 5' end + of the primer or probe, as well as a `name` for downstream record keeping. + + `Oligo` objects can also store thermodynamic characteristics of primer and/or probe + design. These include the calculated melting temperatures of the most stable hairpin structure, + self-, and end- complementarity. These values can be emitted by Primer3. + + These thermodynamic characteristics are meant to quantify how likely it is the primer or probe + will bind to itself (e.g., instead of the target DNA). A melting temperature close to or greater + than the intended melting temperature for target binding indicates the primer or probe is + likely to form stable hairpins or dimers, leading to reduced efficiency of the reaction. Attributes: tm: the calculated melting temperature of the oligo penalty: the penalty or score for the oligo span: the mapping of the primer to the genome + name: an optional name to use for the primer + tm_homodimer: calculated melting temperature that represents + the tendency of an oligo to bind to itself (self-complementarity) + tm_3p_anchored_homodimer: calculated melting temperature that represents + the tendency of a primer to bind to the 3'-END of an identical primer + tm_secondary_structure: calculated melting temperature of the oligo hairpin structure bases: the base sequence of the oligo (excluding any tail) tail: an optional tail sequence to put on the 5' end of the primer - name: an optional name to use for the primer """ tm: float penalty: float span: Span + tm_homodimer: Optional[float] = None + tm_3p_anchored_homodimer: Optional[float] = None + tm_secondary_structure: Optional[float] = None bases: Optional[str] = None tail: Optional[str] = None diff --git a/prymer/api/primer_pair.py b/prymer/api/primer_pair.py index ccbff9e..78e5d40 100644 --- a/prymer/api/primer_pair.py +++ b/prymer/api/primer_pair.py @@ -37,9 +37,8 @@ class methods to represent a primer pair. The primer pair is comprised of a lef Span(refname='chr1', start=21, end=100, strand=) >>> list(primer_pair) -[Oligo(name=None, tm=70.0, penalty=-123.0, span=Span(refname='chr1', start=1, end=20, strand=), bases='GGGGGGGGGGGGGGGGGGGG', tail=None), Oligo(name=None, tm=70.0, penalty=-123.0, span=Span(refname='chr1', start=101, end=120, strand=), bases='TTTTTTTTTTTTTTTTTTTT', tail=None)] +[Oligo(name=None, tm=70.0, penalty=-123.0, span=Span(refname='chr1', start=1, end=20, strand=), tm_homodimer=None, tm_3p_anchored_homodimer=None, tm_secondary_structure=None, bases='GGGGGGGGGGGGGGGGGGGG', tail=None), Oligo(name=None, tm=70.0, penalty=-123.0, span=Span(refname='chr1', start=101, end=120, strand=), tm_homodimer=None, tm_3p_anchored_homodimer=None, tm_secondary_structure=None, bases='TTTTTTTTTTTTTTTTTTTT', tail=None)] -``` """ # noqa: E501 from dataclasses import dataclass diff --git a/prymer/primer3/primer3.py b/prymer/primer3/primer3.py index e1d62e4..97bddbc 100644 --- a/prymer/primer3/primer3.py +++ b/prymer/primer3/primer3.py @@ -386,10 +386,13 @@ def design(self, design_input: Primer3Input) -> Primer3Result: # noqa: C901 assert_never(unreachable) # pragma: no cover soft_masked, hard_masked = self.get_design_sequences(design_region) + # use 1-base coords, explain primer designs, use hard-masked sequence, and compute + # thermodynamic attributes global_primer3_params = { Primer3InputTag.PRIMER_FIRST_BASE_INDEX: 1, Primer3InputTag.PRIMER_EXPLAIN_FLAG: 1, Primer3InputTag.SEQUENCE_TEMPLATE: hard_masked, + Primer3InputTag.PRIMER_THERMODYNAMIC_OLIGO_ALIGNMENT: 1, } assembled_primer3_tags = { @@ -530,15 +533,32 @@ def _build_oligos( bases = unmasked_design_seq[slice_offset:slice_end] if span.strand == Strand.NEGATIVE: bases = reverse_complement(bases) - - primers.append( - Oligo( - bases=bases, - tm=float(design_results[f"PRIMER_{design_task.task_type}_{idx}_TM"]), - penalty=float(design_results[f"PRIMER_{design_task.task_type}_{idx}_PENALTY"]), - span=span, + # assemble Primer3-emitted results into `Oligo` objects + # if thermodynamic melting temperatures are missing, raise KeyError + try: + primers.append( + Oligo( + bases=bases, + tm=float(design_results[f"PRIMER_{design_task.task_type}_{idx}_TM"]), + penalty=float( + design_results[f"PRIMER_{design_task.task_type}_{idx}_PENALTY"] + ), + span=span, + tm_homodimer=float( + design_results[f"PRIMER_{design_task.task_type}_{idx}_SELF_ANY_TH"] + ), + tm_3p_anchored_homodimer=float( + design_results[f"PRIMER_{design_task.task_type}_{idx}_SELF_END_TH"], + ), + tm_secondary_structure=float( + design_results[f"PRIMER_{design_task.task_type}_{idx}_HAIRPIN_TH"] + ), + ) ) - ) + except KeyError as e: + raise KeyError( + f"Did not find a required field in Primer3-emitted results: {e}" + ) from e return primers @staticmethod diff --git a/prymer/primer3/primer3_input.py b/prymer/primer3/primer3_input.py index aaa29ca..b0e6712 100644 --- a/prymer/primer3/primer3_input.py +++ b/prymer/primer3/primer3_input.py @@ -72,6 +72,9 @@ PRIMER_MAX_NS_ACCEPTED -> 1 PRIMER_LOWERCASE_MASKING -> 1 PRIMER_NUM_RETURN -> 5 +PRIMER_MAX_SELF_ANY_TH -> 53.0 +PRIMER_MAX_SELF_END_TH -> 53.0 +PRIMER_MAX_HAIRPIN_TH -> 53.0 PRIMER_PAIR_WT_PRODUCT_SIZE_LT -> 1.0 PRIMER_PAIR_WT_PRODUCT_SIZE_GT -> 1.0 PRIMER_PAIR_WT_PRODUCT_TM_LT -> 0.0 @@ -85,6 +88,9 @@ PRIMER_WT_SIZE_GT -> 0.1 PRIMER_WT_TM_LT -> 1.0 PRIMER_WT_TM_GT -> 1.0 +PRIMER_WT_SELF_ANY_TH -> 0.0 +PRIMER_WT_SELF_END_TH -> 0.0 +PRIMER_WT_HAIRPIN_TH -> 0.0 """ from dataclasses import MISSING diff --git a/prymer/primer3/primer3_parameters.py b/prymer/primer3/primer3_parameters.py index 2d427fb..b1a1b6a 100644 --- a/prymer/primer3/primer3_parameters.py +++ b/prymer/primer3/primer3_parameters.py @@ -56,13 +56,18 @@ class stores user input for internal probe design and maps it to the correct Pri PRIMER_MAX_NS_ACCEPTED -> 1 PRIMER_LOWERCASE_MASKING -> 1 PRIMER_NUM_RETURN -> 5 +PRIMER_MAX_SELF_ANY_TH -> 53.0 +PRIMER_MAX_SELF_END_TH -> 53.0 +PRIMER_MAX_HAIRPIN_TH -> 53.0 ``` """ import warnings from dataclasses import dataclass +from dataclasses import fields from typing import Any +from typing import Optional from prymer.api.minoptmax import MinOptMax from prymer.primer3.primer3_input_tag import Primer3InputTag @@ -84,9 +89,21 @@ class PrimerAndAmpliconParameters: primer_max_dinuc_bases: the maximal number of bases in a dinucleotide run in a primer avoid_masked_bases: whether Primer3 should avoid designing primers in soft-masked regions number_primers_return: the number of primers to return + primer_max_homodimer_tm: the max melting temperature acceptable for self-complementarity + primer_max_3p_homodimer_tm: the max melting temperature acceptable for self-complementarity + anchored at the 3' end + primer_max_hairpin_tm: the max melting temperature acceptable for secondary structure Please see the Primer3 manual for additional details: https://primer3.org/manual.html#globalTags + Each of `primer_max_homodimer_tm`, `primer_max_3p_homodimer_tm`, and `primer_max_hairpin_tm` is + optional. If these attributes are not provided, the default value will be set to 10 degrees + lower than the minimal melting temperature specified for the primer. This matches the Primer3 + manual. + + If these values are provided, users should provide the absolute value of the + melting temperature threshold (i.e. when provided, values should be specified independent + of primer design.) """ amplicon_sizes: MinOptMax[int] @@ -100,6 +117,9 @@ class PrimerAndAmpliconParameters: primer_max_dinuc_bases: int = 6 avoid_masked_bases: bool = True number_primers_return: int = 5 + primer_max_homodimer_tm: Optional[float] = None + primer_max_3p_homodimer_tm: Optional[float] = None + primer_max_hairpin_tm: Optional[float] = None def __post_init__(self) -> None: if self.primer_max_dinuc_bases % 2 == 1: @@ -110,6 +130,16 @@ def __post_init__(self) -> None: raise TypeError("Amplicon sizes and primer sizes must be integers") if self.gc_clamp[0] > self.gc_clamp[1]: raise ValueError("Min primer GC-clamp must be <= max primer GC-clamp") + # if thermo attributes are not provided, default them to `self.primer_tms.min - 10.0` + default_thermo_max: float = self.primer_tms.min - 10.0 + thermo_max_fields = [ + "primer_max_homodimer_tm", + "primer_max_3p_homodimer_tm", + "primer_max_hairpin_tm", + ] + for field in fields(self): + if field.name in thermo_max_fields and getattr(self, field.name) is None: + object.__setattr__(self, field.name, default_thermo_max) def to_input_tags(self) -> dict[Primer3InputTag, Any]: """Converts input params to Primer3InputTag to feed directly into Primer3.""" @@ -136,6 +166,9 @@ def to_input_tags(self) -> dict[Primer3InputTag, Any]: Primer3InputTag.PRIMER_MAX_NS_ACCEPTED: self.primer_max_Ns, Primer3InputTag.PRIMER_LOWERCASE_MASKING: 1 if self.avoid_masked_bases else 0, Primer3InputTag.PRIMER_NUM_RETURN: self.number_primers_return, + Primer3InputTag.PRIMER_MAX_SELF_ANY_TH: self.primer_max_homodimer_tm, + Primer3InputTag.PRIMER_MAX_SELF_END_TH: self.primer_max_3p_homodimer_tm, + Primer3InputTag.PRIMER_MAX_HAIRPIN_TH: self.primer_max_hairpin_tm, } return mapped_dict @@ -180,12 +213,28 @@ class ProbeParameters: probe_max_dinuc_bases: the max number of bases in a dinucleotide run in a probe probe_max_polyX: the max homopolymer length acceptable within a probe probe_max_Ns: the max number of ambiguous bases acceptable within a probe + probe_max_homodimer_tm: the max melting temperature acceptable for self-complementarity + probe_max_3p_homodimer_tm: the max melting temperature acceptable for self-complementarity + anchored at the 3' end + probe_max_hairpin_tm: the max melting temperature acceptable for secondary structure The attributes that have default values specified take their default values from the Primer3 manual. Please see the Primer3 manual for additional details: https://primer3.org/manual.html#globalTags + While Primer3 supports alignment based and thermodynamic methods for simulating hybridizations, + prymer enforces the use of the thermodynamic approach. This is slightly more computationally + expensive but superior in their ability to limit problematic oligo self-complementarity + (e.g., primer-dimers, nonspecific binding of probes) + + If they are not provided, `probe_max_self_any_thermo`, `probe_max_self_end_thermo`, and + `probe_max_hairpin_thermo` will be set to default values as specified in the Primer3 manual. + The default value is 10 degrees lower than the minimal melting temperature specified for + probe design (i.e. when not provided, values are specified relative to the probe design). + If these values are provided, users should provide the absolute value of the + melting temperature threshold (i.e. when provided, values should be specified as independent + of probe design.) """ @@ -196,6 +245,9 @@ class ProbeParameters: probe_max_dinuc_bases: int = 4 probe_max_polyX: int = 5 probe_max_Ns: int = 0 + probe_max_homodimer_tm: Optional[float] = None + probe_max_3p_homodimer_tm: Optional[float] = None + probe_max_hairpin_tm: Optional[float] = None def __post_init__(self) -> None: if not isinstance(self.probe_sizes.min, int): @@ -204,6 +256,16 @@ def __post_init__(self) -> None: raise TypeError("Probe melting temperatures and GC content must be floats") if self.probe_max_dinuc_bases % 2 == 1: raise ValueError("Max threshold for dinucleotide bases must be an even number of bases") + # if thermo attributes are not provided, default them to `self.probe_tms.min - 10.0` + default_thermo_max: float = self.probe_tms.min - 10.0 + thermo_max_fields = [ + "probe_max_homodimer_tm", + "probe_max_3p_homodimer_tm", + "probe_max_hairpin_tm", + ] + for field in fields(self): + if field.name in thermo_max_fields and getattr(self, field.name) is None: + object.__setattr__(self, field.name, default_thermo_max) def to_input_tags(self) -> dict[Primer3InputTag, Any]: """Converts input params to Primer3InputTag to feed directly into Primer3.""" @@ -219,6 +281,9 @@ def to_input_tags(self) -> dict[Primer3InputTag, Any]: Primer3InputTag.PRIMER_INTERNAL_MAX_GC: self.probe_gcs.max, Primer3InputTag.PRIMER_INTERNAL_MAX_POLY_X: self.probe_max_polyX, Primer3InputTag.PRIMER_INTERNAL_MAX_NS_ACCEPTED: self.probe_max_Ns, + Primer3InputTag.PRIMER_INTERNAL_MAX_SELF_ANY_TH: self.probe_max_homodimer_tm, + Primer3InputTag.PRIMER_INTERNAL_MAX_SELF_END_TH: self.probe_max_3p_homodimer_tm, + Primer3InputTag.PRIMER_INTERNAL_MAX_HAIRPIN_TH: self.probe_max_hairpin_tm, } return mapped_dict diff --git a/prymer/primer3/primer3_weights.py b/prymer/primer3/primer3_weights.py index 4668090..d074726 100644 --- a/prymer/primer3/primer3_weights.py +++ b/prymer/primer3/primer3_weights.py @@ -18,9 +18,9 @@ Example: >>> PrimerAndAmpliconWeights() # default implementation -PrimerAndAmpliconWeights(product_size_lt=1.0, product_size_gt=1.0, product_tm_lt=0.0, product_tm_gt=0.0, primer_end_stability=0.25, primer_gc_lt=0.25, primer_gc_gt=0.25, primer_self_any=0.1, primer_self_end=0.1, primer_size_lt=0.5, primer_size_gt=0.1, primer_tm_lt=1.0, primer_tm_gt=1.0) +PrimerAndAmpliconWeights(product_size_lt=1.0, product_size_gt=1.0, product_tm_lt=0.0, product_tm_gt=0.0, primer_end_stability=0.25, primer_gc_lt=0.25, primer_gc_gt=0.25, primer_self_any=0.1, primer_self_end=0.1, primer_size_lt=0.5, primer_size_gt=0.1, primer_tm_lt=1.0, primer_tm_gt=1.0, primer_homodimer_wt=0.0, primer_3p_homodimer_wt=0.0, primer_secondary_structure_wt=0.0) >>> PrimerAndAmpliconWeights(product_size_lt=5.0) -PrimerAndAmpliconWeights(product_size_lt=5.0, product_size_gt=1.0, product_tm_lt=0.0, product_tm_gt=0.0, primer_end_stability=0.25, primer_gc_lt=0.25, primer_gc_gt=0.25, primer_self_any=0.1, primer_self_end=0.1, primer_size_lt=0.5, primer_size_gt=0.1, primer_tm_lt=1.0, primer_tm_gt=1.0) +PrimerAndAmpliconWeights(product_size_lt=5.0, product_size_gt=1.0, product_tm_lt=0.0, product_tm_gt=0.0, primer_end_stability=0.25, primer_gc_lt=0.25, primer_gc_gt=0.25, primer_self_any=0.1, primer_self_end=0.1, primer_size_lt=0.5, primer_size_gt=0.1, primer_tm_lt=1.0, primer_tm_gt=1.0, primer_homodimer_wt=0.0, primer_3p_homodimer_wt=0.0, primer_secondary_structure_wt=0.0) """ # noqa: E501 from dataclasses import dataclass @@ -38,7 +38,7 @@ class PrimerAndAmpliconWeights: Some of these settings depart from the default settings enumerated in the Primer3 manual. Please see the Primer3 manual for additional details: - https://primer3.org/manual.html#globalTags + https://primer3.org/manual.html#globalTags Attributes: product_size_lt: weight for products shorter than @@ -53,8 +53,15 @@ class PrimerAndAmpliconWeights: for the last five 3' bases of primer primer_gc_lt: penalty for primers with GC percent lower than `PrimerAndAmpliconParameters.primer_gcs.opt` - primer_gc_gt: penalty weight for primers with GC percent higher than + primer_gc_gt: weight for primers with GC percent higher than `PrimerAndAmpliconParameters.primer_gcs.opt` + primer_homodimer_wt: penalty for the individual primer self binding value as specified + in `PrimerAndAmpliconParameters.primer_max_homodimer_tm` + primer_3p_homodimer_wt: weight for the 3'-anchored primer self binding value as specified in + `PrimerAndAmpliconParameters.primer_max_3p_homodimer_tm` + primer_secondary_structure_wt: penalty weight for the primer hairpin structure melting + temperature as defined in `PrimerAndAmpliconParameters.PRIMER_MAX_HAIRPIN_TH` + """ product_size_lt: float = 1.0 @@ -70,6 +77,9 @@ class PrimerAndAmpliconWeights: primer_size_gt: float = 0.1 primer_tm_lt: float = 1.0 primer_tm_gt: float = 1.0 + primer_homodimer_wt: float = 0.0 + primer_3p_homodimer_wt: float = 0.0 + primer_secondary_structure_wt: float = 0.0 def to_input_tags(self) -> dict[Primer3InputTag, Any]: """Maps weights to Primer3InputTag to feed directly into Primer3.""" @@ -87,6 +97,9 @@ def to_input_tags(self) -> dict[Primer3InputTag, Any]: Primer3InputTag.PRIMER_WT_SIZE_GT: self.primer_size_gt, Primer3InputTag.PRIMER_WT_TM_LT: self.primer_tm_lt, Primer3InputTag.PRIMER_WT_TM_GT: self.primer_tm_gt, + Primer3InputTag.PRIMER_WT_SELF_ANY_TH: self.primer_homodimer_wt, + Primer3InputTag.PRIMER_WT_SELF_END_TH: self.primer_3p_homodimer_wt, + Primer3InputTag.PRIMER_WT_HAIRPIN_TH: self.primer_secondary_structure_wt, } return mapped_dict @@ -102,15 +115,27 @@ class ProbeWeights: probe_tm_gt: penalty for probes with a Tm greater than `ProbeParameters.probe_tms.opt` probe_gc_lt: penalty for probes with GC content lower than `ProbeParameters.probe_gcs.opt` probe_gc_gt: penalty for probes with GC content greater than `ProbeParameters.probe_gcs.opt` + probe_wt_self_any_th: penalty for probe self-complementarity as defined in + `ProbeParameters.probe_max_self_any_thermo` + probe_wt_self_end: penalty for probe 3' complementarity as defined in + `ProbeParameters.probe_max_self_end_thermo` + probe_wt_hairpin_th: penalty for the most stable primer hairpin structure value as defined + in `ProbeParameters.probe_max_hairpin_thermo` + + Each of these defaults are taken from the Primer3 manual. More details can be found here: + https://primer3.org/manual.html """ - probe_size_lt: float = 0.25 - probe_size_gt: float = 0.25 + probe_size_lt: float = 1.0 + probe_size_gt: float = 1.0 probe_tm_lt: float = 1.0 probe_tm_gt: float = 1.0 - probe_gc_lt: float = 0.5 - probe_gc_gt: float = 0.5 + probe_gc_lt: float = 0.0 + probe_gc_gt: float = 0.0 + probe_homodimer_wt: float = 0.0 + probe_3p_homodimer_wt: float = 0.0 + probe_secondary_structure_wt: float = 0.0 def to_input_tags(self) -> dict[Primer3InputTag, Any]: """Maps weights to Primer3InputTag to feed directly into Primer3.""" @@ -121,5 +146,8 @@ def to_input_tags(self) -> dict[Primer3InputTag, Any]: Primer3InputTag.PRIMER_INTERNAL_WT_TM_GT: self.probe_tm_gt, Primer3InputTag.PRIMER_INTERNAL_WT_GC_PERCENT_LT: self.probe_gc_lt, Primer3InputTag.PRIMER_INTERNAL_WT_GC_PERCENT_GT: self.probe_gc_gt, + Primer3InputTag.PRIMER_INTERNAL_WT_SELF_ANY_TH: self.probe_homodimer_wt, + Primer3InputTag.PRIMER_INTERNAL_WT_SELF_END_TH: self.probe_3p_homodimer_wt, + Primer3InputTag.PRIMER_INTERNAL_WT_HAIRPIN_TH: self.probe_secondary_structure_wt, } return mapped_dict diff --git a/tests/primer3/test_primer3.py b/tests/primer3/test_primer3.py index 091af50..c74cb13 100644 --- a/tests/primer3/test_primer3.py +++ b/tests/primer3/test_primer3.py @@ -175,6 +175,10 @@ def test_left_primer_valid_designs( primer_and_amplicon_params=single_primer_params, task=DesignLeftPrimersTask(), ) + expected_thermo_max = single_primer_params.primer_tms.min - 10 + assert design_input.primer_and_amplicon_params.primer_max_homodimer_tm == expected_thermo_max + assert design_input.primer_and_amplicon_params.primer_max_3p_homodimer_tm == expected_thermo_max + assert design_input.primer_and_amplicon_params.primer_max_hairpin_tm == expected_thermo_max with Primer3(genome_fasta=genome_ref) as designer: for _ in range(10): # run many times to ensure we can re-use primer3 diff --git a/tests/primer3/test_primer3_input.py b/tests/primer3/test_primer3_input.py index 54d6c8d..2a70919 100644 --- a/tests/primer3/test_primer3_input.py +++ b/tests/primer3/test_primer3_input.py @@ -68,7 +68,7 @@ def test_primer_design_only_valid( primer_and_amplicon_params=valid_primer_amplicon_params, ) mapped_dict = test_input.to_input_tags(design_region=test_design_region) - assert len(mapped_dict.keys()) == 38 + assert len(mapped_dict.keys()) == 44 @pytest.mark.parametrize( @@ -102,7 +102,7 @@ def test_probe_design_only_valid( mapped_dict = test_input.to_input_tags(design_region=test_design_region) assert mapped_dict[Primer3InputTag.PRIMER_PICK_INTERNAL_OLIGO] == 1 - assert len(mapped_dict.keys()) == 21 + assert len(mapped_dict.keys()) == 27 # test instantiation of default `ProbeWeights` when they are not provided altered_input = Primer3Input( @@ -113,7 +113,7 @@ def test_probe_design_only_valid( primer_and_amplicon_params=None, ) altered_mapped_dict = altered_input.to_input_tags(design_region=test_target) - assert altered_mapped_dict[Primer3InputTag.PRIMER_INTERNAL_WT_GC_PERCENT_GT] == 0.5 + assert altered_mapped_dict[Primer3InputTag.PRIMER_INTERNAL_WT_GC_PERCENT_GT] == 0.0 def test_probe_design_only_raises(valid_probe_weights: ProbeWeights) -> None: diff --git a/tests/primer3/test_primer3_parameters.py b/tests/primer3/test_primer3_parameters.py index efd4d24..62d4ece 100644 --- a/tests/primer3/test_primer3_parameters.py +++ b/tests/primer3/test_primer3_parameters.py @@ -43,7 +43,9 @@ def test_primer_amplicon_param_construction_valid( def test_probe_param_construction_valid( valid_probe_params: ProbeParameters, ) -> None: - """Test ProbeParameters class instantiation with valid input""" + """Test ProbeParameters class instantiation with valid input. Assert that the *_thermo fields + (which were not explicitly set) are set to `valid_probe_params.probe_tms.min` - 10.0.""" + expected_thermo_value: float = valid_probe_params.probe_tms.min - 10.0 assert valid_probe_params.probe_sizes.min == 18 assert valid_probe_params.probe_sizes.opt == 22 assert valid_probe_params.probe_sizes.max == 30 @@ -53,6 +55,9 @@ def test_probe_param_construction_valid( assert valid_probe_params.probe_gcs.min == 45.0 assert valid_probe_params.probe_gcs.opt == 55.0 assert valid_probe_params.probe_gcs.max == 60.0 + assert valid_probe_params.probe_max_homodimer_tm == expected_thermo_value + assert valid_probe_params.probe_max_3p_homodimer_tm == expected_thermo_value + assert valid_probe_params.probe_max_hairpin_tm == expected_thermo_value def test_primer_amplicon_param_construction_raises( diff --git a/tests/primer3/test_primer3_weights.py b/tests/primer3/test_primer3_weights.py index f1a7726..4acdd30 100644 --- a/tests/primer3/test_primer3_weights.py +++ b/tests/primer3/test_primer3_weights.py @@ -20,19 +20,22 @@ def test_primer_weights_valid() -> None: assert test_dict[Primer3InputTag.PRIMER_WT_SIZE_GT] == 0.1 assert test_dict[Primer3InputTag.PRIMER_WT_TM_LT] == 1.0 assert test_dict[Primer3InputTag.PRIMER_WT_TM_GT] == 1.0 - assert len((test_dict.values())) == 13 + assert len((test_dict.values())) == 16 def test_probe_weights_valid() -> None: test_weights = ProbeWeights() test_dict = test_weights.to_input_tags() - assert test_dict[Primer3InputTag.PRIMER_INTERNAL_WT_SIZE_LT] == 0.25 - assert test_dict[Primer3InputTag.PRIMER_INTERNAL_WT_SIZE_GT] == 0.25 + assert test_dict[Primer3InputTag.PRIMER_INTERNAL_WT_SIZE_LT] == 1.0 + assert test_dict[Primer3InputTag.PRIMER_INTERNAL_WT_SIZE_GT] == 1.0 assert test_dict[Primer3InputTag.PRIMER_INTERNAL_WT_TM_LT] == 1.0 assert test_dict[Primer3InputTag.PRIMER_INTERNAL_WT_TM_GT] == 1.0 - assert test_dict[Primer3InputTag.PRIMER_INTERNAL_WT_GC_PERCENT_LT] == 0.5 - assert test_dict[Primer3InputTag.PRIMER_INTERNAL_WT_GC_PERCENT_GT] == 0.5 - assert len((test_dict.values())) == 6 + assert test_dict[Primer3InputTag.PRIMER_INTERNAL_WT_GC_PERCENT_LT] == 0.0 + assert test_dict[Primer3InputTag.PRIMER_INTERNAL_WT_GC_PERCENT_GT] == 0.0 + assert test_dict[Primer3InputTag.PRIMER_INTERNAL_WT_SELF_ANY_TH] == 0.0 + assert test_dict[Primer3InputTag.PRIMER_INTERNAL_WT_SELF_END_TH] == 0.0 + assert test_dict[Primer3InputTag.PRIMER_INTERNAL_WT_HAIRPIN_TH] == 0.0 + assert len(test_dict) == 9 def test_primer_weights_to_input_tags() -> None: From e9ddc8df28f2728bb1664cb2e10a9fe2dca6aded Mon Sep 17 00:00:00 2001 From: Tim Fennell Date: Fri, 8 Nov 2024 09:57:17 -0700 Subject: [PATCH 4/5] Make sure the number_probes_returned is respected. (#79) --- prymer/primer3/primer3_parameters.py | 1 + tests/primer3/test_primer3_input.py | 3 ++- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/prymer/primer3/primer3_parameters.py b/prymer/primer3/primer3_parameters.py index b1a1b6a..543d807 100644 --- a/prymer/primer3/primer3_parameters.py +++ b/prymer/primer3/primer3_parameters.py @@ -284,6 +284,7 @@ def to_input_tags(self) -> dict[Primer3InputTag, Any]: Primer3InputTag.PRIMER_INTERNAL_MAX_SELF_ANY_TH: self.probe_max_homodimer_tm, Primer3InputTag.PRIMER_INTERNAL_MAX_SELF_END_TH: self.probe_max_3p_homodimer_tm, Primer3InputTag.PRIMER_INTERNAL_MAX_HAIRPIN_TH: self.probe_max_hairpin_tm, + Primer3InputTag.PRIMER_NUM_RETURN: self.number_probes_return, } return mapped_dict diff --git a/tests/primer3/test_primer3_input.py b/tests/primer3/test_primer3_input.py index 2a70919..84b3bbf 100644 --- a/tests/primer3/test_primer3_input.py +++ b/tests/primer3/test_primer3_input.py @@ -101,8 +101,9 @@ def test_probe_design_only_valid( ) mapped_dict = test_input.to_input_tags(design_region=test_design_region) assert mapped_dict[Primer3InputTag.PRIMER_PICK_INTERNAL_OLIGO] == 1 + assert Primer3InputTag.PRIMER_NUM_RETURN in mapped_dict - assert len(mapped_dict.keys()) == 27 + assert len(mapped_dict.keys()) == 28 # test instantiation of default `ProbeWeights` when they are not provided altered_input = Primer3Input( From 8cbc1a244438eb1d6d3329a58c9a2f87cfae7917 Mon Sep 17 00:00:00 2001 From: Tim Fennell Date: Mon, 11 Nov 2024 16:18:50 -0700 Subject: [PATCH 5/5] Reducing `picking.py` to be simpler, more focused, and more performant. (#75) Closes https://github.com/fulcrumgenomics/prymer/issues/38 --- docs/overview.md | 40 +- prymer/api/__init__.py | 6 - prymer/api/picking.py | 479 +++------------ tests/api/test_picking.py | 1170 +++++++++++-------------------------- 4 files changed, 426 insertions(+), 1269 deletions(-) diff --git a/docs/overview.md b/docs/overview.md index 42759d2..1977e58 100644 --- a/docs/overview.md +++ b/docs/overview.md @@ -47,39 +47,7 @@ using the [`OffTargetDetector`][prymer.offtarget.offtarget_detector.OffTargetDet The remaining primers may then be used with the [`build_primer_pairs()`][prymer.api.picking.build_primer_pairs] method to build primer pairs -from all combinations of the left and right primers. -The produced primer pairs are scored in a manner similar to Primer3 using the [`score()`][prymer.api.picking.score] method. -The [`FilterParams`][prymer.api.picking.FilteringParams] class is used to provide parameters for scoring. - -Next, the [`pick_top_primer_pairs()`][prymer.api.picking.pick_top_primer_pairs] method is used to select up to -a maximum number of primer pairs. The primer pairs are selected in the order of lowest penalty (highest score). As -part of this process, each primer pair must: - -1. Have an amplicon in the desired size range (see [`is_acceptable_primer_pair`][prymer.api.picking.is_acceptable_primer_pair]). -2. Have an amplicon melting temperature in the desired range (see [`is_acceptable_primer_pair`][prymer.api.picking.is_acceptable_primer_pair]). -3. Not have too many off-targets (see [`OffTargetDetector.check_one()`][prymer.offtarget.offtarget_detector.OffTargetDetector.check_one]). -4. Not have primer pairs that overlap too much (see [`check_primer_overlap()`][prymer.api.picking.check_primer_overlap]). -5. Not form a dimer with a melting temperature above a specified threshold (see the [`max_dimer_tm` attribute in `FilterParams`][prymer.api.picking.FilteringParams]). - -Checking for dimers may be performed using the [`NtThermoAlign`][prymer.ntthal.NtThermoAlign] command line executable, -and can be passed to [`pick_top_primer_pairs()`][prymer.api.picking.pick_top_primer_pairs] as follows: - -```python -from prymer.ntthal import NtThermoAlign -from prymer.api.picking import FilteringParams, pick_top_primer_pairs -params = FilteringParams(...) -dimer_checker = NtThermoAlign() -pick_top_primer_pairs( - is_dimer_tm_ok=lambda s1, s2: ( - dimer_checker.duplex_tm(s1=s1, s2=s2) <= params.max_dimer_tm - ), - ... -) - -``` - -For convenience, the [`build_and_pick_primer_pairs()`][prymer.api.picking.build_and_pick_primer_pairs] method combines -both the [`build_primer_pairs()`][prymer.api.picking.build_primer_pairs] and -[`pick_top_primer_pairs()`][prymer.api.picking.pick_top_primer_pairs] methods in single invocation. - -The resulting primer pairs may be further examined. \ No newline at end of file +from all combinations of the left and right primers that are within acceptable amplicon size and tm ranges. +Optionally, if a `max_heterodimer_tm` is provided, primer pairs will be screened to ensure that the left and right +primers do not dimerize with a Tm greater than the one provided. +The produced primer pairs are scored in a manner similar to Primer3 using the [`score()`][prymer.api.picking.score] method. \ No newline at end of file diff --git a/prymer/api/__init__.py b/prymer/api/__init__.py index 9cc352d..4d937a2 100644 --- a/prymer/api/__init__.py +++ b/prymer/api/__init__.py @@ -3,10 +3,7 @@ from prymer.api.minoptmax import MinOptMax from prymer.api.oligo import Oligo from prymer.api.oligo_like import OligoLike -from prymer.api.picking import FilteringParams -from prymer.api.picking import build_and_pick_primer_pairs from prymer.api.picking import build_primer_pairs -from prymer.api.picking import pick_top_primer_pairs from prymer.api.primer_pair import PrimerPair from prymer.api.span import BedLikeCoords from prymer.api.span import Span @@ -23,10 +20,7 @@ "ClusteredIntervals", "cluster_intervals", "MinOptMax", - "FilteringParams", "build_primer_pairs", - "pick_top_primer_pairs", - "build_and_pick_primer_pairs", "OligoLike", "Oligo", "PrimerPair", diff --git a/prymer/api/picking.py b/prymer/api/picking.py index aa20ce9..e407f8b 100644 --- a/prymer/api/picking.py +++ b/prymer/api/picking.py @@ -1,51 +1,26 @@ """ # Methods and class for building and scoring primer pairs from a list of left and right primers. -Typically, the first step is to build primer pairs from a list of left and right primers for a +Typically, primer pairs are built from a list of left and right primers for a given target with the [`build_primer_pairs()`][prymer.api.picking.build_primer_pairs] method. The returned primer pairs are automatically scored (using the -[`score()`][prymer.api.picking.score] method), and returned sorted by the penalty -(increasing). - -Next, the list of primer pairs for a given target are filtered based on various criteria using the -[`pick_top_primer_pairs()`][prymer.api.picking.pick_top_primer_pairs] method. Criteria -included but not limited to filtering for desired size and melting temperature ranges (see -[`is_acceptable_primer_pair()`][prymer.api.picking.is_acceptable_primer_pair]), not too -many off-targets, no self-dimers, and so on. A given maximum number of passing primer pairs is -returned. - -These two steps can be performed jointly using the -[`build_and_pick_primer_pairs()`][prymer.api.picking.build_and_pick_primer_pairs] method. +[`score()`][prymer.api.picking.score] method). ## Module contents Contains the following public classes and methods: -- [`FilteringParams`][prymer.api.picking.FilteringParams] -- Stores the parameters used - for filtering primers and primer pairs. - [`score()`][prymer.api.picking.score] -- Scores the amplicon amplified by a primer pair in a manner similar to Primer3. -- [`check_primer_overlap()`][prymer.api.picking.check_primer_overlap] -- Checks that both - (next) primers have at least `min_difference` bases that do not overlap the other (previous) - primers. -- [`is_acceptable_primer_pair()`][prymer.api.picking.is_acceptable_primer_pair] -- Checks - if a primer pair has the desired size range and melting temperature. - [`build_primer_pairs()`][prymer.api.picking.build_primer_pairs] -- Builds primer pairs from individual left and primers. -- [`pick_top_primer_pairs()`][prymer.api.picking.pick_top_primer_pairs] -- Selects up to - the given number of primer pairs from the given list of primer pairs. -- [`build_and_pick_primer_pairs()`][prymer.api.picking.build_and_pick_primer_pairs] -- - Builds primer pairs from individual left and right primers and selects up to the given number of - primer pairs from the given list of primer pairs. """ -from dataclasses import dataclass -from typing import Callable -from typing import Iterable +from pathlib import Path +from typing import Iterator from typing import Optional -from fgpyo.collections import PeekableIterator from pysam import FastaFile from prymer.api.melting import calculate_long_seq_tm @@ -54,104 +29,17 @@ from prymer.api.primer_pair import PrimerPair from prymer.api.span import Span from prymer.ntthal import NtThermoAlign -from prymer.offtarget import OffTargetDetector - - -@dataclass(frozen=True, init=True, slots=True) -class FilteringParams: - """Stores the parameters used for filtering primers and primer pairs. - - Attributes: - amplicon_sizes: the min, optimal, and max amplicon size - amplicon_tms: the min, optimal, and max amplicon melting temperatures - product_size_lt: penalty weight for products shorter than the optimal amplicon size - product_size_gt: penalty weight for products longer than the optimal amplicon size - product_tm_lt: penalty weight for products with a Tm smaller the optimal amplicon Tm - product_tm_gt: penalty weight for products with a Tm larger the optimal amplicon Tm - max_dimer_tm: the max primer dimer melting temperature allowed when filtering primer pairs - for dimer formation. Any primer pair with a dimer melting temperature above this - threshold will be discarded. - max_primer_hits: the maximum number of hits an individual primer can have in the genome - before it is considered an invalid primer, and all primer pairs containing the primer - failed. - max_primer_pair_hits: the maximum number of amplicons a primer pair can make and be - considered passing. - three_prime_region_length: the number of bases at the 3' end of the primer in which the - parameter `max_mismatches_in_three_prime_region` is evaluated. - max_mismatches_in_three_prime_region: the maximum number of mismatches that are tolerated in - the three prime region of each primer defined by `three_prime_region_length`. - max_mismatches: the maximum number of mismatches that are tolerated in the full primer. - min_primer_to_target_distance: minimum distance allowed between the end of a primer and - start of the target -- aimed at centering the target in the amplicon. - read_length: sequencing depth and maximum distance allowed between the start of a primer and - the end of the target -- ensuring coverage during sequencing - dist_from_primer_end_weight: weight determining importance of the distance between the - primer and amplicon - target_not_covered_by_read_length_weight: weight determining importance of coverage of the - target by the sequencing depth - """ - - amplicon_sizes: MinOptMax[int] - amplicon_tms: MinOptMax[float] - product_size_lt: int - product_size_gt: int - product_tm_lt: float - product_tm_gt: float - max_dimer_tm: float = 55.0 - max_primer_hits: int = 500 - max_primer_pair_hits: int = 1 - three_prime_region_length: int = 1 - max_mismatches_in_three_prime_region: int = 2 - max_mismatches: int = 2 - min_primer_to_target_distance: int = 30 - read_length: int = 150 - dist_from_primer_end_weight: float = 100.0 - target_not_covered_by_read_length_weight: float = 100.0 - - -def _dist_penalty(start: int, end: int, params: FilteringParams) -> float: - """Returns a penalty that penalizes primers whose innermost base is closer than some - minimum distance from the target. The "distance" is zero if the innermost base overlaps the - target, one if the innermost base abuts the target, etc. - - Args: - start: the 1-based start position (inclusive) - end: the 1-based end position (inclusive) - params: the filtering parameters - """ - diff = end - start - if diff >= params.min_primer_to_target_distance: - return 0.0 - else: - return (params.min_primer_to_target_distance - diff) * params.dist_from_primer_end_weight - - -def _seq_penalty(start: int, end: int, params: FilteringParams) -> float: - """Returns a penalty that penalizes amplicons where the target cannot be fully sequenced - at the given read length starting from both ends of the amplicon. - - Args: - start: the start coordinate of the span to consider - end: the end coordinate of the span to consider - params: the filtering parameters - """ - - amplicon_length = end - start + 1 - if amplicon_length <= params.read_length: - return 0.0 - else: - return ( - amplicon_length - params.read_length - ) * params.target_not_covered_by_read_length_weight +from prymer.primer3 import PrimerAndAmpliconWeights def score( left_primer: Oligo, right_primer: Oligo, - target: Span, amplicon: Span, - amplicon_seq_or_tm: str | float, - params: FilteringParams, + amplicon_tm: float, + amplicon_sizes: MinOptMax[int], + amplicon_tms: MinOptMax[float], + weights: PrimerAndAmpliconWeights, ) -> float: """Score the amplicon in a manner similar to Primer3 @@ -162,20 +50,15 @@ def score( scaled by the product size weight. Is zero if the optimal amplicon size is zero. 3. Amplicon Tm: The difference in melting temperature between the calculated and optimal, weighted by the product melting temperature. - 4. Inner primer distance: For primers whose innermost base is closer than some minimum distance - from the target, the difference between the two distances scale by the corresponding weight. - 5. Amplicon length: The difference between the amplicon length and provided read length scaled - by the corresponding weight. Is zero when the amplicon is at most the read length. Args: left_primer: the left primer right_primer: the right primer - target: the target mapping amplicon: the amplicon mapping - amplicon_seq_or_tm: either the melting temperature of the amplicon, or the amplicon sequence - from which the melting temperature of the amplicon will be calculated with - `calculate_long_seq_tm` - params: the filtering parameters + amplicon_tm: the melting temperature of the amplicon + amplicon_sizes: minimum, optimal, and maximum amplicon sizes (lengths) + amplicon_tms: minimum, optimal, and maximum amplicon Tms + weights: the set of penalty weights Returns: the penalty for the whole amplicon. @@ -186,285 +69,109 @@ def score( # product size weight. The product size weight is different depending on if the amplicon # size is greater or less than the optimal amplicons. size_penalty: float - if params.amplicon_sizes.opt == 0: + if amplicon_sizes.opt == 0: size_penalty = 0.0 - elif amplicon.length > params.amplicon_sizes.opt: - size_penalty = (amplicon.length - params.amplicon_sizes.opt) * params.product_size_gt + elif amplicon.length > amplicon_sizes.opt: + size_penalty = (amplicon.length - amplicon_sizes.opt) * weights.product_size_gt else: - size_penalty = (params.amplicon_sizes.opt - amplicon.length) * params.product_size_lt + size_penalty = (amplicon_sizes.opt - amplicon.length) * weights.product_size_lt # The penalty for the amplicon melting temperature. # The difference in melting temperature between the calculated and optimal is weighted by the # product melting temperature. - tm: float - if isinstance(amplicon_seq_or_tm, str): - tm = calculate_long_seq_tm(amplicon_seq_or_tm) - else: - tm = float(amplicon_seq_or_tm) + tm = amplicon_tm tm_penalty: float - if tm > params.amplicon_tms.opt: - tm_penalty = (tm - params.amplicon_tms.opt) * params.product_tm_gt + if tm > amplicon_tms.opt: + tm_penalty = (tm - amplicon_tms.opt) * weights.product_tm_gt else: - tm_penalty = (params.amplicon_tms.opt - tm) * params.product_tm_lt - - # Penalize primers whose innermost base is closer than some minimum distance from the target - left_dist_penalty: float = _dist_penalty( - start=left_primer.span.end, end=target.start, params=params - ) - right_dist_penalty: float = _dist_penalty( - start=target.end, end=right_primer.span.start, params=params - ) - - # Penalize amplicons where the target cannot be fully sequenced at the given read length - # starting from both ends of the amplicon. - left_seq_penalty: float = _seq_penalty( - start=left_primer.span.start, end=target.end, params=params - ) - right_seq_penalty: float = _seq_penalty( - start=target.start, end=right_primer.span.end, params=params - ) + tm_penalty = (amplicon_tms.opt - tm) * weights.product_tm_lt # Put it all together - return ( - left_primer.penalty - + right_primer.penalty - + size_penalty - + tm_penalty - + left_dist_penalty - + right_dist_penalty - + left_seq_penalty - + right_seq_penalty - ) - - -def check_primer_overlap( - prev_left: Span, - prev_right: Span, - next_left: Span, - next_right: Span, - min_difference: int, -) -> bool: - """Check that both (next) primers have at least `min_difference` bases that do not overlap the - other (previous) primers. - - Args: - prev_left: left primer mapping from the previous primer pair. - prev_right: right primer mapping from the previous primer pair - next_left: left primer mapping from the current primer pair being compared - next_right: right primer mapping from the current primer pair being compared - min_difference: the minimum number of bases that must differ between two primers - - Returns: - True if the primers are sufficiently different - """ - left = prev_left.length_of_overlap_with(next_left) + min_difference - right = prev_right.length_of_overlap_with(next_right) + min_difference - return ( - prev_left.length >= left - and next_left.length >= left - and prev_right.length >= right - and next_right.length >= right - ) - - -def is_acceptable_primer_pair(primer_pair: PrimerPair, params: FilteringParams) -> bool: - """Determine if a primer pair can be kept considering: - - 1. the primer pair is in the desired size range - 2. The primer pair has a melting temperature in the desired range - - Args: - primer_pair: the primer pair. - params: the parameters used for filtering - - Returns: - True if the primer pair passes initial checks and can be considered later on. - """ - return ( - params.amplicon_sizes.min <= primer_pair.amplicon.length <= params.amplicon_sizes.max - and params.amplicon_tms.min <= primer_pair.amplicon_tm <= params.amplicon_tms.max - ) + return left_primer.penalty + right_primer.penalty + size_penalty + tm_penalty def build_primer_pairs( - left_primers: Iterable[Oligo], - right_primers: Iterable[Oligo], + left_primers: list[Oligo], + right_primers: list[Oligo], target: Span, - params: FilteringParams, - fasta: FastaFile, -) -> list[PrimerPair]: + amplicon_sizes: MinOptMax[int], + amplicon_tms: MinOptMax[float], + max_heterodimer_tm: Optional[float], + weights: PrimerAndAmpliconWeights, + fasta_path: Path, +) -> Iterator[PrimerPair]: """Builds primer pairs from individual left and primers. - Args: - left_primers: the left primers - right_primers: the right primers - target: the genome mapping for the target - params: the parameters used for filtering - fasta: the FASTA file from which the amplicon sequence will be retrieved. - - Returns: - the list of primer pairs, sorted by penalty (increasing) - """ - # generate all the primer pairs - primer_pairs: list[PrimerPair] = [] - for left_primer in left_primers: - for right_primer in right_primers: - if left_primer.span.refname != right_primer.span.refname: - raise ValueError( - "Cannot create a primer pair from left and right primers on different" - f" references: left: '{left_primer.span.refname}'" - f" right: '{right_primer.span.refname}'" - ) - amplicon_mapping = Span( - refname=target.refname, start=left_primer.span.start, end=right_primer.span.end - ) - amplicon_bed = amplicon_mapping.get_bedlike_coords() # since fasta.fetch is 0-based - amplicon_sequence = fasta.fetch( - reference=target.refname, start=amplicon_bed.start, end=amplicon_bed.end - ) - amplicon_penalty = score( - left_primer=left_primer, - right_primer=right_primer, - target=target, - amplicon=amplicon_mapping, - amplicon_seq_or_tm=amplicon_sequence, - params=params, - ) - pp = PrimerPair( - left_primer=left_primer, - right_primer=right_primer, - amplicon_sequence=amplicon_sequence, - amplicon_tm=calculate_long_seq_tm(amplicon_sequence), - penalty=amplicon_penalty, - ) - primer_pairs.append(pp) - - # sort by smallest penalty - return sorted(primer_pairs, key=lambda pp: pp.penalty) - - -def pick_top_primer_pairs( - primer_pairs: list[PrimerPair], - num_primers: int, - min_difference: int, - params: FilteringParams, - offtarget_detector: OffTargetDetector, - is_dimer_tm_ok: Callable[[str, str], bool], -) -> list[PrimerPair]: - """Selects up to the given number of primer pairs from the given list of primer pairs. - - The primer pairs are selected in the order of lowest penalty. - - The primer pairs must: - 1. Have an amplicon in desired size range (see `is_acceptable_primer_pair`). - 2. Have an amplicon melting temperature in the desired range (see `is_acceptable_primer_pair`). - 3. Not have too many off-targets (see `OffTargetDetector#check_one()`). - 4. Not have primer pairs that overlap too much (see `check_primer_overlap`). - 5. Not form a dimer with a melting temperature above a specified threshold (see `max_dimer_tm`). - - The _order_ of these checks are important as some of them are expensive to compute. - - Args: - primer_pairs: the list of all primer pairs. - num_primers: the number of primer pairs to return for the target - min_difference: the minimum base difference between two primers that we will tolerate. - params: the parameters used for filtering - offtarget_detector: the off-target detector. - is_dimer_tm_ok: a function to use for checking for dimers. Should return true if the pair - passes (e.g. the dimer check) - - Returns: - Up to `num_primers` primer pairs - """ - selected: list[PrimerPair] = [] - pp_iter = PeekableIterator(primer_pairs) - last_pp: Optional[PrimerPair] = None - while len(selected) < num_primers and pp_iter.can_peek(): - pp = next(pp_iter) - - # Enforce that primers were ordered by penalty (larger value first) - if last_pp is not None and pp.penalty < last_pp.penalty: - raise ValueError("Primers must be given in order by penalty (increasing value).") - last_pp = pp - - # Check some basic properties - if not is_acceptable_primer_pair(primer_pair=pp, params=params): - continue - - # Check for off-targets - if not offtarget_detector.check_one(primer_pair=pp).passes: - continue - - # Check that both primers have at least `min_difference` bases that do not overlap the - # other primer. - okay = all( - check_primer_overlap( - prev.left_primer.span, - prev.right_primer.span, - pp.left_primer.span, - pp.right_primer.span, - min_difference=min_difference, - ) - for prev in selected - ) - if not okay: - continue - - # Check dimer Tm - if not is_dimer_tm_ok(pp.left_primer.bases, pp.right_primer.bases): - continue - - selected.append(pp) - - return selected - - -def build_and_pick_primer_pairs( - left_primers: Iterable[Oligo], - right_primers: Iterable[Oligo], - target: Span, - num_primers: int, - min_difference: int, - params: FilteringParams, - offtarget_detector: OffTargetDetector, - dimer_checker: NtThermoAlign, - fasta: FastaFile, -) -> list[PrimerPair]: - """Picks up to `num_primers` primer pairs. + Only primer pairs that meet the requirements for amplicon sizes and tms will be returned. + In addition, if `max_heterodimer_tm` is provided then primer pairs with a heterodimer tm + exceeding the maximum will also be discarded. Args: left_primers: the left primers right_primers: the right primers target: the genome mapping for the target - num_primers: the number of primer pairs to return for the target. - min_difference: the minimum base difference between two primers that we will tolerate. - params: the parameters used for filtering. - offtarget_detector: the off-target detector. - dimer_checker: the primer-dimer melting temperature checker. - fasta: the FASTA file from which the amplicon sequence will be retrieved. + amplicon_sizes: minimum, optimal, and maximum amplicon sizes (lengths) + amplicon_tms: minimum, optimal, and maximum amplicon Tms + max_heterodimer_tm: if supplied, heterodimer Tms will be calculated for primer pairs, + and those exceeding the maximum Tm will be discarded + weights: the set of penalty weights + fasta_path: the path to the FASTA file from which the amplicon sequence will be retrieved. Returns: - the list of primer pairs, sorted by penalty (increasing) + an iterator over all the valid primer pairs, unsorted """ - # build the list of primer pairs - primer_pairs = build_primer_pairs( - left_primers=left_primers, - right_primers=right_primers, - target=target, - params=params, - fasta=fasta, - ) + # Short circuit if we have no left primers or no right primers + if not any(left_primers) or not any(right_primers): + return + + if any(p.span.refname != target.refname for p in left_primers): + raise ValueError("Left primers exist on different reference than target.") + + if any(p.span.refname != target.refname for p in right_primers): + raise ValueError("Right primers exist on different reference than target.") + + # Grab the sequence we'll use to fill in the amplicon sequence + with FastaFile(f"{fasta_path}") as fasta: + region_start = min(p.span.start for p in left_primers) + region_end = max(p.span.end for p in right_primers) + bases = fasta.fetch(target.refname, region_start - 1, region_end) + + with NtThermoAlign() as ntthal: + # generate all the primer pairs that don't violate hard size and Tm constraints + for lp in left_primers: + for rp in right_primers: + if rp.span.end - lp.span.start + 1 > amplicon_sizes.max: + continue + + amp_mapping = Span(refname=target.refname, start=lp.span.start, end=rp.span.end) + amp_bases = bases[ + amp_mapping.start - region_start : amp_mapping.end - region_start + 1 + ] + amp_tm = calculate_long_seq_tm(amp_bases) + + if amp_tm < amplicon_tms.min or amp_tm > amplicon_tms.max: + continue + + if max_heterodimer_tm is not None: + if ntthal.duplex_tm(lp.bases, rp.bases) > max_heterodimer_tm: + continue + + penalty = score( + left_primer=lp, + right_primer=rp, + amplicon=amp_mapping, + amplicon_tm=amp_tm, + amplicon_sizes=amplicon_sizes, + amplicon_tms=amplicon_tms, + weights=weights, + ) - # select the primer pairs - selected: list[PrimerPair] = pick_top_primer_pairs( - primer_pairs=primer_pairs, - num_primers=num_primers, - min_difference=min_difference, - params=params, - offtarget_detector=offtarget_detector, - is_dimer_tm_ok=lambda s1, s2: ( - dimer_checker.duplex_tm(s1=s1, s2=s2) <= params.max_dimer_tm - ), - ) + pp = PrimerPair( + left_primer=lp, + right_primer=rp, + amplicon_sequence=amp_bases, + amplicon_tm=amp_tm, + penalty=penalty, + ) - return selected + yield pp diff --git a/tests/api/test_picking.py b/tests/api/test_picking.py index 98f2096..164a5ed 100644 --- a/tests/api/test_picking.py +++ b/tests/api/test_picking.py @@ -1,919 +1,407 @@ -from collections import Counter -from dataclasses import dataclass -from dataclasses import replace +import dataclasses +from math import floor from pathlib import Path -from typing import Any from typing import Optional -from typing import Tuple -import pysam import pytest +from fgpyo.fasta.builder import FastaBuilder from fgpyo.sequence import reverse_complement -from prymer.api import FilteringParams from prymer.api import MinOptMax from prymer.api import Oligo from prymer.api import PrimerPair from prymer.api import Span -from prymer.api import Strand -from prymer.api import build_and_pick_primer_pairs -from prymer.api import build_primer_pairs -from prymer.api import pick_top_primer_pairs +from prymer.api import picking from prymer.api.melting import calculate_long_seq_tm -from prymer.api.picking import _dist_penalty -from prymer.api.picking import _seq_penalty -from prymer.api.picking import check_primer_overlap -from prymer.api.picking import is_acceptable_primer_pair -from prymer.api.picking import score as picking_score -from prymer.ntthal import NtThermoAlign -from prymer.offtarget import OffTargetDetector -from prymer.offtarget.bwa import BWA_EXECUTABLE_NAME +from prymer.primer3 import PrimerAndAmpliconWeights @pytest.fixture -def filter_params() -> FilteringParams: - return FilteringParams( - amplicon_sizes=MinOptMax(100, 150, 200), - amplicon_tms=MinOptMax(50.0, 55.0, 60.0), - product_size_lt=1, - product_size_gt=1, - product_tm_lt=0.0, - product_tm_gt=0.0, - min_primer_to_target_distance=0, - read_length=100000, - ) +def amplicon_sizes() -> MinOptMax[int]: + return MinOptMax(100, 150, 200) -@pytest.mark.parametrize( - "start, end, min_primer_to_target_distance, dist_from_primer_end_weight, penalty", - [ - (10, 20, 10, 2.0, 0.0), # 10bp away, at min distance - (10, 19, 10, 2.0, 2.0), # 9bp away, below min distance - (10, 21, 10, 2.0, 0.0), # 11bp away, beyond min distance - (10, 22, 32, 3.0, 60.0), # 2bp away, one over min distance, larger weight - (10, 22, 32, 5.0, 100.0), # 2bp away, one over min distance, large weight - ], -) -def test_dist_penalty( - start: int, - end: int, - min_primer_to_target_distance: int, - dist_from_primer_end_weight: float, - penalty: float, - filter_params: FilteringParams, -) -> None: - params = replace( - filter_params, - min_primer_to_target_distance=min_primer_to_target_distance, - dist_from_primer_end_weight=dist_from_primer_end_weight, - ) - assert _dist_penalty(start=start, end=end, params=params) == penalty - - -@pytest.mark.parametrize( - "start, end, read_length, target_not_covered_by_read_length_weight, penalty", - [ - (10, 10, 1, 2.0, 0.0), # 1bp amplicon_length length, at read length - (10, 10, 2, 2.0, 0.0), # 1bp amplicon_length length, one shorter than read length - (10, 10, 0, 2.0, 2.0), # 1bp amplicon_length length, one longer than read length - (10, 100, 91, 2.0, 0.0), # 91bp amplicon length, at read length - (10, 100, 90, 2.0, 2.0), # 1bp amplicon_length length, one longer than read length - (10, 100, 91, 4.0, 0.0), # 1bp amplicon_length length, one below minimum, larger weight - (10, 100, 50, 5.0, 205.0), # 1bp amplicon_length length, far below minimum, large weight - ], -) -def test_seq_penalty( - start: int, - end: int, - read_length: int, - target_not_covered_by_read_length_weight: float, - penalty: float, - filter_params: FilteringParams, -) -> None: - params = replace( - filter_params, - read_length=read_length, - target_not_covered_by_read_length_weight=target_not_covered_by_read_length_weight, - ) - assert _seq_penalty(start=start, end=end, params=params) == penalty +@pytest.fixture +def amplicon_tms() -> MinOptMax[float]: + return MinOptMax(75.0, 80.0, 90.0) -def build_primer_pair(amplicon_length: int, tm: float) -> PrimerPair: - left_primer = Oligo( - tm=0, - penalty=0, - span=Span(refname="1", start=1, end=max(1, amplicon_length // 4)), - ) - right_primer = Oligo( - tm=0, - penalty=0, - span=Span( - refname="1", start=max(1, amplicon_length - (amplicon_length // 4)), end=amplicon_length - ), - ) - return PrimerPair( - left_primer=left_primer, - right_primer=right_primer, - amplicon_tm=tm, - penalty=0, +@pytest.fixture +def weights() -> PrimerAndAmpliconWeights: + return PrimerAndAmpliconWeights( + product_size_lt=0.5, + product_size_gt=1.5, + product_tm_lt=10, + product_tm_gt=15, ) -@pytest.mark.parametrize( - "prev_left, prev_right, next_left, next_right, min_difference, expected", - [ - # no overlap -> True - ( - Span("chr1", 100, 120), - Span("chr1", 200, 220), - Span("chr1", 120, 140), - Span("chr1", 220, 240), - 10, - True, - ), - # left at min_difference -> True - ( - Span("chr1", 100, 120), - Span("chr1", 200, 220), - Span("chr1", 110, 130), - Span("chr1", 220, 240), - 10, - True, - ), - # left one less than min_difference -> False - ( - Span("chr1", 100, 120), - Span("chr1", 200, 220), - Span("chr1", 109, 129), - Span("chr1", 220, 240), - 10, - False, - ), - # right at min_difference -> True - ( - Span("chr1", 100, 120), - Span("chr1", 200, 220), - Span("chr1", 120, 140), - Span("chr1", 210, 230), - 10, - True, - ), - # right one less than min_difference -> False - ( - Span("chr1", 100, 120), - Span("chr1", 200, 220), - Span("chr1", 120, 140), - Span("chr1", 209, 229), - 10, - False, - ), - # both at min_difference -> True - ( - Span("chr1", 100, 120), - Span("chr1", 200, 220), - Span("chr1", 110, 130), - Span("chr1", 210, 230), - 10, - True, - ), - # both one less than min_difference -> False - ( - Span("chr1", 100, 120), - Span("chr1", 200, 220), - Span("chr1", 109, 129), - Span("chr1", 209, 229), - 10, - False, - ), - ], -) -def test_check_primer_overlap( - prev_left: Span, - prev_right: Span, - next_left: Span, - next_right: Span, - min_difference: int, - expected: bool, -) -> None: - assert ( - check_primer_overlap( - prev_left=prev_left, - prev_right=prev_right, - next_left=next_left, - next_right=next_right, - min_difference=min_difference, - ) - == expected - ) - assert ( - check_primer_overlap( - prev_left=next_left, - prev_right=next_right, - next_left=prev_left, - next_right=prev_right, - min_difference=min_difference, - ) - == expected - ) +@pytest.fixture +def all_zero_weights() -> PrimerAndAmpliconWeights: + d = dataclasses.asdict(PrimerAndAmpliconWeights()) + for key in d: + d[key] = 0.0 + return PrimerAndAmpliconWeights(**d) + + +# 1000 bases with 100 bases per line +REF_BASES = """ +CCTGCTGGTTTTATTTCTGCTACTTCTGAAGTGTGGGGCACACAACCTGAGCAGTGCTTTTATTTGAGTCCCAATGCTTTTATTTGAGTTTTGCAAGGTT +ATTCCAAGTTTTACAAATAGAAGGTAGCGTATGACTCAGTCCTTGATATGCCAACCACTGCACAGAGACTTGCCACCTTCCTGTCACTGGAGAAACACTC +ATGTGGGTTTTCTTAAATTTGCCTCCCTCTGAGCTTCCCTTTAACTTCAACTATAATATGCAAGAAAGACTATCTGACCATAAATACACATTTGGGCCAA +TCAAGATGGTTTTGCCAAGGAAAGATGCCCACAATGGTTAAGCAGAATGCAATAATGTAGAGAATATCATTTCTTTCATGCTGGTGTATATCATATGCAT +TCAAAAACAGGGAGAACTTCTAAGCAACTAACAGTGACCATATCAAGCAGGTGCAATCACAGAATAACTGGTTTTCTCCTTTAAGAATTTTTCTATCATT +TGGCTTTCCCCACTCACACACACTAAATATTTTAAGTAAAAAGTTACTTCCATTTTGAAAGAGAAAAGAAAGAGACATGCATGAACATTTTTCTCCACCT +TGGTGCAGGGACCAGACAACTGTATCCAGTGTGCCCACTACATTGACGGCCCCCACTGCGTCAAGACCTGCCCGGCAGGAGTCATGGGAGAAAACAACAC +CCTGGTCTGGAAGTACGCAGACGCCGGCCATGTGTGCCACCTGTGCCATCCAAACTGCACCTACGGGTGAGTGGAAAGTGAAGGAGAACAGAACATTTCC +TCTCTTGCAAATTCAGAGATCAAAAATGTCTCCCAAGTTTTCCGGCAACAAATTGCCGAGGTTTGTATTTGAGTCAGTTACTTAAGGTGTTTTGGTCCCC +ACAGCCATGCCAGTAGCAACTTGCTTGTGAGCAGGCCTCAGTGCAGTGGGAATGACTCTGCCATGCACCGTGTCCCCGGCCGGGCCTGTGTTGTGCAATG +""".strip().replace("\n", "") -@pytest.mark.parametrize( - "pair, expected", - [ - (build_primer_pair(amplicon_length=150, tm=55), True), # OK at optimal - (build_primer_pair(amplicon_length=100, tm=55), True), # OK, amplicon at lower boundary - (build_primer_pair(amplicon_length=99, tm=55), False), # NOK, amplicon < lower boundary - (build_primer_pair(amplicon_length=200, tm=55), True), # OK, amplicon at upper boundary - (build_primer_pair(amplicon_length=201, tm=55), False), # NOK, amplicon > upper boundary - (build_primer_pair(amplicon_length=150, tm=50), True), # OK, tm at lower boundary - (build_primer_pair(amplicon_length=150, tm=49), False), # NOK, tm < lower boundary - (build_primer_pair(amplicon_length=150, tm=60), True), # OK, tm at upper boundary - (build_primer_pair(amplicon_length=150, tm=61), False), # NOK, tm > upper boundary - ], -) -def test_is_acceptable_primer_pair(pair: PrimerPair, expected: bool) -> None: - params = FilteringParams( - amplicon_sizes=MinOptMax(100, 150, 200), - amplicon_tms=MinOptMax(50.0, 55.0, 60.0), - product_size_lt=1, - product_size_gt=1, - product_tm_lt=0.0, - product_tm_gt=0.0, - min_primer_to_target_distance=0, - read_length=100000, +@pytest.fixture +def fasta(tmp_path: Path) -> Path: + """Fixture that returns a Fasta""" + builder = FastaBuilder(line_length=100) + path = tmp_path / "ref.fa" + builder.add("chr1").add(REF_BASES) + builder.to_file(path) + return path + + +def p(bases: str, tm: float, pos: int, pen: float = 0, chrom: str = "chr1") -> Oligo: + """Generates a primer for testing.""" + oligo = Oligo( + name="left", + tm=tm, + penalty=pen, + bases=bases, + span=Span(chrom, pos, pos + len(bases) - 1), + tail=None, ) - assert is_acceptable_primer_pair(primer_pair=pair, params=params) == expected - - -@dataclass(init=True, frozen=True) -class ScoreInput: - left: Oligo - right: Oligo - target: Span - amplicon: Span - amplicon_sequence: str - - def __post_init__(self) -> None: - primer_pair = PrimerPair( - left_primer=self.left, - right_primer=self.right, - amplicon_tm=0, - penalty=0, - ) - object.__setattr__(self, "primer_pair", primer_pair) + assert oligo.span.length == len(oligo.bases) + return oligo -def _score_input() -> ScoreInput: - l_mapping = Span(refname="1", start=95, end=125) - r_mapping = Span(refname="1", start=195, end=225) - amplicon = Span(refname="1", start=l_mapping.end, end=r_mapping.start) - target = Span(refname="1", start=l_mapping.end + 10, end=r_mapping.start - 20) - return ScoreInput( - left=Oligo(penalty=0, tm=0, span=l_mapping), - right=Oligo(penalty=0, tm=0, span=r_mapping), - target=target, - amplicon=amplicon, - amplicon_sequence="A" * amplicon.length, +def pp(lp: Oligo, rp: Oligo, bases: Optional[str] = None, tm: Optional[float] = None) -> PrimerPair: + """Generates a primer pair for testing.""" + if lp.span.end > rp.span.start: + raise ValueError("Overlapping primers not supported.") + + if bases is None: + length = rp.span.end - lp.span.start + 1 + needed = length - lp.span.length - rp.span.length + bases = lp.bases + ("A" * needed) + reverse_complement(rp.bases) + + return PrimerPair( + name="pair", + left_primer=lp, + right_primer=rp, + amplicon_tm=tm if tm is not None else calculate_long_seq_tm(bases), + amplicon_sequence=bases, + penalty=0, ) -@pytest.fixture() -def score_input() -> ScoreInput: - return _score_input() - - -# Developer Note: for `score()`, we must have: -# 1. params.amplicon_sizes.opt == 0 -# 2. params.amplicon_tms.opt is exactly the _calculate_long_seq_tm of the amplicon sequence -# 3. filtering_params.min_primer_to_target_distance is zero, -# (target.start >= left.span.end) -# 4. filtering_params.min_primer_to_target_distance is zero, -# (right.span.start >= target.end) -# 5. filtering_params.read_length is large, (target.end >= left.span.start) -# 6. filtering_params.read_length is large, (right.span.end >= target.start) -def _zero_score_filtering_params(score_input: ScoreInput) -> FilteringParams: - amplicon_tm = calculate_long_seq_tm(score_input.amplicon_sequence) - return FilteringParams( - # for size_penalty to be zero - amplicon_sizes=MinOptMax(0, 0, 200), - product_size_gt=0, - product_size_lt=0, - # for tm_penalty to be zero - amplicon_tms=MinOptMax(amplicon_tm, amplicon_tm, amplicon_tm), - product_tm_gt=0, - product_tm_lt=0, - # for left_dist_penalty and left_dist_penalty to be zero - min_primer_to_target_distance=0, - dist_from_primer_end_weight=0, - # for left_seq_penalty and left_seq_penalty to be zero - read_length=100000, - target_not_covered_by_read_length_weight=0, +def _score( + pair: PrimerPair, + weights: PrimerAndAmpliconWeights, + sizes: MinOptMax[int], + tms: MinOptMax[float], +) -> float: + """Wrapper around picking.score that decomposes the PrimerPair for the arguments.""" + return picking.score( + left_primer=pair.left_primer, + right_primer=pair.right_primer, + amplicon=pair.span, + amplicon_tm=pair.amplicon_tm, + amplicon_sizes=sizes, + amplicon_tms=tms, + weights=weights, ) -@pytest.fixture() -def zero_score_filtering_params(score_input: ScoreInput) -> FilteringParams: - return _zero_score_filtering_params(score_input=score_input) +################################################################################ +# Tests for picking.py::score() +################################################################################ -def test_zero_score( - score_input: ScoreInput, - zero_score_filtering_params: FilteringParams, +def test_score_returns_sum_of_primer_penalties_when_all_weights_zero( + all_zero_weights: PrimerAndAmpliconWeights, + amplicon_sizes: MinOptMax[int], + amplicon_tms: MinOptMax[float], ) -> None: - assert ( - picking_score( - left_primer=score_input.left, - right_primer=score_input.right, - target=score_input.target, - amplicon=score_input.amplicon, - amplicon_seq_or_tm=score_input.amplicon_sequence, - params=zero_score_filtering_params, - ) - == 0.0 - ) + pair = pp(p("ACGACTCATG", 60.1, 100, 1.7), p("GTGCATACTAG", 59.8, 200, 3.1)) + score = _score(pair=pair, weights=all_zero_weights, sizes=amplicon_sizes, tms=amplicon_tms) + assert score == 1.7 + 3.1 -def test_zero_score_with_amplicon_tm( - score_input: ScoreInput, - zero_score_filtering_params: FilteringParams, +def test_score_returns_sum_of_primer_penalties_when_amplicon_optimal( + weights: PrimerAndAmpliconWeights, + amplicon_sizes: MinOptMax[int], + amplicon_tms: MinOptMax[float], ) -> None: - amplicon_tm: float = calculate_long_seq_tm(score_input.amplicon_sequence) - assert ( - picking_score( - left_primer=score_input.left, - right_primer=score_input.right, - target=score_input.target, - amplicon=score_input.amplicon, - amplicon_seq_or_tm=amplicon_tm, - params=zero_score_filtering_params, - ) - == 0.0 - ) + pair = pp(p("ACGACTCATG", 60.1, 101, 1.7), p("GTGCATACTAG", 59.8, 240, 3.1), tm=80) + score = _score(pair=pair, weights=weights, sizes=amplicon_sizes, tms=amplicon_tms) + assert pair.amplicon.length == amplicon_sizes.opt + assert pair.amplicon_tm == amplicon_tms.opt + assert score == 1.7 + 3.1 -# Notes on score_input. -# - amplicon length is 71bp (for size_penalty) -# - amplicon Tm is 66.30319122042972 (for tm_penalty) -# - primer to target distance is 10 for left_dist_penalty and 20 right_dist_penalty. This also -# means we need a read length of at least 81bp (=71 + 10) for the left_seq_penalty to be zero -# and at least 91bp (=71 + 20) for the right_seq_penalty to be zero -@pytest.mark.parametrize( - "kwargs, expected_score", - [ - # size_penalty: amplicon size is too large (71bp > 61bp, diff = 10, so score = (71-61)*5) - ({"product_size_gt": 5.0, "amplicon_sizes": MinOptMax(61, 61, 61)}, (71 - 61) * 5), - # size_penalty: amplicon size is too small (71bp < 81bp, diff = 10, so score = (71-61)*4) - ({"product_size_lt": 4.0, "amplicon_sizes": MinOptMax(81, 81, 81)}, (71 - 61) * 4), - # tm_penalty: amplicon Tm is too large - ( - {"product_tm_gt": 5.0, "amplicon_tms": MinOptMax(60, 60, 60)}, - 5.0 * (66.30319122042972 - 60), - ), - # tm_penalty: amplicon Tm is too small - ( - {"product_tm_lt": 4.0, "amplicon_tms": MinOptMax(70, 70, 70)}, - 4.0 * (70 - 66.30319122042972), - ), - # both [left/right]_dist_penalty are zero: diff > min_primer_to_target_distance - ({"min_primer_to_target_distance": 10, "dist_from_primer_end_weight": 5.0}, 0), - # left_dist_penalty: diff < min_primer_to_target_distance - ({"min_primer_to_target_distance": 11, "dist_from_primer_end_weight": 5.0}, (1 * 5.0)), - ({"min_primer_to_target_distance": 20, "dist_from_primer_end_weight": 5.0}, (10 * 5.0)), - # both [left/right]_dist_penalty: diff < min_primer_to_target_distance, 11bp for left, and - # 1bp for right, so 12bp overall - ({"min_primer_to_target_distance": 21, "dist_from_primer_end_weight": 5.0}, (12 * 5.0)), - # both[left/right]_seq_penalty: zero since read length >= 81bp (left) and >= 91bp (right) - ({"read_length": 91, "target_not_covered_by_read_length_weight": 10.0}, 0), - # right_seq_penalty: less than 91bp for the right but at 81bp for the left - ({"read_length": 81, "target_not_covered_by_read_length_weight": 10.0}, (10 * 10)), - # both[left/right]_seq_penalty: zero since read length <>=> 81bp (left) and < 91bp (right) - ({"read_length": 71, "target_not_covered_by_read_length_weight": 10.0}, 10.0 * (10 + 20)), - ], -) -def test_score( - score_input: ScoreInput, - zero_score_filtering_params: FilteringParams, - kwargs: dict[str, Any], - expected_score: float, +def test_score_when_amplicon_longer_than_optimal( + weights: PrimerAndAmpliconWeights, + amplicon_sizes: MinOptMax[int], + amplicon_tms: MinOptMax[float], ) -> None: - params = replace(zero_score_filtering_params, **kwargs) - assert ( - picking_score( - left_primer=score_input.left, - right_primer=score_input.right, - target=score_input.target, - amplicon=score_input.amplicon, - amplicon_seq_or_tm=score_input.amplicon_sequence, - params=params, - ) - == expected_score - ) + pair = pp(p("ACGACTCATG", 60.1, 101, 1.0), p("GTGCATACTAG", 59.8, 250, 1.0), tm=80) + score = _score(pair=pair, weights=weights, sizes=amplicon_sizes, tms=amplicon_tms) + assert pair.amplicon.length == amplicon_sizes.opt + 10 + assert pair.amplicon_tm == amplicon_tms.opt + assert score == pytest.approx(1 + 1 + (10 * weights.product_size_gt)) -@pytest.mark.parametrize( - "left_penalty, right_penalty", [(0, 0), (10.0, 0.0), (0, 12.0), (13.0, 14.0)] -) -def test_score_primer_primer_penalties( - score_input: ScoreInput, - zero_score_filtering_params: FilteringParams, - left_penalty: float, - right_penalty: float, +def test_score_when_amplicon_shorter_than_optimal( + weights: PrimerAndAmpliconWeights, + amplicon_sizes: MinOptMax[int], + amplicon_tms: MinOptMax[float], ) -> None: - left = replace(score_input.left, penalty=left_penalty) - right = replace(score_input.right, penalty=right_penalty) - assert picking_score( - left_primer=left, - right_primer=right, - target=score_input.target, - amplicon=score_input.amplicon, - amplicon_seq_or_tm=score_input.amplicon_sequence, - params=zero_score_filtering_params, - ) == (left_penalty + right_penalty) - - -def test_primer_pairs( - score_input: ScoreInput, zero_score_filtering_params: FilteringParams, genome_ref: Path + pair = pp(p("ACGACTCATG", 60.1, 101, 1.0), p("GTGCATACTAG", 59.8, 220, 1.0), tm=80) + score = _score(pair=pair, weights=weights, sizes=amplicon_sizes, tms=amplicon_tms) + assert pair.amplicon.length == amplicon_sizes.opt - 20 + assert pair.amplicon_tm == amplicon_tms.opt + assert score == pytest.approx(1 + 1 + (20 * weights.product_size_lt)) + + +def test_score_when_amplicon_tm_higher_than_optimal( + weights: PrimerAndAmpliconWeights, + amplicon_sizes: MinOptMax[int], + amplicon_tms: MinOptMax[float], ) -> None: - primer_length: int = 30 - target = Span(refname="chr1", start=100, end=250) - - # tile some left primers - left_primers = [] - right_primers = [] - for offset in range(0, 50, 5): - # left - left_end = target.start - offset - left_start = left_end - primer_length - left = Oligo( - penalty=-offset, tm=0, span=Span(refname=target.refname, start=left_start, end=left_end) - ) - left_primers.append(left) - # right - right_start = target.end + offset - right_end = right_start + primer_length - right = Oligo( - penalty=offset, - tm=0, - span=Span(refname=target.refname, start=right_start, end=right_end), - ) - right_primers.append(right) + pair = pp(p("ACGACTCATG", 60.1, 101, 1.0), p("GTGCATACTAG", 59.8, 240, 1.0), tm=82.15) + score = _score(pair=pair, weights=weights, sizes=amplicon_sizes, tms=amplicon_tms) + assert pair.amplicon.length == amplicon_sizes.opt + assert pair.amplicon_tm == amplicon_tms.opt + 2.15 + assert score == pytest.approx(1 + 1 + (2.15 * weights.product_tm_gt)) - with pysam.FastaFile(f"{genome_ref}") as fasta: - primer_pairs = build_primer_pairs( - left_primers=left_primers, - right_primers=right_primers, - target=target, - params=zero_score_filtering_params, - fasta=fasta, - ) - assert len(primer_pairs) == len(left_primers) * len(right_primers) - last_penalty = primer_pairs[0].penalty - primer_counter: Counter[Oligo] = Counter() - for pp in primer_pairs: - assert pp.left_primer in left_primers - assert pp.right_primer in right_primers - primer_counter[pp.left_primer] += 1 - primer_counter[pp.right_primer] += 1 - # by design, only the left/right penalties contribute to the primer pair penalty - assert pp.penalty == pp.left_primer.penalty + pp.right_primer.penalty - # at least check that the amplicon Tm is non-zero - assert pp.amplicon_tm > 0 - # at least check that the amplicon sequence retrieved is the correct length - assert len(pp.amplicon_sequence) == pp.amplicon.length - # check that the primer pairs are sorted by increasing penalty - assert last_penalty <= pp.penalty - last_penalty = pp.penalty - # make sure we see all the primers the same # of times! - items = primer_counter.items() - assert len(set(i[0] for i in items)) == len(left_primers) + len( - right_primers - ) # same primers - assert len(set(i[1] for i in items)) == 1 # same counts for each primer - - -def test_primer_pairs_except_different_references( - score_input: ScoreInput, zero_score_filtering_params: FilteringParams, genome_ref: Path + +def test_score_when_amplicon_tm_lower_than_optimal( + weights: PrimerAndAmpliconWeights, + amplicon_sizes: MinOptMax[int], + amplicon_tms: MinOptMax[float], ) -> None: - # all primers (both left and right) _should_ be on the same reference (refname). Add a test - # that raises an exception (in PrimerPair) that doesn't. - with pysam.FastaFile(f"{genome_ref}") as fasta: - with pytest.raises(ValueError, match="Cannot create a primer pair"): - # change the reference for the right primer - right = replace(score_input.right, span=Span(refname="Y", start=195, end=225)) - build_primer_pairs( - left_primers=[score_input.left], - right_primers=[right], - target=score_input.target, - params=zero_score_filtering_params, - fasta=fasta, - ) + pair = pp(p("ACGACTCATG", 60.1, 101, 1.0), p("GTGCATACTAG", 59.8, 240, 1.0), tm=75.67) + score = _score(pair=pair, weights=weights, sizes=amplicon_sizes, tms=amplicon_tms) + assert pair.amplicon.length == amplicon_sizes.opt + assert pair.amplicon_tm == amplicon_tms.opt - 4.33 + assert score == pytest.approx(1 + 1 + (4.33 * weights.product_tm_lt)) -def _picking_ref() -> Path: - return Path(__file__).parent / "data" / "picking.fa" - - -@pytest.fixture(scope="session") -def picking_ref() -> Path: - return _picking_ref() - - -def _target() -> Span: - target: Span = Span(refname="chr1", start=150, end=250) - return target - - -def _primer_pair( - target_offset: int = 0, - primer_length: int = 30, - amplicon_tm: Optional[float] = None, - penalty: int = 0, - target: Optional[Span] = None, - params: Optional[FilteringParams] = None, -) -> PrimerPair: - if target is None: - target = _target() - if params is None: - params = _zero_score_filtering_params(score_input=_score_input()) - fasta = pysam.FastaFile(f"{_picking_ref()}") - if amplicon_tm is None: - amplicon_tm = params.amplicon_tms.opt - left_span = Span( - refname=target.refname, - start=target.start - primer_length - target_offset + 1, # +1 for 1-based inclusive - end=target.start - target_offset, - strand=Strand.POSITIVE, - ) - assert left_span.length == primer_length - left_bases = fasta.fetch( - reference=left_span.refname, start=left_span.start - 1, end=left_span.end - ) - right_span = Span( - refname=target.refname, - start=target.end + target_offset, - end=target.end + primer_length + target_offset - 1, # -1 for 1-based inclusive - strand=Strand.NEGATIVE, - ) - assert right_span.length == primer_length - right_bases = fasta.fetch( - reference=left_span.refname, start=right_span.start - 1, end=right_span.end - ) - right_bases = reverse_complement(right_bases) - fasta.close() - return PrimerPair( - left_primer=Oligo(bases=left_bases, penalty=0, tm=0, span=left_span), - right_primer=Oligo(bases=right_bases, penalty=0, tm=0, span=right_span), - amplicon_tm=amplicon_tm, - penalty=penalty, +def test_score_realistic( + weights: PrimerAndAmpliconWeights, + amplicon_sizes: MinOptMax[int], + amplicon_tms: MinOptMax[float], +) -> None: + pair = pp(p("ACGACTCATG", 60.1, 101, 3.1), p("GTGCATACTAG", 59.8, 256, 4.08), tm=75.67) + score = _score(pair=pair, weights=weights, sizes=amplicon_sizes, tms=amplicon_tms) + assert pair.amplicon.length == amplicon_sizes.opt + 16 + assert pair.amplicon_tm == amplicon_tms.opt - 4.33 + assert score == pytest.approx( + 3.1 + 4.08 + (16 * weights.product_size_gt) + (4.33 * weights.product_tm_lt) ) -def _pick_top_primer_pairs( - params: FilteringParams, - picking_ref: Path, - primer_pairs: list[PrimerPair], - max_primer_hits: int, - max_primer_pair_hits: int, - min_difference: int = 1, -) -> list[PrimerPair]: - with ( - NtThermoAlign() as dimer_checker, - OffTargetDetector( - ref=picking_ref, - max_primer_hits=max_primer_hits, - max_primer_pair_hits=max_primer_pair_hits, - three_prime_region_length=5, - max_mismatches_in_three_prime_region=0, - max_mismatches=0, - max_amplicon_size=params.amplicon_sizes.max, - executable=BWA_EXECUTABLE_NAME, - ) as offtarget_detector, - ): - picked: list[PrimerPair] = pick_top_primer_pairs( - primer_pairs=primer_pairs, - num_primers=len(primer_pairs), - min_difference=min_difference, - params=params, - offtarget_detector=offtarget_detector, - is_dimer_tm_ok=lambda s1, s2: ( - dimer_checker.duplex_tm(s1=s1, s2=s2) <= params.max_dimer_tm - ), - ) - return picked - - -_PARAMS: FilteringParams = _zero_score_filtering_params(_score_input()) -"""Filter parameters for use in creating the test cases for `test_pick_top_primer_pairs`""" - - -@pytest.mark.parametrize( - "picked, primer_pair", - [ - # primer pair passes all primer/primer-pair-specific filters - (True, _primer_pair()), - # too small amplicon size - (False, _primer_pair(amplicon_tm=_PARAMS.amplicon_sizes.min - 1)), - # too big amplicon size - (False, _primer_pair(amplicon_tm=_PARAMS.amplicon_sizes.max + 1)), - # too low amplicon Tm - (False, _primer_pair(amplicon_tm=_PARAMS.amplicon_tms.min - 1)), - # too large amplicon Tm - (False, _primer_pair(amplicon_tm=_PARAMS.amplicon_tms.max + 1)), - ], -) -def test_pick_top_primer_pairs_individual_primers( - picking_ref: Path, - zero_score_filtering_params: FilteringParams, - picked: bool, - primer_pair: PrimerPair, +################################################################################ +# Tests for picking.py::build_primer_pairs() +################################################################################ + + +def test_build_primer_pairs_no_primers( + fasta: Path, + weights: PrimerAndAmpliconWeights, + amplicon_sizes: MinOptMax[int], + amplicon_tms: MinOptMax[float], ) -> None: - expected = [primer_pair] if picked else [] - assert expected == _pick_top_primer_pairs( - params=zero_score_filtering_params, - picking_ref=picking_ref, - primer_pairs=[primer_pair], - max_primer_hits=2, - max_primer_pair_hits=2, + pairs = list( + picking.build_primer_pairs( + left_primers=[], + right_primers=[], + target=Span("chr1", 240, 260), + amplicon_sizes=amplicon_sizes, + amplicon_tms=amplicon_tms, + max_heterodimer_tm=None, + weights=weights, + fasta_path=fasta, + ) ) + assert len(pairs) == 0 -def test_pick_top_primer_pairs_dimer_tm_too_large( - picking_ref: Path, - zero_score_filtering_params: FilteringParams, +def test_build_primer_pairs_single_pair( + fasta: Path, + weights: PrimerAndAmpliconWeights, + amplicon_sizes: MinOptMax[int], + amplicon_tms: MinOptMax[float], ) -> None: - pp = _primer_pair() - - # get the Tm for the primer pair - duplex_tm = NtThermoAlign().duplex_tm(s1=pp.left_primer.bases, s2=pp.right_primer.bases) - - # at the maximum dimer Tm - params = replace(zero_score_filtering_params, max_dimer_tm=duplex_tm) - assert [pp] == _pick_top_primer_pairs( - params=params, - picking_ref=picking_ref, - primer_pairs=[pp], - max_primer_hits=2, - max_primer_pair_hits=2, + pairs = list( + picking.build_primer_pairs( + left_primers=[p(REF_BASES[200:220], tm=60.1, pos=201, pen=1.0)], + right_primers=[p(reverse_complement(REF_BASES[280:300]), tm=61.8, pos=281, pen=1.1)], + target=Span("chr1", 240, 260), + amplicon_sizes=amplicon_sizes, + amplicon_tms=amplicon_tms, + max_heterodimer_tm=None, + weights=weights, + fasta_path=fasta, + ) ) - # **just** over the maximum dimer Tm - params = replace(zero_score_filtering_params, max_dimer_tm=duplex_tm - 0.0001) - assert [] == _pick_top_primer_pairs( - params=params, - picking_ref=picking_ref, - primer_pairs=[pp], - max_primer_hits=2, - max_primer_pair_hits=2, - ) + for pair in pairs: + assert pair.span.length == len(pair.bases) + assert pair.amplicon.start == pair.left_primer.span.start + assert pair.amplicon.end == pair.right_primer.span.end + assert pair.bases == REF_BASES[pair.amplicon.start - 1 : pair.amplicon.end] + assert len(pairs) == 1 + assert pairs[0].span == Span("chr1", 201, 300) + assert pairs[0].left_primer.bases == pairs[0].bases[0:20] + assert pairs[0].right_primer.bases == reverse_complement(pairs[0].bases[-20:]) -_PRIMER_PAIRS_PICKING: list[PrimerPair] = [ - _primer_pair(target_offset=10, penalty=2), # left.start=111 - _primer_pair(target_offset=6, penalty=2), # 4bp from the previous, left.start=115 - _primer_pair(target_offset=3, penalty=3), # 3bp from the previous, left.start=118 - _primer_pair(target_offset=1, penalty=4), # 2bp from the previous, left.start=120 - _primer_pair(target_offset=0, penalty=5), # 1bp from the previous, left.start=121 - _primer_pair(target_offset=0, penalty=6), # same as the previous, left.start=121 -] - - -@dataclass(frozen=True) -class _PickingTestCase: - min_difference: int - primer_pairs: list[Tuple[bool, PrimerPair]] - - -@pytest.mark.parametrize( - "test_case", - [ - # primer pairs that overlap each other fully, so second one is discarded - _PickingTestCase( - min_difference=1, - primer_pairs=[ - (True, _primer_pair(target_offset=1, penalty=2)), # first primer pair, kept - ( - False, - _primer_pair(target_offset=1, penalty=2), - ), # same as the previous, discarded - ], - ), - # primer pairs that are offset by 1bp (equal to min-difference) so both are kept - _PickingTestCase( - min_difference=1, - primer_pairs=[ - (True, _primer_pair(target_offset=0, penalty=2)), - (True, _primer_pair(target_offset=1, penalty=2)), - ], - ), - # primer pairs that are offset by 1bp (less than min-difference) so the first is kept - _PickingTestCase( - min_difference=2, - primer_pairs=[ - (True, _primer_pair(target_offset=0, penalty=2)), - (False, _primer_pair(target_offset=1, penalty=2)), - ], - ), - _PickingTestCase( - min_difference=3, - primer_pairs=list( - zip( - # 111, 115, 118, 120, 121, 121 - [True, True, True, False, True, False], - _PRIMER_PAIRS_PICKING, - strict=True, - ) - ), - ), - _PickingTestCase( - min_difference=4, - primer_pairs=list( - zip( - # 111, 115, 118, 120, 121, 121 - [True, True, False, True, False, False], - _PRIMER_PAIRS_PICKING, - strict=True, - ) - ), - ), - _PickingTestCase( - min_difference=5, - primer_pairs=list( - zip( - # 111, 115, 118, 120, 121, 121 - [True, False, True, False, False, False], - _PRIMER_PAIRS_PICKING, - strict=True, - ) - ), - ), - _PickingTestCase( - min_difference=6, - primer_pairs=list( - zip( - # 111, 115, 118, 120, 121, 121 - [True, False, True, False, False, False], - _PRIMER_PAIRS_PICKING, - strict=True, - ) - ), - ), - _PickingTestCase( - min_difference=7, - primer_pairs=list( - zip( - # 111, 115, 118, 120, 121, 121 - [True, False, True, False, False, False], - _PRIMER_PAIRS_PICKING, - strict=True, - ) - ), - ), - _PickingTestCase( - min_difference=8, - primer_pairs=list( - zip( - # 111, 115, 118, 120, 121, 121 - [True, False, False, True, False, False], - _PRIMER_PAIRS_PICKING, - strict=True, - ) - ), - ), - _PickingTestCase( - min_difference=9, - primer_pairs=list( - zip( - # 111, 115, 118, 120, 121, 121 - [True, False, False, True, False, False], - _PRIMER_PAIRS_PICKING, - strict=True, - ) - ), - ), - _PickingTestCase( - min_difference=10, - primer_pairs=list( - zip( - # 111, 115, 118, 120, 121, 121 - [True, False, False, False, True, False], - _PRIMER_PAIRS_PICKING, - strict=True, - ) - ), - ), - _PickingTestCase( - min_difference=11, - primer_pairs=list( - zip( - # 111, 115, 118, 120, 121, 121 - [True, False, False, False, False, False], - _PRIMER_PAIRS_PICKING, - strict=True, - ) - ), - ), - ], -) -def test_pick_top_primer_pairs_multiple_primers( - picking_ref: Path, - zero_score_filtering_params: FilteringParams, - test_case: _PickingTestCase, + +def test_build_primers_amplicon_size_filtering( + fasta: Path, + weights: PrimerAndAmpliconWeights, ) -> None: - in_primer_pairs = [pp[1] for pp in test_case.primer_pairs] - - # First check that picked _alone_ they will be kept - for primer_pair in in_primer_pairs: - assert [primer_pair] == _pick_top_primer_pairs( - params=zero_score_filtering_params, - picking_ref=picking_ref, - primer_pairs=[primer_pair], - max_primer_hits=2, - max_primer_pair_hits=2, - min_difference=test_case.min_difference, + pairs = list( + picking.build_primer_pairs( + left_primers=[p("A" * 20, tm=60, pos=s, pen=1.0) for s in range(1, 151, 10)], + right_primers=[p("A" * 20, tm=60, pos=s, pen=1.0) for s in range(151, 301, 10)], + target=Span("chr1", 150, 160), + amplicon_sizes=MinOptMax(100, 150, 200), + amplicon_tms=MinOptMax(0.0, 0.0, 100.0), + max_heterodimer_tm=None, + weights=weights, + fasta_path=fasta, ) - - # Next, check that picked _together_ only those primer pairs with "True" associated with them - # are kept - expected = [pp[1] for pp in test_case.primer_pairs if pp[0]] - assert expected == _pick_top_primer_pairs( - params=zero_score_filtering_params, - picking_ref=picking_ref, - primer_pairs=in_primer_pairs, - max_primer_hits=2, - max_primer_pair_hits=2, - min_difference=test_case.min_difference, ) + assert len(pairs) > 0 + + for pair in pairs: + assert pair.span.length == len(pair.bases) + assert pair.amplicon.start == pair.left_primer.span.start + assert pair.amplicon.end == pair.right_primer.span.end + assert pair.bases == REF_BASES[pair.amplicon.start - 1 : pair.amplicon.end] + assert pair.span.length <= 200 + -def test_pick_top_primer_pairs_input_order( - picking_ref: Path, - zero_score_filtering_params: FilteringParams, +def test_build_primers_heterodimer_filtering( + fasta: Path, + weights: PrimerAndAmpliconWeights, ) -> None: - with pytest.raises(ValueError, match="Primers must be given in order by penalty"): - _pick_top_primer_pairs( - params=zero_score_filtering_params, - picking_ref=picking_ref, - primer_pairs=[_primer_pair(penalty=6), _primer_pair(penalty=5)], - max_primer_hits=2, - max_primer_pair_hits=2, - min_difference=0, + pairs = list( + picking.build_primer_pairs( + left_primers=[ + p("CGCGCGCGCGCGCGCGCGCG", tm=60, pos=100, pen=1.0), + p("CTGCTGCTGCTGCTGCTGCT", tm=60, pos=100, pen=1.0), + ], + right_primers=[ + p("GCGCGCGCGCGCGCGCGCGC", tm=60, pos=180, pen=1.0), + p("AGCAGCAGCAGCAGCAGCAG", tm=60, pos=180, pen=1.0), + ], + target=Span("chr1", 150, 160), + amplicon_sizes=MinOptMax(10, 150, 200), + amplicon_tms=MinOptMax(0.0, 0.0, 100.0), + max_heterodimer_tm=50, + weights=weights, + fasta_path=fasta, ) + ) + + for pair in pairs: + assert pair.span.length == len(pair.bases) + assert pair.amplicon.start == pair.left_primer.span.start + assert pair.amplicon.end == pair.right_primer.span.end + assert pair.bases == REF_BASES[pair.amplicon.start - 1 : pair.amplicon.end] + assert pair.span.length <= 200 + assert len(pairs) == 2 + primer_seqs = sorted([f"{pp.left_primer.bases}-{pp.right_primer.bases}" for pp in pairs]) + assert primer_seqs == [ + "CGCGCGCGCGCGCGCGCGCG-AGCAGCAGCAGCAGCAGCAG", + "CTGCTGCTGCTGCTGCTGCT-GCGCGCGCGCGCGCGCGCGC", + ] -def test_pick_top_primer_pairs_too_many_off_targets( - picking_ref: Path, - zero_score_filtering_params: FilteringParams, + +def test_build_primer_pairs_amplicon_tm_filtering( + fasta: Path, + weights: PrimerAndAmpliconWeights, ) -> None: - """The contig is duplicated wholesale in `picking.fa`, so we should get two hits per primer. - Thus, setting `max_primer_hits=1` will return "too many hits""" - assert [] == _pick_top_primer_pairs( - params=zero_score_filtering_params, - picking_ref=picking_ref, - primer_pairs=[_primer_pair()], - max_primer_hits=1, - max_primer_pair_hits=1, - ) + amp_bases = REF_BASES[200:300] + amp_tm = calculate_long_seq_tm(amp_bases) + assert floor(amp_tm) == 84 + + for max_tm in [83, 84, 85]: + pairs = list( + picking.build_primer_pairs( + left_primers=[p(REF_BASES[200:220], tm=60.1, pos=201, pen=1.0)], + right_primers=[ + p(reverse_complement(REF_BASES[280:300]), tm=61.8, pos=281, pen=1.1) + ], + target=Span("chr1", 240, 260), + amplicon_sizes=MinOptMax(0, 0, 500), + amplicon_tms=MinOptMax(0, 80, max_tm), + max_heterodimer_tm=None, + weights=weights, + fasta_path=fasta, + ) + ) + + assert len(pairs) == (1 if max_tm > amp_tm else 0) -def test_and_pick_primer_pairs( - picking_ref: Path, - zero_score_filtering_params: FilteringParams, +def test_build_primer_pairs_fails_when_primers_on_wrong_reference( + fasta: Path, + weights: PrimerAndAmpliconWeights, ) -> None: - target = _target() - pp = _primer_pair(target=target, amplicon_tm=92.96746617835336) - params = replace( - zero_score_filtering_params, - amplicon_tms=MinOptMax(pp.amplicon_tm, pp.amplicon_tm, pp.amplicon_tm), + target = Span("chr1", 240, 260) + valid_lefts = [p(REF_BASES[200:220], tm=60, pos=201, pen=1)] + invalid_lefts = [p(REF_BASES[200:220], tm=60, chrom="X", pos=201, pen=1)] + valid_rights = [p(reverse_complement(REF_BASES[280:300]), tm=61, pos=281, pen=1)] + invalid_rights = [p(reverse_complement(REF_BASES[280:300]), tm=61, chrom="X", pos=281, pen=1)] + + picks = picking.build_primer_pairs( + left_primers=valid_lefts, + right_primers=valid_rights, + target=target, + amplicon_sizes=MinOptMax(0, 100, 500), + amplicon_tms=MinOptMax(0, 80, 150), + max_heterodimer_tm=None, + weights=weights, + fasta_path=fasta, ) - offtarget_detector = OffTargetDetector( - ref=picking_ref, - max_primer_hits=2, - max_primer_pair_hits=2, - three_prime_region_length=5, - max_mismatches_in_three_prime_region=0, - max_mismatches=0, - max_amplicon_size=params.amplicon_sizes.max, - executable=BWA_EXECUTABLE_NAME, - ) + assert next(picks) is not None - with pysam.FastaFile(f"{picking_ref}") as fasta: - designed_primer_pairs = build_and_pick_primer_pairs( - left_primers=[pp.left_primer], - right_primers=[pp.right_primer], + with pytest.raises(ValueError, match="Left primers exist on different reference"): + _picks = list(picking.build_primer_pairs( + left_primers=invalid_lefts, + right_primers=valid_rights, target=target, - num_primers=1, - min_difference=1, - params=params, - offtarget_detector=offtarget_detector, - dimer_checker=NtThermoAlign(), - fasta=fasta, - ) - assert len(designed_primer_pairs) == 1 - designed_primer_pair = designed_primer_pairs[0] - assert designed_primer_pair.left_primer == pp.left_primer - assert designed_primer_pair.right_primer == pp.right_primer - assert designed_primer_pair.amplicon == pp.amplicon - assert designed_primer_pair.penalty == pp.penalty - assert designed_primer_pair.amplicon_tm == pp.amplicon_tm - assert len(designed_primer_pair.amplicon_sequence) == pp.amplicon.length + amplicon_sizes=MinOptMax(0, 100, 500), + amplicon_tms=MinOptMax(0, 80, 150), + max_heterodimer_tm=None, + weights=weights, + fasta_path=fasta, + )) + + with pytest.raises(ValueError, match="Right primers exist on different reference"): + _picks = list(picking.build_primer_pairs( + left_primers=valid_lefts, + right_primers=invalid_rights, + target=target, + amplicon_sizes=MinOptMax(0, 100, 500), + amplicon_tms=MinOptMax(0, 80, 150), + max_heterodimer_tm=None, + weights=weights, + fasta_path=fasta, + ))