From 4c16d3c23dc71d5abebe6ade3cd6db9f3a76e7ca Mon Sep 17 00:00:00 2001 From: "Density 21.5" Date: Wed, 23 Oct 2024 14:54:02 +0200 Subject: [PATCH] more selective merge_with call to avoid contention --- src/local_array/store/macros.rs | 95 +++----------------------------- src/local_array/store/oncebox.rs | 18 ++++-- 2 files changed, 20 insertions(+), 93 deletions(-) diff --git a/src/local_array/store/macros.rs b/src/local_array/store/macros.rs index dd00c9d..5f67e70 100644 --- a/src/local_array/store/macros.rs +++ b/src/local_array/store/macros.rs @@ -171,15 +171,9 @@ macro_rules! retrieve_node_mut_with_guard_closure { let this_level = >::len_to_store_bits($id.get_id().1, level); let next_level = >::len_to_store_bits($id.get_id().1, level + 1); let node_set = NodeSet::init(next_level - this_level); - // } else { - // NodeSet( - // Box::new([]), - // std::sync::RwLock::new(RoaringBitmap::new()) - // ) - // }; // See if we can create the node - let node = nodes.0.get_or_init(index, || StoredNode { + let (node, _its_us) = nodes.0.get_or_init(index, || StoredNode { node_id: $id, node: TreeBitMapNode { ptrbitarr: <$stride as Stride>::AtomicPtrSize::from(0), @@ -217,8 +211,8 @@ macro_rules! retrieve_node_mut_with_guard_closure { } }, Some(this_node) => { - let StoredNode { node_id, node, node_set } = this_node; - if $id == *node_id { + // let StoredNode { node_id, node, node_set } = this_node; + if $id == this_node.node_id { // YES, It's the one we're looking for! // Update the rbm_index in this node with the @@ -228,13 +222,13 @@ macro_rules! retrieve_node_mut_with_guard_closure { // to, does not need to be written to, it's part // of a trie, so it just needs to "exist" (and it // already does). - let retry_count = node_set.update_rbm_index( + let retry_count = this_node.node_set.update_rbm_index( $multi_uniq_id ).ok(); trace!("Retry_count rbm index {:?}", retry_count); - trace!("add multi uniq id to bitmap index {} for node {}", $multi_uniq_id, node); - return Some(SizedStrideRef::$stride(node)); + trace!("add multi uniq id to bitmap index {} for node {}", $multi_uniq_id, this_node.node); + return Some(SizedStrideRef::$stride(&this_node.node)); }; // Meh, it's not, but we can a go to the next level // and see if it lives there. @@ -244,7 +238,7 @@ macro_rules! retrieve_node_mut_with_guard_closure { next_bit_shift if next_bit_shift > 0 => { (search_level.f)( search_level, - &node_set, + &this_node.node_set, level, // guard, ) @@ -280,21 +274,16 @@ macro_rules! store_node_closure { multi_uniq_id: u32, mut level: u8, mut retry_count: u32| { - // println!("-"); let this_level = >::len_to_store_bits($id.get_id().1, level); trace!("{:032b}", $id.get_id().0); trace!("id {:?}", $id.get_id()); trace!("multi_uniq_id {}", multi_uniq_id); - std::sync::atomic::fence(Ordering::Acquire); // HASHING FUNCTION let index = Self::hash_node_id($id, level); let stored_nodes = &nodes.0; //.load(Ordering::Acquire, $guard); - // assert!(!stored_nodes.is_null()); let node_ref = &stored_nodes; // .get(index); - // println!("success"); - // let stored_node = node_ref.load(Ordering::Acquire, $guard); match node_ref.get(index) { None => { @@ -315,42 +304,12 @@ macro_rules! store_node_closure { trace!("multi uniq id {}", multi_uniq_id); - // let node_set = if next_level > 0 { let node_set = NodeSet::init(next_level - this_level); - // } else { - // NodeSet( - // Box::new([]), - // std::sync::RwLock::new(RoaringBitmap::new()) - // ) - // }; - - // Update the rbm_index in this node with the - // multi_uniq_id that the caller specified. We're - // doing this independently from setting the - // NodeSet atomically. If we would have done this - // in one Atomic CAS operation we would have to - // clone the NodeSet. Now we only have to clone - // the rbm_index itself. Furthermore, these - // operations can be (semi-)independent. Two - // out-of-order things can happen: - // 1. The rbm_index storing and the NodeSet - // storing get interjected with rbm_index value - // from another thread. In that case the whole - // NodeSet storing operation fails and is retried - // with a newly acquired value for both the - // rbm_index and the NodeSet. - // 2. The rmb_index storing operation succeeds, - // but the NodeSet storing operation fails, - // because the contention retries hit the - // threshold. In that case a false positive is - // stored in the index, which leads to more - // in-vain searching, but not to data corruption. - // retry_count += node_set.update_rbm_index(multi_uniq_id)?; let ptrbitarr = new_node.ptrbitarr.load(); let pfxbitarr = new_node.pfxbitarr.load(); - let stored_node = node_ref.get_or_init( + let (stored_node, its_us) = node_ref.get_or_init( index, || StoredNode { node_id: $id, @@ -374,44 +333,6 @@ macro_rules! store_node_closure { } return Ok(($id, retry_count)); - - // match node_ref.compare_exchange( - // Shared::null(), - // Owned::new(StoredNode { - // node_id: $id, - // node: new_node, - // node_set, - // }), - // Ordering::Acquire, - // Ordering::Relaxed, - // $guard - // ) { - // Ok(_pfx) => { - - // if log_enabled!(log::Level::Trace) { - // trace!("Created node {}", $id); - // } - // return Ok(($id, retry_count)); - // }, - // Err(crossbeam_epoch::CompareExchangeError { new, .. }) => { - // retry_count +=1 ; - - // if log_enabled!(log::Level::Trace) { - // trace!("Failed to create node {}. Someone is busy creating it",$id); - // } - - // let StoredNode { node: cur_node,.. } = *new.into_box(); - // $back_off.spin(); - // return (search_level.f)( - // search_level, - // nodes, - // cur_node, - // multi_uniq_id, - // level, - // retry_count - // ); - // } - // }; } Some(stored_node) => { // A node exists, might be ours, might be diff --git a/src/local_array/store/oncebox.rs b/src/local_array/store/oncebox.rs index 160fffa..93bf12b 100644 --- a/src/local_array/store/oncebox.rs +++ b/src/local_array/store/oncebox.rs @@ -34,8 +34,8 @@ impl OnceBox { let res = match self.ptr.compare_exchange( null_mut(), ptr, - Ordering::SeqCst, - Ordering::SeqCst, + Ordering::Acquire, + Ordering::Relaxed, ) { Ok(current) => { // We set the new value, return it. @@ -53,9 +53,10 @@ impl OnceBox { (unsafe { &*res }, its_us) } - pub fn get_or_init(&self, create: impl FnOnce() -> T) -> &T { + pub fn get_or_init(&self, create: impl FnOnce() -> T) -> (&T, bool) { + let mut its_us = false; if let Some(res) = self.get() { - return res; + return (res, its_us); } let ptr = Box::leak(Box::new(create())); let res = match self.ptr.compare_exchange( @@ -67,6 +68,7 @@ impl OnceBox { Ok(current) => { // We set the new value, return it. assert!(current.is_null()); + its_us = true; ptr as *const _ } Err(current) => { @@ -76,7 +78,7 @@ impl OnceBox { current as *const _ } }; - unsafe { &*res } + (unsafe { &*res }, its_us) } } @@ -118,7 +120,11 @@ impl OnceBoxSlice { } } - pub fn get_or_init(&self, idx: usize, create: impl FnOnce() -> T) -> &T { + pub fn get_or_init( + &self, + idx: usize, + create: impl FnOnce() -> T, + ) -> (&T, bool) { let slice = self.get_or_make_slice(); slice[idx].get_or_init(create) }