Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
6ef3897
restructure test organization
wngr Mar 16, 2021
d989eaf
add non-recursive tree traversal
wngr Mar 16, 2021
245f3b2
replace iter_filtered_chunked0 with traverse0
wngr Mar 16, 2021
455185c
change stream_filtered_chunked to use traverse
wngr Mar 16, 2021
a105154
add DoubleEndedIter impl for ForestIter
wngr Mar 16, 2021
86a2f8b
add some more iter based convenience methods and cleanup
wngr Mar 16, 2021
da5b67b
honor offset passed into reverse traversal
wngr Mar 16, 2021
c91803c
dry forward and backward iteration
wngr Mar 16, 2021
02dde24
dont expose offset for reverse traversal
wngr Mar 17, 2021
6fb7563
add test comparing forward and reverse case
wngr Mar 17, 2021
2fea7fa
dont expose offset passing at all
wngr Mar 17, 2021
cab7d66
add another test
wngr Mar 17, 2021
99af728
use a common stack for both index and positions and try to dry up the
wngr Mar 17, 2021
05cc8fe
Add test for traversal of very deep trees
rklaehn Mar 17, 2021
8def1b9
Remove the scary 'outer
rklaehn Mar 17, 2021
276179f
Use empty filter as a marker that it still has to be initialized
rklaehn Mar 17, 2021
bf39759
Format new test
rklaehn Mar 17, 2021
9b5f2c5
clippy the tests
rklaehn Mar 17, 2021
3838ac5
Merge pull request #59 from Actyx/rkl/traversal-improvements
wngr Mar 17, 2021
8b871a5
remove DoubleEndedIterator impl
wngr Mar 17, 2021
ca2ebce
create non-empty range and dont reverse data items inside a chunk
wngr Mar 17, 2021
97fae3c
WIP
rklaehn Mar 17, 2021
e09ae96
Try using isize for position, so that it can be directly the position…
rklaehn Mar 17, 2021
f2ebf2a
Add comment about ordering of data in FilteredChunk
rklaehn Mar 18, 2021
e1b2c22
pr comments by @rkuhn
wngr Mar 18, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
/target
*sqlite
target
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion banyan-utils/.gitignore

This file was deleted.

1 change: 1 addition & 0 deletions banyan/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ lru = "0.6.4"
maplit = "1.0.2"
rand = "0.8.3"
salsa20 = "0.7.2"
smallvec = "1.6.1"
tracing = "0.1.23"
zstd = "0.6.0"

Expand Down
2 changes: 2 additions & 0 deletions banyan/src/forest/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ mod read;
mod stream;
mod write;

pub use read::ForestIter;

pub type FutureResult<'a, T> = BoxFuture<'a, Result<T>>;

#[derive(Debug, Clone)]
Expand Down
274 changes: 151 additions & 123 deletions banyan/src/forest/read.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,9 @@
use super::{BranchCache, Config, CryptoConfig, FilteredChunk, Forest, TreeTypes};
use crate::{
index::deserialize_compressed,
index::zip_with_offset,
index::Branch,
index::BranchIndex,
index::CompactSeq,
index::Index,
index::IndexRef,
index::Leaf,
index::LeafIndex,
index::NodeInfo,
index::{
deserialize_compressed, zip_with_offset, Branch, BranchIndex, CompactSeq, Index, IndexRef,
Leaf, LeafIndex, NodeInfo,
},
query::Query,
store::ReadOnlyStore,
util::{BoxedIter, IterExt},
Expand All @@ -19,7 +13,148 @@ use anyhow::{anyhow, Result};
use core::fmt::Debug;
use futures::{prelude::*, stream::BoxStream};
use libipld::cbor::DagCbor;
use smallvec::{smallvec, SmallVec};

use std::{iter, sync::Arc, time::Instant};
/// Stack-based, non-recursive depth-first iterator over a banyan tree,
/// yielding one `FilteredChunk` per visited leaf (or per skipped / purged
/// subtree, as an empty placeholder chunk).
///
/// NOTE(review): the field invariants below are inferred from the
/// `Iterator` impl in this file; the constructor is not visible here, so
/// the initial stack contents should be confirmed against it.
pub struct ForestIter<T: TreeTypes, V, R, Q: Query<T>, F> {
    /// Forest used to load nodes from the underlying store.
    pub forest: Forest<T, V, R>,
    /// Absolute offset (in elements) of the next element to be visited.
    pub offset: u64,
    /// Query deciding which branches / leaf elements are included.
    pub query: Q,
    /// Produces the `extra` payload of each emitted `FilteredChunk`.
    pub mk_extra: F,
    /// Stack of indices from the root down to the node currently visited.
    pub index_stack: SmallVec<[Arc<Index<T>>; 64]>,
    /// One entry per `index_stack` level: (current child position,
    /// per-child query match flags for that level).
    pub pos_stack: SmallVec<[(usize, SmallVec<[bool; 64]>); 32]>,
}

impl<T: TreeTypes, V, R, Q, E, F> Iterator for ForestIter<T, V, R, Q, F>
where
    T: TreeTypes + 'static,
    V: DagCbor + Clone + Send + Sync + Debug + 'static,
    R: ReadOnlyStore<T::Link> + Clone + Send + Sync + 'static,
    Q: Query<T> + Clone + Send + 'static,
    E: Send + 'static,
    F: Fn(IndexRef<T>) -> E + Send + Sync + 'static,
{
    type Item = Result<FilteredChunk<T, V, E>>;

    /// Advances the depth-first traversal until the next chunk can be
    /// produced: either a leaf matched by the query, or an empty
    /// placeholder for a skipped / purged subtree.
    ///
    /// Returns `None` once the index stack is exhausted, and
    /// `Some(Err(_))` if loading or decoding a node fails.
    fn next(&mut self) -> Option<Self::Item> {
        let res: FilteredChunk<T, V, E> = 'outer: loop {
            let head = match self.index_stack.last() {
                Some(i) => i,
                // Traversal finished: nothing left on the stack.
                _ => return None,
            };

            let (pos, matching) = self.pos_stack.last_mut().expect("not empty");

            // Branch is exhausted: ascend.
            if *pos == matching.len() {
                // Ascend to the parent's node.
                self.index_stack.pop().expect("not empty");
                self.pos_stack.pop();

                // Advance the parent past the child we just finished, if
                // there is still a parent level to return to.
                if let Some(last) = self.pos_stack.last_mut() {
                    last.0 += 1;
                }
                continue;
            }

            match self.forest.load_node(head) {
                Ok(NodeInfo::Branch(index, branch)) => {
                    // First visit of this branch: apply the query to its
                    // children once and remember which of them match.
                    if *pos == 0 {
                        let mut q_matching = smallvec![true; index.summaries.len()];
                        self.query.intersecting(self.offset, index, &mut q_matching);
                        debug_assert_eq!(branch.children.len(), q_matching.len());
                        *matching = q_matching;
                    }

                    // Skip non-matching children, accounting for the
                    // elements they cover.
                    while !matching[*pos] {
                        self.offset += branch.children[*pos].count();
                        *pos += 1;

                        // All remaining children skipped: ascend and retry.
                        if *pos == branch.children.len() {
                            self.index_stack.pop().expect("not empty");
                            self.pos_stack.pop();
                            // Guarded: the popped node may have been the
                            // root, in which case both stacks are empty
                            // (the unguarded `expect` here could panic).
                            if let Some(last) = self.pos_stack.last_mut() {
                                last.0 += 1;
                            }
                            continue 'outer;
                        }
                    }

                    // Descend into the next matching child.
                    self.index_stack
                        // TODO: clone :-( ?
                        .push(Arc::new(branch.children[*pos].clone()));
                    let new_vec: SmallVec<[_; 64]> = smallvec![matching[*pos]];
                    self.pos_stack.push((0, new_vec));
                    continue;
                }

                Ok(NodeInfo::Leaf(index, leaf)) => {
                    let chunk = {
                        let mut matching: SmallVec<[_; 32]> = smallvec![true; index.keys.len()];
                        self.query.containing(self.offset, index, &mut matching);
                        let keys = index.select_keys(&matching);
                        let elems: Vec<V> = match leaf.as_ref().select(&matching) {
                            Ok(i) => i,
                            Err(e) => return Some(Err(e)),
                        };
                        // Re-base per-leaf offsets to absolute positions.
                        let pairs = keys
                            .zip(elems)
                            .map(|((o, k), v)| (o + self.offset, k, v))
                            .collect::<Vec<_>>();
                        FilteredChunk {
                            range: self.offset..self.offset + index.keys.count(),
                            data: pairs,
                            extra: (self.mk_extra)(IndexRef::Leaf(index)),
                        }
                    };
                    self.offset += index.keys.count();

                    // Ascend to the parent's node (guarded: the leaf may
                    // have been the root).
                    self.index_stack.pop().expect("not empty");
                    self.pos_stack.pop();
                    if let Some(last) = self.pos_stack.last_mut() {
                        last.0 += 1;
                    }

                    break chunk;
                }

                // even for purged leafs and branches or ignored chunks,
                // produce a placeholder.
                //
                // the caller can find out if we skipped purged parts of the
                // tree by using an appropriate mk_extra fn, or check
                // `data.len()`.
                Ok(_) => {
                    // Ascend to the parent's node (guarded, see above).
                    let index = self.index_stack.pop().expect("not empty");
                    self.pos_stack.pop();
                    if let Some(last) = self.pos_stack.last_mut() {
                        last.0 += 1;
                    }

                    // Build the placeholder BEFORE advancing the offset, so
                    // its range covers the skipped elements. The previous
                    // code advanced `self.offset` first, shifting the
                    // reported range forward by `index.count()` relative to
                    // the subtree it stands for (the leaf arm above builds
                    // the chunk first, then advances).
                    let placeholder: FilteredChunk<T, V, E> = FilteredChunk {
                        range: self.offset..self.offset + index.count(),
                        data: Vec::new(),
                        extra: (self.mk_extra)(index.as_index_ref()),
                    };
                    self.offset += index.count();

                    break placeholder;
                }
                Err(e) => return Some(Err(e)),
            };
        };
        Some(Ok(res))
    }
}

/// basic random access append only tree
impl<T, V, R> Forest<T, V, R>
Expand Down Expand Up @@ -305,8 +440,7 @@ where
query: Q,
index: Arc<Index<T>>,
) -> BoxedIter<'static, Result<(u64, T::Key, V)>> {
self.clone()
.iter_filtered_chunked0(offset, query, index, &|_| {})
self.traverse0(offset, query, index, &|_| {})
.map(|res| match res {
Ok(chunk) => chunk.data.into_iter().map(Ok).left_iter(),
Err(cause) => iter::once(Err(cause)).right_iter(),
Expand All @@ -315,84 +449,6 @@ where
.boxed()
}

/// Recursive, lazily-evaluated traversal of the subtree rooted at `index`,
/// yielding one `FilteredChunk` per leaf and an empty placeholder chunk
/// for every non-matching or purged subtree.
///
/// NOTE(review): recursion depth follows tree depth, so very deep trees
/// risk overflowing the call stack — presumably the motivation for the
/// iterative `ForestIter` replacement elsewhere in this file.
pub(crate) fn iter_filtered_chunked0<
    Q: Query<T> + Clone + Send + 'static,
    E: Send + 'static,
    F: Fn(IndexRef<T>) -> E + Send + Sync + 'static,
>(
    &self,
    offset: u64,          // absolute offset of the first element under `index`
    query: Q,             // decides which children / leaf elements are included
    index: Arc<Index<T>>, // root of the subtree to traverse
    mk_extra: &'static F, // produces the `extra` field of each chunk
) -> BoxedIter<'static, Result<FilteredChunk<T, V, E>>> {
    let this = self.clone();
    // Fallible body; a load error becomes a single-`Err` iterator below.
    let inner = || {
        Ok(match self.load_node(&index)? {
            NodeInfo::Leaf(index, node) => {
                // todo: don't get the node here, since we might not need it
                let mut matching = vec![true; index.keys.len()];
                query.containing(offset, index, &mut matching);
                let keys = index.select_keys(&matching);
                let elems: Vec<V> = node.as_ref().select(&matching)?;
                // Re-base the per-leaf offsets to absolute positions.
                let pairs = keys
                    .zip(elems)
                    .map(|((o, k), v)| (o + offset, k, v))
                    .collect::<Vec<_>>();
                let chunk = FilteredChunk {
                    range: offset..offset + index.keys.count(),
                    data: pairs,
                    extra: mk_extra(IndexRef::Leaf(index)),
                };
                iter::once(Ok(chunk)).left_iter().left_iter()
            }
            NodeInfo::Branch(index, node) => {
                // todo: don't get the node here, since we might not need it
                let mut matching = vec![true; index.summaries.len()];
                query.intersecting(offset, index, &mut matching);
                // Pair each child with its absolute start offset.
                let offsets = zip_with_offset(node.children.to_vec(), offset);
                let iter = matching.into_iter().zip(offsets).map(
                    move |(is_matching, (child, offset))| {
                        if is_matching {
                            // Recurse into matching children.
                            this.iter_filtered_chunked0(
                                offset,
                                query.clone(),
                                Arc::new(child),
                                mk_extra,
                            )
                            .left_iter()
                        } else {
                            // Non-matching child: emit an empty placeholder
                            // covering its element range.
                            let placeholder = FilteredChunk {
                                range: offset..offset + child.count(),
                                data: Vec::new(),
                                extra: mk_extra(child.as_index_ref()),
                            };
                            iter::once(Ok(placeholder)).right_iter()
                        }
                    },
                );
                iter.flatten().right_iter().left_iter()
            }
            NodeInfo::PurgedBranch(_) | NodeInfo::PurgedLeaf(_) => {
                // even for purged leafs and branches, produce a placeholder.
                //
                // the caller can find out if we skipped purged parts of the tree by
                // using an appropriate mk_extra fn.
                let placeholder = FilteredChunk {
                    range: offset..offset + index.count(),
                    data: Vec::new(),
                    extra: mk_extra(index.as_index_ref()),
                };
                iter::once(Ok(placeholder)).right_iter()
            }
        })
    };
    match inner() {
        Ok(iter) => iter.boxed(),
        Err(cause) => iter::once(Err(cause)).boxed(),
    }
}

pub(crate) fn stream_filtered_chunked_reverse0<
Q: Query<T> + Clone + Send + 'static,
E: Send + 'static,
Expand Down Expand Up @@ -475,53 +531,25 @@ where

pub(crate) fn dump0(&self, index: &Index<T>, prefix: &str) -> Result<()> {
match self.load_node(index)? {
NodeInfo::Leaf(index, _) => {
println!(
"{}Leaf(count={}, value_bytes={}, sealed={}, link={})",
prefix,
index.keys.count(),
index.value_bytes,
index.sealed,
index
.link
.map(|x| format!("{}", x))
.unwrap_or_else(|| "".to_string())
);
}

NodeInfo::Branch(index, branch) => {
println!(
"{}Branch(count={}, key_bytes={}, value_bytes={}, sealed={}, link={})",
prefix,
"Branch(count={}, key_bytes={}, value_bytes={}, sealed={}, link={}, children={})",
index.count,
index.key_bytes,
index.value_bytes,
index.sealed,
index
.link
.map(|x| format!("{}", x))
.unwrap_or_else(|| "".to_string())
);
.unwrap_or_else(|| "".to_string()),
branch.children.len()
);
let prefix = prefix.to_string() + " ";
for x in branch.children.iter() {
self.dump0(x, &prefix)?;
}
}
NodeInfo::PurgedBranch(index) => {
println!(
"{}PurgedBranch(count={}, key_bytes={}, value_bytes={}, sealed={})",
prefix, index.count, index.key_bytes, index.value_bytes, index.sealed,
);
}
NodeInfo::PurgedLeaf(index) => {
println!(
"{}PurgedLeaf(count={}, key_bytes={}, sealed={})",
prefix,
index.keys.count(),
index.value_bytes,
index.sealed,
);
}
x => println!("{}{}", prefix, x),
};
Ok(())
}
Expand Down
2 changes: 1 addition & 1 deletion banyan/src/forest/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ impl<
let offset = offset.clone();
let iter = forest
.clone()
.iter_filtered_chunked0(0, query, index, mk_extra)
.traverse0(0, query, index, mk_extra)
.take_while(move |result| {
if let Ok(chunk) = result {
// update the offset
Expand Down
Loading