Skip to content

Commit

Permalink
more selective merge_with call to avoid contention
Browse files Browse the repository at this point in the history
  • Loading branch information
density215 committed Oct 23, 2024
1 parent 8e25148 commit 4c16d3c
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 93 deletions.
95 changes: 8 additions & 87 deletions src/local_array/store/macros.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,15 +171,9 @@ macro_rules! retrieve_node_mut_with_guard_closure {
let this_level = <NB as NodeBuckets<AF>>::len_to_store_bits($id.get_id().1, level);
let next_level = <NB as NodeBuckets<AF>>::len_to_store_bits($id.get_id().1, level + 1);
let node_set = NodeSet::init(next_level - this_level);
// } else {
// NodeSet(
// Box::new([]),
// std::sync::RwLock::new(RoaringBitmap::new())
// )
// };

// See if we can create the node
let node = nodes.0.get_or_init(index, || StoredNode {
let (node, _its_us) = nodes.0.get_or_init(index, || StoredNode {
node_id: $id,
node: TreeBitMapNode {
ptrbitarr: <$stride as Stride>::AtomicPtrSize::from(0),
Expand Down Expand Up @@ -217,8 +211,8 @@ macro_rules! retrieve_node_mut_with_guard_closure {
}
},
Some(this_node) => {
let StoredNode { node_id, node, node_set } = this_node;
if $id == *node_id {
// let StoredNode { node_id, node, node_set } = this_node;
if $id == this_node.node_id {
// YES, It's the one we're looking for!

// Update the rbm_index in this node with the
Expand All @@ -228,13 +222,13 @@ macro_rules! retrieve_node_mut_with_guard_closure {
// to, does not need to be written to, it's part
// of a trie, so it just needs to "exist" (and it
// already does).
let retry_count = node_set.update_rbm_index(
let retry_count = this_node.node_set.update_rbm_index(
$multi_uniq_id
).ok();

trace!("Retry_count rbm index {:?}", retry_count);
trace!("add multi uniq id to bitmap index {} for node {}", $multi_uniq_id, node);
return Some(SizedStrideRef::$stride(node));
trace!("add multi uniq id to bitmap index {} for node {}", $multi_uniq_id, this_node.node);
return Some(SizedStrideRef::$stride(&this_node.node));
};
// Meh, it's not, but we can a go to the next level
// and see if it lives there.
Expand All @@ -244,7 +238,7 @@ macro_rules! retrieve_node_mut_with_guard_closure {
next_bit_shift if next_bit_shift > 0 => {
(search_level.f)(
search_level,
&node_set,
&this_node.node_set,
level,
// guard,
)
Expand Down Expand Up @@ -280,21 +274,16 @@ macro_rules! store_node_closure {
multi_uniq_id: u32,
mut level: u8,
mut retry_count: u32| {
// println!("-");
let this_level = <NB as NodeBuckets<AF>>::len_to_store_bits($id.get_id().1, level);
trace!("{:032b}", $id.get_id().0);
trace!("id {:?}", $id.get_id());
trace!("multi_uniq_id {}", multi_uniq_id);

std::sync::atomic::fence(Ordering::Acquire);
// HASHING FUNCTION
let index = Self::hash_node_id($id, level);
let stored_nodes = &nodes.0; //.load(Ordering::Acquire, $guard);

// assert!(!stored_nodes.is_null());
let node_ref = &stored_nodes; // .get(index);
// println!("success");
// let stored_node = node_ref.load(Ordering::Acquire, $guard);

match node_ref.get(index) {
None => {
Expand All @@ -315,42 +304,12 @@ macro_rules! store_node_closure {

trace!("multi uniq id {}", multi_uniq_id);

// let node_set = if next_level > 0 {
let node_set = NodeSet::init(next_level - this_level);
// } else {
// NodeSet(
// Box::new([]),
// std::sync::RwLock::new(RoaringBitmap::new())
// )
// };

// Update the rbm_index in this node with the
// multi_uniq_id that the caller specified. We're
// doing this independently from setting the
// NodeSet atomically. If we would have done this
// in one Atomic CAS operation we would have to
// clone the NodeSet. Now we only have to clone
// the rbm_index itself. Furthermore, these
// operations can be (semi-)independent. Two
// out-of-order things can happen:
// 1. The rbm_index storing and the NodeSet
// storing get interjected with rbm_index value
// from another thread. In that case the whole
// NodeSet storing operation fails and is retried
// with a newly acquired value for both the
// rbm_index and the NodeSet.
// 2. The rmb_index storing operation succeeds,
// but the NodeSet storing operation fails,
// because the contention retries hit the
// threshold. In that case a false positive is
// stored in the index, which leads to more
// in-vain searching, but not to data corruption.
// retry_count += node_set.update_rbm_index(multi_uniq_id)?;

let ptrbitarr = new_node.ptrbitarr.load();
let pfxbitarr = new_node.pfxbitarr.load();

let stored_node = node_ref.get_or_init(
let (stored_node, its_us) = node_ref.get_or_init(
index,
|| StoredNode {
node_id: $id,
Expand All @@ -374,44 +333,6 @@ macro_rules! store_node_closure {
}

return Ok(($id, retry_count));

// match node_ref.compare_exchange(
// Shared::null(),
// Owned::new(StoredNode {
// node_id: $id,
// node: new_node,
// node_set,
// }),
// Ordering::Acquire,
// Ordering::Relaxed,
// $guard
// ) {
// Ok(_pfx) => {

// if log_enabled!(log::Level::Trace) {
// trace!("Created node {}", $id);
// }
// return Ok(($id, retry_count));
// },
// Err(crossbeam_epoch::CompareExchangeError { new, .. }) => {
// retry_count +=1 ;

// if log_enabled!(log::Level::Trace) {
// trace!("Failed to create node {}. Someone is busy creating it",$id);
// }

// let StoredNode { node: cur_node,.. } = *new.into_box();
// $back_off.spin();
// return (search_level.f)(
// search_level,
// nodes,
// cur_node,
// multi_uniq_id,
// level,
// retry_count
// );
// }
// };
}
Some(stored_node) => {
// A node exists, might be ours, might be
Expand Down
18 changes: 12 additions & 6 deletions src/local_array/store/oncebox.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ impl<T> OnceBox<T> {
let res = match self.ptr.compare_exchange(
null_mut(),
ptr,
Ordering::SeqCst,
Ordering::SeqCst,
Ordering::Acquire,
Ordering::Relaxed,
) {
Ok(current) => {
// We set the new value, return it.
Expand All @@ -53,9 +53,10 @@ impl<T> OnceBox<T> {
(unsafe { &*res }, its_us)
}

pub fn get_or_init(&self, create: impl FnOnce() -> T) -> &T {
pub fn get_or_init(&self, create: impl FnOnce() -> T) -> (&T, bool) {
let mut its_us = false;
if let Some(res) = self.get() {
return res;
return (res, its_us);
}
let ptr = Box::leak(Box::new(create()));
let res = match self.ptr.compare_exchange(
Expand All @@ -67,6 +68,7 @@ impl<T> OnceBox<T> {
Ok(current) => {
// We set the new value, return it.
assert!(current.is_null());
its_us = true;
ptr as *const _
}
Err(current) => {
Expand All @@ -76,7 +78,7 @@ impl<T> OnceBox<T> {
current as *const _
}
};
unsafe { &*res }
(unsafe { &*res }, its_us)
}
}

Expand Down Expand Up @@ -118,7 +120,11 @@ impl<T> OnceBoxSlice<T> {
}
}

pub fn get_or_init(&self, idx: usize, create: impl FnOnce() -> T) -> &T {
pub fn get_or_init(
&self,
idx: usize,
create: impl FnOnce() -> T,
) -> (&T, bool) {
let slice = self.get_or_make_slice();
slice[idx].get_or_init(create)
}
Expand Down

0 comments on commit 4c16d3c

Please sign in to comment.