Skip to content

Commit

Permalink
minor cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
sklose committed Apr 15, 2023
1 parent 1ba3308 commit 5b280ab
Show file tree
Hide file tree
Showing 6 changed files with 25 additions and 17 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
/target
**/*.rs.bk
**/*.rs.bk
/.idea
4 changes: 2 additions & 2 deletions benches/u64_channels.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ fn disrustor_channel<S: Sequencer, F: FnOnce(&RingBuffer<i64>) -> S>(n: u64, b:
let gating_sequence = vec![sequencer.get_cursor()];
let barrier = sequencer.create_barrier(&gating_sequence);
let processor = BatchEventProcessor::create(move |data, sequence, _| {
assert!(*data == sequence);
assert_eq!(*data, sequence);
});

sequencer.add_gating_sequence(&processor.get_cursor());
Expand All @@ -57,7 +57,7 @@ fn disrustor_channel<S: Sequencer, F: FnOnce(&RingBuffer<i64>) -> S>(n: u64, b:

sequencer.drain();
handle.join();
assert!(counter == n);
assert_eq!(counter, n);
}

fn criterion_benchmark(c: &mut Criterion) {
Expand Down
11 changes: 7 additions & 4 deletions src/prelude.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
use std::sync::{
atomic::{AtomicI64, Ordering},
Arc,
use std::{
borrow::Borrow,
sync::{
atomic::{AtomicI64, Ordering},
Arc,
},
};

pub type Sequence = i64;
Expand Down Expand Up @@ -68,7 +71,7 @@ pub trait Sequencer {

pub trait WaitStrategy: Send + Sync {
fn new() -> Self;
fn wait_for<F: Fn() -> bool, S: AsRef<AtomicSequence>>(
fn wait_for<F: Fn() -> bool, S: Borrow<AtomicSequence>>(
&self,
sequence: Sequence,
dependencies: &[S],
Expand Down
8 changes: 4 additions & 4 deletions src/ringbuffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ impl<T: Default> RingBuffer<T> {
}

impl<T: Send + Sync> DataProvider<T> for RingBuffer<T> {
fn buffer_size(&self) -> usize {
self.capacity
}

unsafe fn get_mut(&self, sequence: Sequence) -> &mut T {
let index = sequence as usize & self.mask;
let cell = self.data.get_unchecked(index);
Expand All @@ -39,10 +43,6 @@ impl<T: Send + Sync> DataProvider<T> for RingBuffer<T> {
let cell = self.data.get_unchecked(index);
&*cell.get()
}

fn buffer_size(&self) -> usize {
self.capacity
}
}

unsafe impl<T: Send> Send for RingBuffer<T> {}
Expand Down
5 changes: 3 additions & 2 deletions src/utils.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
use crate::prelude::*;
use std::{
borrow::Borrow,
iter::*,
sync::atomic::{AtomicU64, Ordering},
};

pub fn min_cursor_sequence<S: AsRef<AtomicSequence>>(sequences: &[S]) -> Sequence {
pub fn min_cursor_sequence<S: Borrow<AtomicSequence>>(sequences: &[S]) -> Sequence {
sequences
.iter()
.map(|s| s.as_ref().get())
.map(|s| s.borrow().get())
.min()
.unwrap_or_default()
}
Expand Down
11 changes: 7 additions & 4 deletions src/wait.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
use crate::prelude::*;
use crate::utils::*;
use std::sync::{Condvar, Mutex};
use std::{
borrow::Borrow,
sync::{Condvar, Mutex},
};

pub struct SpinLoopWaitStrategy;

Expand All @@ -14,7 +17,7 @@ impl WaitStrategy for SpinLoopWaitStrategy {
SpinLoopWaitStrategy {}
}

fn wait_for<F: Fn() -> bool, S: AsRef<AtomicSequence>>(
fn wait_for<F: Fn() -> bool, S: Borrow<AtomicSequence>>(
&self,
sequence: Sequence,
dependencies: &[S],
Expand All @@ -36,13 +39,13 @@ impl WaitStrategy for SpinLoopWaitStrategy {

impl WaitStrategy for BlockingWaitStrategy {
fn new() -> Self {
BlockingWaitStrategy {
Self {
cvar: Condvar::new(),
guard: Mutex::new(()),
}
}

fn wait_for<F: Fn() -> bool, S: AsRef<AtomicSequence>>(
fn wait_for<F: Fn() -> bool, S: Borrow<AtomicSequence>>(
&self,
sequence: Sequence,
dependencies: &[S],
Expand Down

0 comments on commit 5b280ab

Please sign in to comment.