Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Progress and Cancelation for Indexing #1379

Open
wants to merge 4 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion cram/cram_index.c
Original file line number Diff line number Diff line change
Expand Up @@ -738,9 +738,10 @@ int cram_index_container(cram_fd *fd,
* Returns 0 on success,
* negative on failure (-1 for read failure, -4 for write failure, -5 if progress_fn returned non-zero)
*/
int cram_index_build(cram_fd *fd, const char *fn_base, const char *fn_idx) {
int cram_index_build(cram_fd *fd, const char *fn_base, const char *fn_idx, const hts_progress_callback progress_fn, void *progress_data) {
cram_container *c;
off_t cpos, hpos;
off_t ppos = 0;
BGZF *fp;
kstring_t fn_idx_str = {0};
int64_t last_ref = -9, last_start = -9;
Expand Down Expand Up @@ -771,6 +772,13 @@ int cram_index_build(cram_fd *fd, const char *fn_base, const char *fn_idx) {

hpos = htell(fd->fp);

if (progress_fn && hpos != ppos) {
if (progress_fn(hpos, progress_data) != 0) {
return -5;
}
ppos = hpos;
}

if (!(c->comp_hdr_block = cram_read_block(fd)))
return -1;
assert(c->comp_hdr_block->content_type == COMPRESSION_HEADER);
Expand Down
4 changes: 3 additions & 1 deletion cram/cram_index.h
Original file line number Diff line number Diff line change
Expand Up @@ -87,12 +87,14 @@ int cram_seek_to_refpos(cram_fd *fd, cram_range *r);
* fd is a newly opened cram file that we wish to index.
* fn_base is the filename of the associated CRAM file.
* fn_idx is the filename of the index file to be written;
* if NULL, we add ".crai" to fn_base to get the index filename.
* progress_fn is an optional callback which will be called periodically with the current working offset into the file
* progress_data is the optional data passed to the progress_fn
*
* Returns 0 on success,
* negative on failure (-1 for read failure, -4 for write failure, -5 if progress_fn returned non-zero)
*/
int cram_index_build(cram_fd *fd, const char *fn_base, const char *fn_idx);
int cram_index_build(cram_fd *fd, const char *fn_base, const char *fn_idx, const hts_progress_callback progress_fn, void *progress_data);

/*
* Adds a single slice to the index.
Expand Down
3 changes: 3 additions & 0 deletions htslib/hts.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ struct hFILE;
struct hts_tpool;
struct sam_hdr_t;


/**
 * Progress callback used by the index-building functions.
 *
 * current_pos is the offset the indexer has reached in the input; the
 * callers in this change pass a file offset or BGZF block address.
 * data is the caller-supplied pointer, passed through unchanged.
 *
 * Return 0 to continue. Any non-zero return makes the calling indexer
 * stop and report failure.
 */
typedef int (*hts_progress_callback)(int64_t current_pos, void *data);

/**
* @hideinitializer
* Deprecated macro to expand a dynamic array of a given type
Expand Down
14 changes: 14 additions & 0 deletions htslib/sam.h
Original file line number Diff line number Diff line change
Expand Up @@ -1237,6 +1237,20 @@ int sam_index_build2(const char *fn, const char *fnidx, int min_shift) HTS_RESUL
HTSLIB_EXPORT
int sam_index_build3(const char *fn, const char *fnidx, int min_shift, int nthreads) HTS_RESULT_USED;

/// Generate and save an index to a specific file
/** @param fn Input BAM/CRAM/etc filename
@param fnidx Output filename, or NULL to add .bai/.csi/etc to @a fn
@param min_shift Positive to generate CSI, or 0 to generate BAI
@param nthreads Number of threads to use when building the index
@param progress_fn Function to call with progress updates
@param progress_data Data to pass to the progress function
@return 0 if successful, or negative if an error occurred (see
sam_index_build for error codes)
*/
HTSLIB_EXPORT
int sam_index_build4(const char *fn, const char *fnidx, int min_shift, int nthreads, const hts_progress_callback progress_fn, void* progress_data) HTS_RESULT_USED;


/// Free a SAM iterator
/// @param iter Iterator to free
#define sam_itr_destroy(iter) hts_itr_destroy(iter)
Expand Down
9 changes: 9 additions & 0 deletions htslib/tbx.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ typedef struct tbx_t {
void *dict;
} tbx_t;


extern const tbx_conf_t tbx_conf_gff, tbx_conf_bed, tbx_conf_psltbl, tbx_conf_sam, tbx_conf_vcf;

#define tbx_itr_destroy(iter) hts_itr_destroy(iter)
Expand All @@ -76,6 +77,10 @@ extern const tbx_conf_t tbx_conf_gff, tbx_conf_bed, tbx_conf_psltbl, tbx_conf_sa
*/
HTSLIB_EXPORT
tbx_t *tbx_index(BGZF *fp, int min_shift, const tbx_conf_t *conf);

HTSLIB_EXPORT
tbx_t *tbx_index2(BGZF *fp, int min_shift, const tbx_conf_t *conf, const hts_progress_callback progress_fn, void *progress_data);

/*
* All tbx_index_build* methods return: 0 (success), -1 (general failure) or -2 (compression not BGZF)
*/
Expand All @@ -88,6 +93,10 @@ extern const tbx_conf_t tbx_conf_gff, tbx_conf_bed, tbx_conf_psltbl, tbx_conf_sa
HTSLIB_EXPORT
int tbx_index_build3(const char *fn, const char *fnidx, int min_shift, int n_threads, const tbx_conf_t *conf);

HTSLIB_EXPORT
int tbx_index_build4(const char *fn, const char *fnidx, int min_shift, int n_threads, const tbx_conf_t *conf,
const hts_progress_callback progress_fn, void *progress_data);


/// Load or stream a .tbi or .csi index
/** @param fn Name of the data file corresponding to the index
Expand Down
24 changes: 18 additions & 6 deletions sam.c
Original file line number Diff line number Diff line change
Expand Up @@ -923,9 +923,10 @@ int bam_set_qname(bam1_t *rec, const char *qname)
*** BAM indexing ***
********************/

static hts_idx_t *sam_index(htsFile *fp, int min_shift)
static hts_idx_t *sam_index(htsFile *fp, int min_shift, const hts_progress_callback progress_fn, void* progress_data)
{
int n_lvls, i, fmt, ret;
int64_t last_block_address = 0;
bam1_t *b;
hts_idx_t *idx;
sam_hdr_t *h;
Expand All @@ -944,6 +945,12 @@ static hts_idx_t *sam_index(htsFile *fp, int min_shift)
idx = hts_idx_init(h->n_targets, fmt, bgzf_tell(fp->fp.bgzf), min_shift, n_lvls);
b = bam_init1();
while ((ret = sam_read1(fp, h, b)) >= 0) {
if (progress_fn && fp->fp.bgzf->block_address != last_block_address) {
if (progress_fn(fp->fp.bgzf->block_address, progress_data) != 0) {
goto err;
}
last_block_address = fp->fp.bgzf->block_address;
}
ret = hts_idx_push(idx, b->core.tid, b->core.pos, bam_endpos(b), bgzf_tell(fp->fp.bgzf), !(b->core.flag&BAM_FUNMAP));
if (ret < 0) { // unsorted or doesn't fit
hts_log_error("Read '%s' with ref_name='%s', ref_length=%"PRIhts_pos", flags=%d, pos=%"PRIhts_pos" cannot be indexed", bam_get_qname(b), sam_hdr_tid2name(h, b->core.tid), sam_hdr_tid2len(h, b->core.tid), b->core.flag, b->core.pos+1);
Expand All @@ -963,7 +970,7 @@ static hts_idx_t *sam_index(htsFile *fp, int min_shift)
return NULL;
}

int sam_index_build3(const char *fn, const char *fnidx, int min_shift, int nthreads)
int sam_index_build4(const char *fn, const char *fnidx, int min_shift, int nthreads, const hts_progress_callback progress_fn, void* progress_data)
{
hts_idx_t *idx;
htsFile *fp;
Expand All @@ -976,7 +983,7 @@ int sam_index_build3(const char *fn, const char *fnidx, int min_shift, int nthre
switch (fp->format.format) {
case cram:

ret = cram_index_build(fp->fp.cram, fn, fnidx);
ret = cram_index_build(fp->fp.cram, fn, fnidx, progress_fn, progress_data);
break;

case bam:
Expand All @@ -987,7 +994,7 @@ int sam_index_build3(const char *fn, const char *fnidx, int min_shift, int nthre
ret = -1;
break;
}
idx = sam_index(fp, min_shift);
idx = sam_index(fp, min_shift, progress_fn, progress_data);
if (idx) {
ret = hts_idx_save_as(idx, fn, fnidx, (min_shift > 0)? HTS_FMT_CSI : HTS_FMT_BAI);
if (ret < 0) ret = -4;
Expand All @@ -1005,14 +1012,19 @@ int sam_index_build3(const char *fn, const char *fnidx, int min_shift, int nthre
return ret;
}

// Compatibility wrapper: forwards to sam_index_build4() with no progress
// callback (progress_fn and progress_data are both NULL).
int sam_index_build3(const char *fn, const char *fnidx, int min_shift, int nthreads)
{
return sam_index_build4(fn, fnidx, min_shift, nthreads, NULL, NULL);
}

int sam_index_build2(const char *fn, const char *fnidx, int min_shift)
{
return sam_index_build3(fn, fnidx, min_shift, 0);
return sam_index_build4(fn, fnidx, min_shift, 0, NULL, NULL);
}

int sam_index_build(const char *fn, int min_shift)
{
return sam_index_build3(fn, NULL, min_shift, 0);
return sam_index_build4(fn, NULL, min_shift, 0, NULL, NULL);
}

// Provide bam_index_build() symbol for binary compatibility with earlier HTSlib
Expand Down
29 changes: 26 additions & 3 deletions tbx.c
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,12 @@ static int adjust_n_lvls(int min_shift, int n_lvls, int64_t max_len)
}

// Compatibility wrapper: forwards to tbx_index2() with no progress
// callback (progress_fn and progress_data are both NULL).
tbx_t *tbx_index(BGZF *fp, int min_shift, const tbx_conf_t *conf)
{
return tbx_index2(fp, min_shift, conf, NULL, NULL);
}

tbx_t *tbx_index2(BGZF *fp, int min_shift, const tbx_conf_t *conf,
const hts_progress_callback progress_fn, void *progress_data)
{
tbx_t *tbx;
kstring_t str;
Expand All @@ -294,6 +300,7 @@ tbx_t *tbx_index(BGZF *fp, int min_shift, const tbx_conf_t *conf)
uint64_t last_off = 0;
tbx_intv_t intv;
int64_t max_ref_len = 0;
int64_t last_block_address = 0;

str.s = 0; str.l = str.m = 0;
tbx = (tbx_t*)calloc(1, sizeof(tbx_t));
Expand All @@ -303,6 +310,14 @@ tbx_t *tbx_index(BGZF *fp, int min_shift, const tbx_conf_t *conf)
else min_shift = 14, n_lvls = 5, fmt = HTS_FMT_TBI;
while ((ret = bgzf_getline(fp, '\n', &str)) >= 0) {
++lineno;

if (last_block_address != fp->block_address) {
if (progress_fn && progress_fn(fp->block_address, progress_data) != 0) {
goto fail;
}
last_block_address = fp->block_address;
}

if (str.s[0] == tbx->conf.meta_char && fmt == HTS_FMT_CSI) {
switch (tbx->conf.preset) {
case TBX_SAM:
Expand Down Expand Up @@ -363,29 +378,37 @@ void tbx_destroy(tbx_t *tbx)
}

// Compatibility wrapper: forwards to tbx_index_build4() with no progress
// callback (progress_fn and progress_data are both NULL).
int tbx_index_build3(const char *fn, const char *fnidx, int min_shift, int n_threads, const tbx_conf_t *conf)
{
return tbx_index_build4(fn, fnidx, min_shift, n_threads, conf, NULL, NULL);
}

int tbx_index_build4(const char *fn, const char *fnidx, int min_shift, int n_threads,
const tbx_conf_t *conf, const hts_progress_callback progress_fn,
void *progress_data)
{
tbx_t *tbx;
BGZF *fp;
int ret;
if ((fp = bgzf_open(fn, "r")) == 0) return -1;
if ( n_threads ) bgzf_mt(fp, n_threads, 256);
if ( bgzf_compression(fp) != bgzf ) { bgzf_close(fp); return -2; }
tbx = tbx_index(fp, min_shift, conf);
tbx = tbx_index2(fp, min_shift, conf, progress_fn, progress_data);
bgzf_close(fp);
if ( !tbx ) return -1;
ret = hts_idx_save_as(tbx->idx, fn, fnidx, min_shift > 0? HTS_FMT_CSI : HTS_FMT_TBI);
tbx_destroy(tbx);
return ret;
}


int tbx_index_build2(const char *fn, const char *fnidx, int min_shift, const tbx_conf_t *conf)
{
return tbx_index_build3(fn, fnidx, min_shift, 0, conf);
return tbx_index_build4(fn, fnidx, min_shift, 0, conf, NULL, NULL);
}

int tbx_index_build(const char *fn, int min_shift, const tbx_conf_t *conf)
{
return tbx_index_build3(fn, NULL, min_shift, 0, conf);
return tbx_index_build4(fn, NULL, min_shift, 0, conf, NULL, NULL);
}

static tbx_t *index_load(const char *fn, const char *fnidx, int flags)
Expand Down