Skip to content

Commit

Permalink
initial work on multi producer
Browse files Browse the repository at this point in the history
  • Loading branch information
sklose committed Sep 8, 2019
1 parent b37a592 commit 22fefa5
Show file tree
Hide file tree
Showing 9 changed files with 430 additions and 9 deletions.
120 changes: 120 additions & 0 deletions .vscode/launch.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
{
// Use IntelliSense to learn about possible attributes.
// Hover to view descriptions of existing attributes.
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
"version": "0.2.0",
"configurations": [
{
"type": "lldb",
"request": "launch",
"name": "Debug unit tests in library 'disrustor'",
"cargo": {
"args": [
"test",
"--no-run",
"--lib",
"--package=disrustor"
],
"filter": {
"name": "disrustor",
"kind": "lib"
}
},
"args": [],
"cwd": "${workspaceFolder}"
},
{
"type": "lldb",
"request": "launch",
"name": "Debug example 'multi_producer'",
"cargo": {
"args": [
"build",
"--example=multi_producer",
"--package=disrustor"
],
"filter": {
"name": "multi_producer",
"kind": "example"
}
},
"args": [],
"cwd": "${workspaceFolder}"
},
{
"type": "lldb",
"request": "launch",
"name": "Debug unit tests in example 'multi_producer'",
"cargo": {
"args": [
"test",
"--no-run",
"--example=multi_producer",
"--package=disrustor"
],
"filter": {
"name": "multi_producer",
"kind": "example"
}
},
"args": [],
"cwd": "${workspaceFolder}"
},
{
"type": "lldb",
"request": "launch",
"name": "Debug example 'single_producer'",
"cargo": {
"args": [
"build",
"--example=single_producer",
"--package=disrustor"
],
"filter": {
"name": "single_producer",
"kind": "example"
}
},
"args": [],
"cwd": "${workspaceFolder}"
},
{
"type": "lldb",
"request": "launch",
"name": "Debug unit tests in example 'single_producer'",
"cargo": {
"args": [
"test",
"--no-run",
"--example=single_producer",
"--package=disrustor"
],
"filter": {
"name": "single_producer",
"kind": "example"
}
},
"args": [],
"cwd": "${workspaceFolder}"
},
{
"type": "lldb",
"request": "launch",
"name": "Debug benchmark 'my_benchmark'",
"cargo": {
"args": [
"test",
"--no-run",
"--bench=my_benchmark",
"--package=disrustor"
],
"filter": {
"name": "my_benchmark",
"kind": "bench"
}
},
"args": [],
"cwd": "${workspaceFolder}"
}
]
}
4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ azure-devops = { project = "sklosegithub/disrustor", pipeline = "sklose.disrusto
log = "0.4.8"

[dev-dependencies]
criterion = "0.2.11"
criterion = "0.3.0"
fern = "0.5.8"
chrono = "0.4.7"
chrono = "0.4.9"

[[bench]]
name = "my_benchmark"
Expand Down
2 changes: 1 addition & 1 deletion LICENSE
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
The MIT License

Copyright (c) 2010-2019 Google, Inc. http://angularjs.org
Copyright (c) 2019 Sebastian Klose

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
Expand Down
4 changes: 2 additions & 2 deletions benches/my_benchmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ fn disrustor_channel<W: WaitStrategy + 'static>(n: i64, b: i64) {
counter += 1;
unsafe { *data.get_mut(sequence) = sequence };
}
sequencer.publish(end);
sequencer.publish(start, end);
}
}

Expand All @@ -68,7 +68,7 @@ fn criterion_benchmark(c: &mut Criterion) {
.with_function("disrustor blocking", move |b, i| {
b.iter(|| disrustor_channel::<BlockingWaitStrategy>(n, *i));
})
.throughput(move |_| Throughput::Elements(n as u32))
.throughput(move |_| Throughput::Elements(n as u64))
.sample_size(10),
);
}
Expand Down
96 changes: 96 additions & 0 deletions examples/multi_producer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
use disrustor::{
internal::{BlockingWaitStrategy, SpinLoopWaitStrategy},
*,
};
use log::*;

const MAX: i64 = 200i64;

fn follow_sequence<W: WaitStrategy + 'static>() {
let (executor, producer) = DisrustorBuilder::with_ring_buffer(128)
.with_wait_strategy::<W>()
.with_multi_producer()
.with_barrier(|b| {
b.handle_events_mut(|data, sequence, _| {
let val = *data;
if val as i64 != sequence {
panic!(
"concurrency problem detected (p1), expected {}, but got {}",
sequence, val
);
}
debug!("updating sequence {} from {} to {}", sequence, val, val * 2);
*data = val * 2;
});
})
.with_barrier(|b| {
b.handle_events(|data, sequence, _| {
let val = *data;
if val as i64 != sequence * 2 {
panic!(
"concurrency problem detected (p2), expected {}, but got {}",
sequence * 2,
val
);
}
});
})
.build();

let handle = executor.spawn();
let producer = std::sync::Arc::new(producer);
let producer1 = producer.clone();
let producer2 = producer.clone();

let p1 = std::thread::spawn(move || {
for i in 1..=MAX / 10 {
let range = ((i - 1) * 20)..=((i - 1) * 20 + 19);
let items: Vec<_> = range.collect();
producer1.write(items, |d, seq, _| {
*d = seq as u32;
});
}
});
let p2 = std::thread::spawn(move || {
for i in 1..=MAX / 10 {
let range = ((i - 1) * 20)..=((i - 1) * 20 + 19);
let items: Vec<_> = range.collect();
producer2.write(items, |d, seq, _| {
*d = seq as u32;
});
}
});

p1.join().unwrap();
p2.join().unwrap();
if let Ok(producer) = std::sync::Arc::try_unwrap(producer) {
producer.drain();
}

handle.join();
}

fn main() {
fern::Dispatch::new()
.format(|out, message, record| {
out.finish(format_args!(
"{}[{}][{:?}][{}] {}",
chrono::Local::now().format("[%Y-%m-%d][%H:%M:%S]"),
record.target(),
std::thread::current().id(),
record.level(),
message
))
})
.level(log::LevelFilter::Debug)
.chain(std::io::stdout())
.chain(fern::log_file("output.log").unwrap())
.apply()
.unwrap();

info!("running blocking wait strategy");
follow_sequence::<BlockingWaitStrategy>();

info!("running spinning wait strategy");
follow_sequence::<SpinLoopWaitStrategy>();
}
5 changes: 5 additions & 0 deletions src/dsl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,11 @@ impl<W: WaitStrategy, D: DataProvider<T>, T> WithWaitStrategy<W, D, T> {
let buffer_size = self.with_data_provider.data_provider.buffer_size();
self.with_sequencer(SingleProducerSequencer::new(buffer_size, W::new()))
}

pub fn with_multi_producer(self) -> WithSequencer<MultiProducerSequencer<W>, W, D, T> {
let buffer_size = self.with_data_provider.data_provider.buffer_size();
self.with_sequencer(MultiProducerSequencer::new(buffer_size, W::new()))
}
}

impl<'a, S: Sequencer + 'a, W: WaitStrategy, D: DataProvider<T> + 'a, T: Send + 'a>
Expand Down
8 changes: 7 additions & 1 deletion src/prelude.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,12 @@ impl AtomicSequence {
pub fn set(&self, value: Sequence) {
self.offset.store(value, Ordering::Release);
}

pub fn compare_exchange(&self, current: Sequence, new: Sequence) -> bool {
self.offset
.compare_exchange(current, new, Ordering::SeqCst, Ordering::Acquire)
.is_ok()
}
}

impl Default for AtomicSequence {
Expand All @@ -45,7 +51,7 @@ pub trait Sequencer {
type Barrier: SequenceBarrier;

fn next(&self, count: usize) -> (Sequence, Sequence);
fn publish(&self, highest: Sequence);
fn publish(&self, lo: Sequence, hi: Sequence);
fn create_barrier(&mut self, gating_sequences: &[Arc<AtomicSequence>]) -> Self::Barrier;
fn add_gating_sequence(&mut self, gating_sequence: &Arc<AtomicSequence>);
fn get_cursor(&self) -> Arc<AtomicSequence>;
Expand Down
Loading

0 comments on commit 22fefa5

Please sign in to comment.