Skip to content

Commit

Permalink
Merge pull request #8 from COMBINE-lab/dev
Browse files Browse the repository at this point in the history
Dev
  • Loading branch information
rob-p committed Mar 2, 2024
2 parents aae0201 + da284f3 commit 7118cbf
Show file tree
Hide file tree
Showing 9 changed files with 268 additions and 36 deletions.
7 changes: 4 additions & 3 deletions .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ jobs:
# we specify bash to get pipefail; it guards against the `curl` command
# failing. otherwise `sh` won't catch that `curl` returned non-0
shell: bash
run: "curl --proto '=https' --tlsv1.2 -LsSf https://github.com/axodotdev/cargo-dist/releases/download/v0.10.0/cargo-dist-installer.sh | sh"
run: "curl --proto '=https' --tlsv1.2 -LsSf https://github.com/axodotdev/cargo-dist/releases/download/v0.11.1/cargo-dist-installer.sh | sh"
# sure would be cool if github gave us proper conditionals...
# so here's a doubly-nested ternary-via-truthiness to try to provide the best possible
# functionality based on whether this is a pull_request, and whether it's from a fork.
Expand Down Expand Up @@ -161,7 +161,8 @@ jobs:
with:
submodules: recursive
- name: Install cargo-dist
run: "curl --proto '=https' --tlsv1.2 -LsSf https://github.com/axodotdev/cargo-dist/releases/download/v0.10.0/cargo-dist-installer.sh | sh"
shell: bash
run: "curl --proto '=https' --tlsv1.2 -LsSf https://github.com/axodotdev/cargo-dist/releases/download/v0.11.1/cargo-dist-installer.sh | sh"
# Get all the local artifacts for the global tasks to use (for e.g. checksums)
- name: Fetch local artifacts
uses: actions/download-artifact@v4
Expand Down Expand Up @@ -206,7 +207,7 @@ jobs:
with:
submodules: recursive
- name: Install cargo-dist
run: "curl --proto '=https' --tlsv1.2 -LsSf https://github.com/axodotdev/cargo-dist/releases/download/v0.10.0/cargo-dist-installer.sh | sh"
run: "curl --proto '=https' --tlsv1.2 -LsSf https://github.com/axodotdev/cargo-dist/releases/download/v0.11.1/cargo-dist-installer.sh | sh"
# Fetch artifacts from scratch-storage
- name: Fetch artifacts
uses: actions/download-artifact@v4
Expand Down
12 changes: 7 additions & 5 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "oarfish"
version = "0.2.0"
version = "0.3.0"
edition = "2021"
authors = [
"Zahra Zare Jousheghani <[email protected]>",
Expand Down Expand Up @@ -30,7 +30,7 @@ categories = ["command-line-utilities", "science"]
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
anyhow = "1.0.79"
anyhow = "1.0.80"
bio-types = "1.0.1"
clap = { version = "4.5.1", features = ["derive"] }
coitrees = "0.4.0"
Expand All @@ -42,15 +42,17 @@ tabled = "0.15.0"
tracing = "0.1.40"
tracing-subscriber = {version="0.3.18", features=["env-filter"]}
typed-builder = "0.18.1"
rayon = "1.8"
rayon = "1.9"
statrs = "0.16"
csv = "1.3"
ndarray = "0.15"
serde = { version = "1", features = ["derive"] }
itertools = "0.12.1"
serde_json = "1.0.113"
serde_json = "1.0.114"
path-tools = "0.1.0"
atomic_float = "0.1.0"
rand = "0.8.5"
arrow2 = { version = "0.18.0", features = ["io_parquet", "io_parquet_gzip", "io_parquet_zstd", "io_parquet_snappy"] }

[[bin]]
name = "oarfish"
Expand All @@ -69,7 +71,7 @@ lto = "thin"
# Config for 'cargo dist'
[workspace.metadata.dist]
# The preferred cargo-dist version to use in CI (Cargo.toml SemVer syntax)
cargo-dist-version = "0.10.0"
cargo-dist-version = "0.11.1"
# CI backends to support
ci = ["github"]
# The installers to generate for each app
Expand Down
55 changes: 55 additions & 0 deletions src/bootstrap.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
use rand::distributions::{Distribution, Uniform};
use rand::Rng;

/// Wraps an iterator `I` to produce a new iterator that samples the original
/// with replacement.
pub struct SampleWithReplaceIter<'b, I: ExactSizeIterator> {
pub inds: &'b [usize],
pub iter: I,
pub idx: usize,
pub e_ptr: usize,
pub prev: Option<I::Item>,
}

impl<'b, I: ExactSizeIterator> Iterator for SampleWithReplaceIter<'b, I>
where
I::Item: Copy,
{
type Item = I::Item;

#[inline]
fn next(&mut self) -> Option<Self::Item> {
if let Some(next_ind) = self.inds.get(self.idx) {
// always makes sure we have a previous
// element. This will get the first
// element but wont yet increment
// the pointer (e_ptr)
if self.prev.is_none() {
self.prev = self.iter.next();
}
// next element is not a duplicate
if self.e_ptr != *next_ind {
// otherwise, call next until we have the
// correct element
loop {
self.prev = self.iter.next();
self.e_ptr += 1;
if self.e_ptr == *next_ind {
break;
}
}
}
self.idx += 1;
self.prev
} else {
None
}
}
}

pub fn get_sample_inds<R: Rng + ?Sized>(n: usize, rng: &mut R) -> Vec<usize> {
let dist = Uniform::new(0, n);
let mut inds: Vec<usize> = dist.sample_iter(rng).take(n).collect();
inds.sort_unstable();
inds
}
108 changes: 86 additions & 22 deletions src/em.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
use std::sync::atomic::Ordering;

use crate::util::oarfish_types::{AlnInfo, EMInfo, InMemoryAlignmentStore, TranscriptInfo};
use crate::util::oarfish_types::{AlnInfo, EMInfo, TranscriptInfo};
use atomic_float::AtomicF64;
use itertools::izip;
use num_format::{Locale, ToFormattedString};
use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
use rand::thread_rng;
use rayon::iter::{IntoParallelIterator, IntoParallelRefIterator, ParallelIterator};
use tracing::{info, span, trace};

use crate::bootstrap;

/// Performs one iteration of the EM algorithm by looping over all
/// alignments and computing their estimated probability of being
/// the true alignment (using the abunance estimates from `prev_counts`).
Expand All @@ -15,7 +18,7 @@ use tracing::{info, span, trace};
#[inline]
fn m_step_par<'a>(
eq_iterates: &[(&'a [AlnInfo], &'a [f32], &'a [f64])],
_tinfo: &mut [TranscriptInfo],
_tinfo: &[TranscriptInfo],
model_coverage: bool,
prev_count: &mut [AtomicF64],
curr_counts: &mut [AtomicF64],
Expand Down Expand Up @@ -63,15 +66,15 @@ fn m_step_par<'a>(
/// likelihood for all reads mapping to each target.
#[inline]
#[allow(dead_code)]
fn m_step(
eq_map: &InMemoryAlignmentStore,
_tinfo: &mut [TranscriptInfo],
fn m_step<'a, I: Iterator<Item = (&'a [AlnInfo], &'a [f32], &'a [f64])>>(
eq_map_iter: I,
_tinfo: &[TranscriptInfo],
model_coverage: bool,
prev_count: &mut [f64],
curr_counts: &mut [f64],
) {
const DENOM_THRESH: f64 = 1e-30_f64;
for (alns, probs, coverage_probs) in eq_map.iter() {
for (alns, probs, coverage_probs) in eq_map_iter {
let mut denom = 0.0_f64;
for (a, p, cp) in izip!(alns, probs, coverage_probs) {
// Compute the probability of assignment of the
Expand Down Expand Up @@ -100,18 +103,14 @@ fn m_step(
}
}

/// Perform the EM algorithm to estimate the abundances of the
/// target sequences. The return value is a `Vec` of f64 values,
/// each of which is the estimated number of fragments arising from
/// each target.
#[allow(dead_code)]
pub fn em(em_info: &mut EMInfo, _nthreads: usize) -> Vec<f64> {
let span = span!(tracing::Level::INFO, "em");
let _guard = span.enter();

pub fn do_em<'a, I: Iterator<Item = (&'a [AlnInfo], &'a [f32], &'a [f64])> + 'a, F: Fn() -> I>(
em_info: &'a EMInfo,
make_iter: F,
do_log: bool,
) -> Vec<f64> {
let eq_map = em_info.eq_map;
let fops = &eq_map.filter_opts;
let tinfo: &mut Vec<TranscriptInfo> = em_info.txp_info;
let tinfo: &Vec<TranscriptInfo> = em_info.txp_info;
let max_iter = em_info.max_iter;
let convergence_thresh = em_info.convergence_thresh;
let total_weight: f64 = eq_map.num_aligned_reads() as f64;
Expand All @@ -137,7 +136,7 @@ pub fn em(em_info: &mut EMInfo, _nthreads: usize) -> Vec<f64> {
while niter < max_iter {
// allocate the fragments and compute the new counts
m_step(
eq_map,
make_iter(),
tinfo,
fops.model_coverage,
&mut prev_counts,
Expand Down Expand Up @@ -171,7 +170,7 @@ pub fn em(em_info: &mut EMInfo, _nthreads: usize) -> Vec<f64> {
// is a multiple of 10, print out the maximum relative
// difference we observed.
niter += 1;
if niter % 10 == 0 {
if do_log && (niter % 10 == 0) {
if niter % 100 == 0 {
info!(
"iteration {}; rel diff {}",
Expand All @@ -198,7 +197,7 @@ pub fn em(em_info: &mut EMInfo, _nthreads: usize) -> Vec<f64> {
// perform one more EM round, since we just zeroed out
// very small abundances
m_step(
eq_map,
make_iter(),
tinfo,
fops.model_coverage,
&mut prev_counts,
Expand All @@ -212,7 +211,72 @@ pub fn em(em_info: &mut EMInfo, _nthreads: usize) -> Vec<f64> {
/// target sequences. The return value is a `Vec` of f64 values,
/// each of which is the estimated number of fragments arising from
/// each target.
pub fn em_par(em_info: &mut EMInfo, nthreads: usize) -> Vec<f64> {
#[allow(dead_code)]
pub fn em(em_info: &EMInfo, _nthreads: usize) -> Vec<f64> {
let span = span!(tracing::Level::INFO, "em");
let _guard = span.enter();

// function that produces an iterator over the
// scored alignments on demand
let make_iter = || em_info.eq_map.iter();

do_em(em_info, make_iter, true)
}

pub fn do_bootstrap(em_info: &EMInfo) -> Vec<f64> {
let mut rng = thread_rng();
let n = em_info.eq_map.len();
let inds = bootstrap::get_sample_inds(n, &mut rng);

// to not sample the indices but instead just
// run with all reads sampled once
/*
let mut inds = Vec::with_capacity(n);
for i in 0..n { inds.push(i); }
*/

// function that produces an iterator over the
// scored alignments on demand
let make_iter = || bootstrap::SampleWithReplaceIter {
inds: &inds,
iter: em_info.eq_map.iter(),
idx: 0,
e_ptr: 0,
prev: None,
};

do_em(em_info, make_iter, false)
}

pub fn bootstrap(em_info: &EMInfo, num_boot: u32, nthreads: usize) -> Vec<Vec<f64>> {
let span = span!(tracing::Level::INFO, "bootstrap");
let _guard = span.enter();

info!("will collection {num_boot} bootstraps");

let pool = rayon::ThreadPoolBuilder::new()
.num_threads(nthreads)
.build()
.unwrap();

pool.install(|| {
(0..num_boot)
.into_par_iter()
.map(|i| {
let span = span!(tracing::Level::INFO, "bootstrap");
let _guard = span.enter();
info!("evaluating bootstrap replicate {}", i);
do_bootstrap(em_info)
})
.collect()
})
}

/// Perform the EM algorithm to estimate the abundances of the
/// target sequences. The return value is a `Vec` of f64 values,
/// each of which is the estimated number of fragments arising from
/// each target.
pub fn em_par(em_info: &EMInfo, nthreads: usize) -> Vec<f64> {
let span = span!(tracing::Level::INFO, "em");
let _guard = span.enter();

Expand All @@ -223,7 +287,7 @@ pub fn em_par(em_info: &mut EMInfo, nthreads: usize) -> Vec<f64> {

let eq_map = em_info.eq_map;
let fops = &eq_map.filter_opts;
let tinfo: &mut Vec<TranscriptInfo> = em_info.txp_info;
let tinfo: &Vec<TranscriptInfo> = em_info.txp_info;
let max_iter = em_info.max_iter;
let convergence_thresh = em_info.convergence_thresh;
let total_weight: f64 = eq_map.num_aligned_reads() as f64;
Expand Down
Loading

0 comments on commit 7118cbf

Please sign in to comment.