Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add hts_open_cb() interface which allows a callback function to be used as a data source #647

Open
wants to merge 3 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 82 additions & 0 deletions hfile.c
Original file line number Diff line number Diff line change
Expand Up @@ -579,11 +579,82 @@ static int fd_close(hFILE *fpv)
return ret;
}


static const struct hFILE_backend fd_backend =
{
fd_read, fd_write, fd_seek, fd_flush, fd_close
};

typedef struct {
hFILE base;
hFILE_callback_ops ops;
} hFILE_cb;

static ssize_t cb_read(hFILE *fpv, void *buffer, size_t nbytes)
{
hFILE_cb* hcb = (hFILE_cb*)fpv;
ssize_t ret;

if(hcb->ops.read == NULL)
{
errno = EBADF;
return -1;
}

do {
ret = hcb->ops.read ? hcb->ops.read(hcb->ops.cb_data, buffer, nbytes) : 0;
} while(ret < 0 && errno == EINTR);

return ret;
}

static ssize_t cb_write(hFILE* fpv, const void* buffer, size_t nbytes)
{
hFILE_cb* hcb = (hFILE_cb*)fpv;
ssize_t ret;

if(hcb->ops.write == NULL)
{
errno = EBADF;
return -1;
}

do {
ret = hcb->ops.write ? hcb->ops.write(hcb->ops.cb_data, buffer, nbytes) : 0;
} while(ret < 0 && errno == EINTR);

return ret;
}

/*
 * Backend seek hook for callback streams.
 *
 * Delegates to the user's seek callback when one was supplied and returns its
 * result unchanged.  Streams without a seek callback are reported as
 * unseekable: -1 with errno = ESPIPE.
 */
static off_t cb_seek(hFILE *fpv, off_t offset, int whence)
{
    const hFILE_cb *stream = (const hFILE_cb *) fpv;

    if (stream->ops.seek == NULL) {
        errno = ESPIPE;
        return -1;
    }

    return stream->ops.seek(stream->ops.cb_data, offset, whence);
}

/*
 * Backend flush hook for callback streams.
 *
 * A flush callback is optional: when none was supplied the flush is treated
 * as a successful no-op (returns 0); otherwise the callback's result is
 * returned unchanged.
 */
static int cb_flush(hFILE *fpv)
{
    const hFILE_cb *stream = (const hFILE_cb *) fpv;

    if (stream->ops.flush == NULL)
        return 0;

    return stream->ops.flush(stream->ops.cb_data);
}

/*
 * Backend close hook for callback streams.
 *
 * A close callback is optional: when none was supplied this returns 0;
 * otherwise the callback is invoked with cb_data and its result is returned
 * unchanged.  This function itself releases nothing; any cleanup of cb_data
 * is left to the callback.
 */
static int cb_close(hFILE *fpv)
{
    const hFILE_cb *stream = (const hFILE_cb *) fpv;

    if (stream->ops.close == NULL)
        return 0;

    return stream->ops.close(stream->ops.cb_data);
}

static const struct hFILE_backend cb_backend =
{
cb_read, cb_write, cb_seek, cb_flush, cb_close
};

static size_t blksize(int fd)
{
#ifdef HAVE_STRUCT_STAT_ST_BLKSIZE
Expand All @@ -595,6 +666,17 @@ static size_t blksize(int fd)
#endif
}

/*
 * Create an hFILE whose I/O is performed by user-supplied callbacks.
 *
 * ops   Callback table; copied by value into the new stream, so the caller's
 *       structure does not need to outlive this call.
 * mode  Mode string, passed through to hfile_init().
 *
 * Returns the new stream, or NULL if hfile_init() fails.
 */
hFILE *hopen_callback(hFILE_callback_ops ops, const char* mode)
{
    hFILE_cb *fp = (hFILE_cb *) hfile_init(sizeof (hFILE_cb), mode, 0);

    // Bail out before touching any field: hfile_init() may return NULL.
    if (fp == NULL)
        return NULL;

    fp->ops = ops;
    fp->base.backend = &cb_backend;

    return &fp->base;
}

static hFILE *hopen_fd(const char *filename, const char *mode)
{
hFILE_fd *fp = NULL;
Expand Down
15 changes: 13 additions & 2 deletions hts.c
Original file line number Diff line number Diff line change
Expand Up @@ -390,7 +390,7 @@ char *hts_format_description(const htsFormat *format)
return ks_release(&str);
}

htsFile *hts_open_format(const char *fn, const char *mode, const htsFormat *fmt)
static htsFile *hts_open_format_impl(const char *fn, const char *mode, const htsFormat *fmt, hFILE* hf)
{
char smode[102], *cp, *cp2, *mode_c;
htsFile *fp = NULL;
Expand Down Expand Up @@ -420,7 +420,7 @@ htsFile *hts_open_format(const char *fn, const char *mode, const htsFormat *fmt)
if (fmt && fmt->format != unknown_format)
*mode_c = "\0g\0\0b\0c\0\0b\0g\0\0"[fmt->format];

hfile = hopen(fn, smode);
hfile = hf ? hf : hopen(fn, smode);
if (hfile == NULL) goto error;

fp = hts_hopen(hfile, fn, smode);
Expand All @@ -440,6 +440,17 @@ htsFile *hts_open_format(const char *fn, const char *mode, const htsFormat *fmt)

return NULL;
}
htsFile *hts_open_format(const char *fn, const char *mode, const htsFormat *fmt)
{
return hts_open_format_impl(fn, mode, fmt, NULL);
}

/*
 * Open an htsFile on top of user-supplied I/O callbacks.
 *
 * fn    Name associated with the stream; NULL is treated as "-".  It is not
 *       opened here because a callback-backed stream is supplied instead.
 * ops   Callback table; its contents are copied, so it need not outlive the
 *       call.  NULL yields a NULL return.
 * mode  Mode string, as for hts_open().
 *
 * Returns the opened htsFile, or NULL on failure.
 */
htsFile *hts_open_callback(const char* fn, hFILE_callback_ops* ops, const char* mode)
{
    hFILE *hf;

    if (ops == NULL)
        return NULL;

    hf = hopen_callback(*ops, mode);
    // Stop here on failure: passing a NULL stream on would make
    // hts_open_format_impl() fall back to hopen(fn, ...) and silently open
    // the named file (or stdin for "-") instead of using the callbacks.
    if (hf == NULL)
        return NULL;

    return hts_open_format_impl(fn ? fn : "-", mode, NULL, hf);
}

htsFile *hts_open(const char *fn, const char *mode) {
return hts_open_format(fn, mode, NULL);
Expand Down
15 changes: 15 additions & 0 deletions htslib/hfile.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,16 @@ typedef struct hFILE {
// @endcond
} hFILE;

/// Defines the operations that used by an IO file
typedef struct hFILE_callback_ops {
void* cb_data;
ssize_t (*read)(void* cb_data, void* buf, size_t sz);
ssize_t (*write)(void* cb_data, const void* buf, size_t sz);
off_t (*seek)(void* cb_data, off_t ofs, int whence);
int (*flush)(void* cb_data);
int (*close)(void* cb_data);
} hFILE_callback_ops;

/// Open the named file or URL as a stream
/** @return An hFILE pointer, or `NULL` (with _errno_ set) if an error occurred.

Expand All @@ -63,6 +73,11 @@ The usual `fopen(3)` _mode_ letters are supported: one of
*/
hFILE *hopen(const char *filename, const char *mode, ...) HTS_RESULT_USED;

/// Wrap a group of operation callbacks into a HFile stream
/** @return An hFILE pointer, or NULL if error occurred
*/
hFILE *hopen_callback(hFILE_callback_ops ops, const char* mode) HTS_RESULT_USED;

/// Associate a stream with an existing open file descriptor
/** @return An hFILE pointer, or `NULL` (with _errno_ set) if an error occurred.

Expand Down
12 changes: 12 additions & 0 deletions htslib/hts.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ typedef struct BGZF BGZF;
#endif
struct cram_fd;
struct hFILE;
struct hFILE_callback_ops;
struct hts_tpool;

#ifndef KSTRING_T
Expand Down Expand Up @@ -370,6 +371,17 @@ char *hts_format_description(const htsFormat *format);
*/
htsFile *hts_open(const char *fn, const char *mode);

/*!
@abstract Open a SAM/BAM/CRAM/VCF/BCF/etc from stream callbacks
@param ops The IO operation callback
@param mode The mode string
@discussion
See hts_open for description of mode string.
This function is useful when data sources other than file/pipe
should be consumed.
*/
htsFile *hts_open_callback(const char* fn, struct hFILE_callback_ops* ops, const char* mode);

/*!
@abstract Open a SAM/BAM/CRAM/VCF/BCF/etc file
@param fn The file name or "-" for stdin/stdout
Expand Down
58 changes: 58 additions & 0 deletions test/hfile.c
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ DEALINGS IN THE SOFTWARE. */
#include <stdio.h>
#include <stdlib.h>
#include <errno.h>
#include <fcntl.h>
#include <unistd.h>

#include <sys/stat.h>

Expand Down Expand Up @@ -92,6 +94,54 @@ void reopen(const char *infname, const char *outfname)
if (fout == NULL) fail("hopen(\"%s\")", outfname);
}

/* Test read callback: cb_data points at an int file descriptor to read(2) from. */
ssize_t _callback_read(void* cb_data, void* buf, size_t sz)
{
    const int *fd_slot = cb_data;

    return read(*fd_slot, buf, sz);
}
/* Test write callback: cb_data points at an int file descriptor to write(2) to. */
ssize_t _callback_write(void* cb_data, const void* buf, size_t sz)
{
    const int *fd_slot = cb_data;

    return write(*fd_slot, buf, sz);
}
/* Test seek callback: cb_data points at an int file descriptor to lseek(2) on. */
off_t _callback_seek(void* cb_data, off_t ofs, int whence)
{
    const int *fd_slot = cb_data;

    return lseek(*fd_slot, ofs, whence);
}
/*
 * Test close callback: cb_data is a heap-allocated int holding a file
 * descriptor.  The descriptor is copied out first, then the allocation is
 * freed and the descriptor closed; the result of close(2) is returned.
 */
int _callback_close(void* cb_data)
{
    int *fd_slot = cb_data;
    const int descriptor = *fd_slot;

    free(fd_slot);
    return close(descriptor);
}


/*
 * Close the current global fin/fout streams (if any) and reopen them as
 * callback-backed hFILEs: fin reads infname and fout writes outfilename.
 *
 * Each stream's cb_data is a heap-allocated int holding the descriptor; it is
 * released by _callback_close() when the stream is closed.
 *
 * Every failure is reported through fail(), which is assumed not to return
 * (TODO confirm against its definition elsewhere in this file).
 */
void reopen_callback(const char* infname, const char* outfilename)
{
    int *in_fd = malloc(sizeof *in_fd);
    int *out_fd = malloc(sizeof *out_fd);

    // Check the allocations before the descriptors are stored through them.
    if (in_fd == NULL || out_fd == NULL) fail("malloc");

    hFILE_callback_ops read_callback = {
        .cb_data = in_fd,
        .read = _callback_read,
        .seek = _callback_seek,
        .close = _callback_close
    };

    hFILE_callback_ops write_callback = {
        .cb_data = out_fd,
        .write = _callback_write,
        .close = _callback_close
    };

    if (fin) { if (hclose(fin) != 0) fail("hclose(input)"); }
    if (fout) { if (hclose(fout) != 0) fail("hclose(output)"); }

    // Report a failed open() here rather than as a confusing read/write
    // error on descriptor -1 later on.
    *in_fd = open(infname, O_RDONLY);
    if (*in_fd < 0) fail("open(\"%s\")", infname);

    *out_fd = open(outfilename, O_WRONLY);
    if (*out_fd < 0) fail("open(\"%s\")", outfilename);

    fin = hopen_callback(read_callback, "r");
    if (fin == NULL) fail("hopen(\"%s\")", infname);

    fout = hopen_callback(write_callback, "w");
    if (fout == NULL) fail("hopen(\"%s\")", outfilename);
}

int main(void)
{
static const int size[] = { 1, 13, 403, 999, 30000 };
Expand All @@ -108,6 +158,14 @@ int main(void)
}
if (herrno(fin)) { errno = herrno(fin); fail("hgetc"); }

reopen_callback("vcf.c", "test/hfile1.tmp");
while ((c = hgetc(fin)) != EOF) {
if (hputc(c, fout) == EOF) fail("hputc");
}
if (herrno(fin)) { errno = herrno(fin); fail("callback hgetc"); }

if(hseek(fin, SEEK_SET, 0) < 0) fail("callback seek");

reopen("test/hfile1.tmp", "test/hfile2.tmp");
if (hpeek(fin, buffer, 50) < 0) fail("hpeek");
while ((n = hread(fin, buffer, 17)) > 0) {
Expand Down