Serialiser improvements #42

Merged · 5 commits · Oct 10, 2024
Changes from all commits
607 changes: 360 additions & 247 deletions Cargo.lock

Large diffs are not rendered by default.

37 changes: 22 additions & 15 deletions Cargo.toml
@@ -16,11 +16,11 @@ crate-type = ["rlib", "cdylib"]
 [dependencies]
 anyhow = "1"
 async-stream = "0.3"
-bincode = "1"
+bincode = { version = "1", optional = true }
 byte-slice-cast = "1"
 byteorder = "1"
 bytes = "1"
-flexbuffers = "2"
+flexbuffers = { version = "2", optional = true }
 futures = "0.3"
 futures-core = "0.3"
 futures-util = "0.3"
@@ -32,7 +32,6 @@ strength_reduce = "0.2"
 zerocopy = "0.7"
 libc = "0.2"
 hdf5 = { package = "hdf5-metno", version = "0.9.1" }
-hdf5-sys = { package = "hdf5-metno-sys", version = "0.9.1" }
 log = "0.4"
 rayon = "1.10"
 ndarray = { version = "0.16", features = [ "rayon" ] }
@@ -41,34 +40,42 @@ ndarray_0_15 = { package = "ndarray", version = "0.15", features = ["rayon"] }
 pyo3 = { version = "0.21", optional = true, features = ["anyhow", "auto-initialize", "abi3-py39"] }
 numpy = { version = "0.21.0", optional = true }
 netcdf = { version = "0.10.4", optional = true }
+serde = { version = "1", features = ["derive"] }
+tokio = { version = "1", features = ["sync", "macros", "rt-multi-thread"] }
+clap = { version = "4.5.16", features = ["derive"], optional = true }

-[dependencies.serde]
-features = ["derive"]
-version = "1"

-[dependencies.tokio]
-features = ["sync", "macros", "rt-multi-thread"]
-version = "1"

 [dev-dependencies]
 divan = "0.1.14"
 rand = "0.8"
 sled = "0.34.7"
 reqwest = { version = "0.12", features = [ "blocking" ] }
+flexbuffers = "2"
+bincode = "1"

 [profile.release]
 lto = 'thin'
 codegen-units = 1
 debug = true

 [features]
-default = ["static", "fast-index", "netcdf"]
-static = ["hdf5-sys/static", "hdf5-sys/zlib", "netcdf?/static"]
-fast-index = []
-python = ["pyo3", "numpy"]
+default = ["static", "netcdf"]
+netcdf = ["dep:netcdf"]
+static = ["hdf5/static", "hdf5/zlib", "netcdf?/static"]
+python = ["dep:pyo3", "dep:numpy"]
 extension-module = ["python", "pyo3/extension-module"]
-netcdf = [ "dep:netcdf" ]
 unstable = []
+flexbuffers = ["dep:flexbuffers"]
+clap = ["dep:clap"]
+bincode = ["dep:bincode"]

+[[bin]]
+name = "hfxlst"
+required-features = ["clap"]

+[[bin]]
+name = "hfxidx"
+required-features = ["clap"]

 [[bench]]
 name = "concurrency"
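With bincode and flexbuffers now optional, consumers enable them explicitly and gate their own serialization code on the same feature names. A minimal consumer-side sketch (the save_index helper is hypothetical, not part of hidefix):

use hidefix::idx::Index;

// Hypothetical helper: persist an index when the `bincode` feature is
// enabled. Mirrors the dispatch the `hfxidx` binary does below.
#[cfg(feature = "bincode")]
fn save_index(idx: &Index, path: &std::path::Path) -> anyhow::Result<()> {
    let bytes = bincode::serialize(idx)?; // encode the whole index
    std::fs::write(path, bytes)?;
    Ok(())
}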
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -20,7 +20,7 @@ dependencies = [

 [tool.maturin]
 python-source = "python"
-features = [ "static", "python", "extension-module", "fast-index" ]
+features = [ "static", "python", "extension-module" ]

 [tool.pytest.ini_options]
 addopts = "--benchmark-disable"
59 changes: 35 additions & 24 deletions src/bin/hfxidx.rs
@@ -1,39 +1,50 @@
-//! Create an index serialized to a flexbuffer.
-use std::env;

-#[macro_use]
-extern crate anyhow;

-use flexbuffers::FlexbufferSerializer as ser;
+//! Create an index serialized to a file on disk.
+use clap::Parser;
 use hidefix::idx::Index;
-use serde::ser::Serialize;

-fn usage() {
-    println!("Usage: hfxidx input.h5 output.h5.idx");
+#[derive(Parser, Debug)]
+struct Args {
+    input: std::path::PathBuf,
+    output: std::path::PathBuf,
+    #[arg(default_value = "any", long)]
+    out_type: String,
 }

 fn main() -> Result<(), anyhow::Error> {
-    let args: Vec<String> = env::args().collect();

-    if args.len() != 3 {
-        usage();
-        return Err(anyhow!("Invalid arguments"));
-    }
+    let args = Args::parse();

-    let fin = &args[1];
-    let fout = &args[2];
+    let fin = &args.input;
+    let fout = &args.output;

-    print!("Indexing {fin}..");
+    print!("Indexing {}..", fin.to_string_lossy());

     let idx = Index::index(fin)?;

     println!("done.");

-    print!("Writing index to {fout} (as flxebuffer)..");

-    let mut s = ser::new();
-    idx.serialize(&mut s)?;
-    std::fs::write(fout, s.view())?;
+    println!(
+        "Writing index to {} (as {})..",
+        fout.to_string_lossy(),
+        args.out_type,
+    );

+    #[allow(unreachable_patterns)] // To let "any" work
+    match args.out_type.as_str() {
+        #[cfg(feature = "flexbuffers")]
+        "any" | "flexbuffers" => {
+            use serde::ser::Serialize;
+            let mut s = flexbuffers::FlexbufferSerializer::new();
+            idx.serialize(&mut s)?;
+            std::fs::write(fout, s.view())?;
+        }
+        #[cfg(feature = "bincode")]
+        "any" | "bincode" => {
+            let s = bincode::serialize(&idx)?;
+            std::fs::write(fout, s)?;
+        }
+        "any" => anyhow::bail!("No serializer compiled in"),
+        _ => anyhow::bail!("Unknown serialization type {}", args.out_type),
+    }

     println!("done.");
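Together with hfxlst below, the new bincode path round-trips like this (a sketch assuming the bincode feature is enabled; the file names are placeholders):

use hidefix::idx::Index;

fn main() -> anyhow::Result<()> {
    // Index an HDF5 file, as hfxidx does.
    let idx = Index::index("data.h5")?;

    // Serialize and write, matching the "bincode" arm above...
    std::fs::write("data.h5.idx", bincode::serialize(&idx)?)?;

    // ...then read it back, matching hfxlst's read_bincode.
    let bytes = std::fs::read("data.h5.idx")?;
    let again: Index = bincode::deserialize(&bytes)?;
    assert_eq!(idx.path(), again.path());
    Ok(())
}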
80 changes: 49 additions & 31 deletions src/bin/hfxlst.rs
@@ -1,44 +1,62 @@
 //! List a summary of a flexbuffer serialized index to stdout.
-use std::env;
+use clap::Parser;
+use hidefix::idx::{DatasetExt, Index};

-#[macro_use]
-extern crate anyhow;

-fn usage() {
-    println!("Usage: hfxlst input.h5.idx");
+#[derive(Parser, Debug)]
+struct Args {
+    input: std::path::PathBuf,
 }

 fn main() -> Result<(), anyhow::Error> {
-    let args: Vec<String> = env::args().collect();

-    if args.len() != 2 {
-        usage();
-        return Err(anyhow!("Invalid arguments"));
-    }

-    let fin = &args[1];
+    let args = Args::parse();

-    println!("Loading index from {fin}..");
+    println!("Loading index from {}..", args.input.to_string_lossy());

-    let b = std::fs::read(fin)?;
-    let idx = flexbuffers::Reader::get_root(&*b)?.as_map();
+    let b = std::fs::read(&args.input)?;
+    let idx = read_index(&b)?;

-    println!("Datasets (source path: {:?}):\n", idx.idx("path").as_str());
-    println!("{:4}{:30} shape:", "", "name:");
+    let path = idx.path();
+    if let Some(path) = path {
+        println!("Datasets (source path: {path:?})");
+    } else {
+        println!("Datasets");
+    }
+    for (name, dataset) in idx.datasets() {
+        let shape = dataset.shape();
+        println!("{name:32} {shape:?}");
+    }
+    // println!("{idx:?}");

-    let datasets = idx.idx("datasets").as_map();
+    Ok(())
+}

-    datasets.iter_keys().for_each(|k| {
-        let shape: Vec<u64> = datasets
-            .idx(k)
-            .as_map()
-            .idx("shape")
-            .as_vector()
-            .iter()
-            .map(|r| r.as_u64())
-            .collect();
-        println!("{:4}{:30} {:?}", "", k, shape);
-    });
+fn read_index(bytes: &[u8]) -> Result<Index, anyhow::Error> {
+    let mut errs: Vec<anyhow::Error> = vec![];
+    #[cfg(feature = "flexbuffers")]
+    {
+        match read_flexbuffer(bytes) {
+            Ok(idx) => return Ok(idx),
+            Err(e) => errs.push(e),
+        }
+    }
+    #[cfg(feature = "bincode")]
+    {
+        match read_bincode(bytes) {
+            Ok(idx) => return Ok(idx),
+            Err(e) => errs.push(e),
+        }
+    }
+    anyhow::bail!("Parsing failed (incompatible, or not configured): {errs:?}")
+}

-    Ok(())
+#[cfg(feature = "flexbuffers")]
+fn read_flexbuffer(bytes: &[u8]) -> Result<Index, anyhow::Error> {
+    let idx = flexbuffers::from_slice(bytes)?;
+    Ok(idx)
+}

+#[cfg(feature = "bincode")]
+fn read_bincode(bytes: &[u8]) -> Result<Index, anyhow::Error> {
+    let idx = bincode::deserialize(bytes)?;
+    Ok(idx)
 }
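Once an index deserializes, its datasets can be inspected without touching the original HDF5 file. A sketch along the lines of the listing above (assuming the bincode feature; shape() and size() come from the DatasetExt trait):

use hidefix::idx::{DatasetExt, Index};

#[cfg(feature = "bincode")]
fn summarize(bytes: &[u8]) -> anyhow::Result<()> {
    let idx: Index = bincode::deserialize(bytes)?;
    for (name, ds) in idx.datasets() {
        // Shape and element count are available straight from the index.
        println!("{name}: shape {:?}, {} elements", ds.shape(), ds.size());
    }
    Ok(())
}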
4 changes: 2 additions & 2 deletions src/extent.rs
@@ -383,7 +383,7 @@ enum ExtentIterator<'a> {
     Extents(std::iter::Zip<std::slice::Iter<'a, Extent>, std::slice::Iter<'a, u64>>),
 }

-impl<'a> Iterator for ExtentIterator<'a> {
+impl Iterator for ExtentIterator<'_> {
     type Item = Extent;
     fn next(&mut self) -> Option<Self::Item> {
         match self {
@@ -395,7 +395,7 @@ impl<'a> Iterator for ExtentIterator<'a> {
     }
 }

-impl<'a> DoubleEndedIterator for ExtentIterator<'a> {
+impl DoubleEndedIterator for ExtentIterator<'_> {
     fn next_back(&mut self) -> Option<Self::Item> {
         match self {
             Self::All(iter) => iter
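These impl changes are the newer elided-lifetime style: when a lifetime parameter is never referred to by name inside the impl body, writing Type<'_> avoids declaring <'a> on the impl. A standalone illustration (the Wrapper type here is hypothetical, just to show the pattern):

struct Wrapper<'a>(&'a str);

// Before: impl<'a> std::fmt::Display for Wrapper<'a> { .. }
// After: the anonymous lifetime is equivalent when the name is unused.
impl std::fmt::Display for Wrapper<'_> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(self.0)
    }
}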
File renamed without changes.
2 changes: 1 addition & 1 deletion src/filters/shuffle.rs
@@ -76,7 +76,7 @@ where
 /// processing.
 ///
 /// Other implementations have often used Duff's device. Newer compilers, using structured data
-/// types when possible, seems to result in similarily fast processing.
+/// types when possible, seems to result in similarly fast processing.
 pub fn unshuffle_structured<const N: usize>(src: &[u8], dest: &mut [u8]) {
     assert!(src.len() == dest.len());
     assert!(src.len() % N == 0);
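For context on what is being unshuffled: HDF5's shuffle filter stores the first byte of every element, then every second byte, and so on, so that similar bytes sit adjacent and compress better. A naive sketch of the inverse transform (the file's actual implementation is the optimized structured version):

/// Naive unshuffle for N-byte elements: `src` holds all first bytes,
/// then all second bytes, etc.; `dest` receives the elements re-interleaved.
fn unshuffle_naive<const N: usize>(src: &[u8], dest: &mut [u8]) {
    assert_eq!(src.len(), dest.len());
    assert_eq!(src.len() % N, 0);
    let n = src.len() / N;
    for byte in 0..N {
        for elem in 0..n {
            dest[elem * N + byte] = src[byte * n + elem];
        }
    }
}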
File renamed without changes.
4 changes: 2 additions & 2 deletions src/idx/chunk.rs
@@ -102,7 +102,7 @@ impl<const D: usize> Chunk<D> {
         unsafe { &*(slice.as_ptr() as *const Chunk<D>) }
     }

-    /// Reintepret a slice of `Chunk<D>`s to a slice of `u64`. This is efficient, but relies
+    /// Reinterpret a slice of `Chunk<D>`s to a slice of `u64`. This is efficient, but relies
     /// on unsafe code.
     pub fn slice_as_u64s(chunks: &[Chunk<D>]) -> &[ULE] {
         let ptr = chunks.as_ptr();
@@ -122,7 +122,7 @@ impl<const D: usize> Chunk<D> {
         slice
     }

-    /// Reintepret a slice of `u64`s to a slice of `Chunk<D>`. This is efficient, but relies
+    /// Reinterpret a slice of `u64`s to a slice of `Chunk<D>`. This is efficient, but relies
     /// on unsafe code.
     pub fn slice_from_u64s(slice: &[ULE]) -> &[Chunk<D>] {
         assert_eq!(
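The doc comments above describe a zero-copy cast between layout-compatible slices. A simplified sketch of the mechanics with plain u64 (the real code additionally goes through the little-endian ULE wrapper and checks alignment and length):

/// Reinterpret a slice of u64 as raw bytes without copying. Sound because
/// u64 has no padding bytes and u8 has alignment 1.
fn u64s_as_bytes(values: &[u64]) -> &[u8] {
    let len = values.len() * std::mem::size_of::<u64>();
    // SAFETY: `values` is valid for `len` bytes for the lifetime of the borrow.
    unsafe { std::slice::from_raw_parts(values.as_ptr() as *const u8, len) }
}

Going the other direction is where the assertions in slice_from_u64s come in: the byte length must divide evenly by the element size, and the pointer must satisfy the target type's alignment.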
File renamed without changes.
2 changes: 1 addition & 1 deletion src/idx/dataset/any.rs
@@ -121,7 +121,7 @@ pub trait DatasetExt {
     fn as_par_reader(&self, p: &dyn AsRef<Path>) -> anyhow::Result<Box<dyn DatasetExtReader + '_>>;
 }

-impl<'a> DatasetExt for DatasetD<'a> {
+impl DatasetExt for DatasetD<'_> {
     fn size(&self) -> usize {
         self.inner().size()
     }