Metrics collect stress test #2247
base: main
Changes from 1 commit
@@ -0,0 +1,239 @@
use std::{
    cell::RefCell,
    ops::DerefMut,
    sync::{
        atomic::{AtomicBool, AtomicUsize, Ordering},
        Arc, Barrier, Weak,
    },
    time::{Duration, Instant},
};

use lazy_static::lazy_static;
use opentelemetry::{
    metrics::{Histogram, MeterProvider, MetricResult},
    KeyValue,
};
use opentelemetry_sdk::{
    metrics::{
        data::{ResourceMetrics, Temporality},
        reader::MetricReader,
        InstrumentKind, ManualReader, Pipeline, SdkMeterProvider,
    },
    Resource,
};

use rand::{
    rngs::{self, SmallRng},
    Rng, SeedableRng,
};

use clap::{Parser, ValueEnum};

#[derive(Debug, Clone, Copy, ValueEnum)]
enum CliTemporality {
    Cumulative,
    Delta,
}

/// Measure metrics performance while collecting
#[derive(Parser, Debug)]
#[command(
    version,
    about = "Measure metrics performance while collecting",
    long_about = "The purpose of this test is to see how collecting interferes with measurements.\n\
        Most of the test measures how fast the collecting phase is, but more important is\n\
        that it doesn't \"stop-the-world\" while the collection phase is running."
)]
struct Cli {
    /// Select collection phase temporality
    temporality: CliTemporality,
}

lazy_static! {
    pub static ref ATTRIBUTE_VALUES: [&'static str; 10] = [
        "value1", "value2", "value3", "value4", "value5", "value6", "value7", "value8", "value9",
        "value10"
    ];
}

thread_local! {
    /// Store a random number generator for each thread
    pub static CURRENT_RNG: RefCell<rngs::SmallRng> = RefCell::new(rngs::SmallRng::from_entropy());
}

fn main() {
    let cli = Cli::parse();
    let temporality = match cli.temporality {
        CliTemporality::Cumulative => Temporality::Cumulative,
        CliTemporality::Delta => Temporality::Delta,
    };
    let reader = SharedReader::new(
        ManualReader::builder()
            .with_temporality(temporality)
            .build(),
    );
    let provider = SdkMeterProvider::builder()
        .with_reader(reader.clone())
        .build();
    // use a histogram, as it is a bit more complicated during collection
    let histogram = provider.meter("test").u64_histogram("hello").build();

    calculate_measurements_during_collection(histogram, reader).print_results();
}

fn calculate_measurements_during_collection(
    histogram: Histogram<u64>,
    reader: SharedReader,
) -> MeasurementResults {
    // We don't need to use every single CPU; it is better to leave some CPUs for operating
    // system work, so our running threads are much more stable in performance.
    // Just for the record, this has a HUGE effect on my machine (laptop intel i7-1355u).
    let num_threads = num_cpus::get() / 2;

    let mut res = MeasurementResults {
        total_measurements_count: 0,
        total_time_collecting: 0,
        num_iterations: 0,
    };
    let start = Instant::now();
    while start.elapsed() < Duration::from_secs(3) {
        res.num_iterations += 1;
        let is_collecting = AtomicBool::new(false);
        let measurements_while_collecting = AtomicUsize::new(0);
        let time_while_collecting = AtomicUsize::new(0);
        let barrier = Barrier::new(num_threads + 1);
        std::thread::scope(|s| {
            // first create a bunch of measurements,
            // so that the collection phase wouldn't be "empty"
            let mut handles = Vec::new();
            for _t in 0..num_threads {
                handles.push(s.spawn(|| {
                    for _i in 0..1000 {
                        CURRENT_RNG.with(|rng| {
                            histogram.record(
                                1,
                                &random_attribute_set3(
                                    rng.borrow_mut().deref_mut(),
                                    ATTRIBUTE_VALUES.as_ref(),
                                ),
                            );
                        });
                    }
                }));
            }
            for handle in handles {
                handle.join().unwrap();
            }

            // simultaneously start collecting and creating more measurements
            for _ in 0..num_threads - 1 {
                s.spawn(|| {
                    barrier.wait();
                    let now = Instant::now();
                    let mut count = 0;
                    while is_collecting.load(Ordering::Acquire) {
I think we could have a much simpler and more effective setup here. If we want to know whether running collect stops the world, it's better to spawn a thread that keeps calling …

I did that initially. The problem was that the collection phase was not realistic, as it had 0 measurements and basically held the lock in a loop. So I decided to make it more realistic by generating some measurements so collection wouldn't be empty.

I don't follow. You could always record some measurements before you call collect. My concern is around the way this test is set up. The test is using some custom "iterations", which seems unnecessary. You could simply do this instead: …

If doing the above leads to zero measurements being recorded, then we have our answer: as I mentioned in this comment, unless we plan to use a more efficient synchronization mechanism in …

First of all, this test is adding value, as it is able to measure the difference between different "collect" implementations.

In reality, no one runs collect the way it's run in this stress test either. Collect would usually be run periodically at 10/30/60 second intervals.

This test claims to measure update throughput "while collect is running". The actual implementation of this test, however, relies on squeezing in some updates before collect runs. That's not the same as testing "while collect is running". I would love to improve collect to take the write lock for the shortest possible time, but that's better tested using a benchmark.

There's a lot of truth to this statement, but nonetheless it appears to measure something useful :)

Ideally, I would like to "catch" changes to how the attribute-set hashmap is locked with both existing types of measurements (with existing attribute-set and new attribute-set combinations).
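For context, a minimal sketch of the "keep calling collect in a loop" setup discussed above could look like the following. It is not part of this PR; it reuses the items defined in this file (Histogram, SharedReader, CURRENT_RNG, ATTRIBUTE_VALUES, random_attribute_set3 and the file's imports), and the 3-second window and num_cpus::get() / 2 thread count are arbitrary choices.

// Hypothetical alternative setup (not part of this PR): worker threads record
// continuously while a single thread calls collect() back-to-back for a fixed
// duration; afterwards the total number of updates made during that window is returned.
fn measure_updates_while_collecting(
    histogram: Histogram<u64>,
    reader: SharedReader,
) -> usize {
    let stop = AtomicBool::new(false);
    let total_updates = AtomicUsize::new(0);
    std::thread::scope(|s| {
        for _ in 0..num_cpus::get() / 2 {
            s.spawn(|| {
                let mut count = 0usize;
                while !stop.load(Ordering::Acquire) {
                    CURRENT_RNG.with(|rng| {
                        histogram.record(
                            1,
                            &random_attribute_set3(
                                rng.borrow_mut().deref_mut(),
                                ATTRIBUTE_VALUES.as_ref(),
                            ),
                        );
                    });
                    count += 1;
                }
                total_updates.fetch_add(count, Ordering::AcqRel);
            });
        }
        // collector runs in a tight loop for the whole window, so the updates
        // above are truly concurrent with collection
        let mut rm = ResourceMetrics {
            resource: Resource::empty(),
            scope_metrics: Vec::new(),
        };
        let start = Instant::now();
        while start.elapsed() < Duration::from_secs(3) {
            reader.collect(&mut rm).unwrap();
        }
        stop.store(true, Ordering::Release);
    });
    total_updates.load(Ordering::Acquire)
}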
                        CURRENT_RNG.with(|rng| {
                            histogram.record(
                                1,
                                &random_attribute_set3(
                                    rng.borrow_mut().deref_mut(),
                                    ATTRIBUTE_VALUES.as_ref(),
                                ),
                            );
                        });
                        count += 1;
                    }
                    measurements_while_collecting.fetch_add(count, Ordering::AcqRel);
                    time_while_collecting
                        .fetch_add(now.elapsed().as_micros() as usize, Ordering::AcqRel);
                });
            }

            let collect_handle = s.spawn(|| {
                let mut rm = ResourceMetrics {
                    resource: Resource::empty(),
                    scope_metrics: Vec::new(),
                };
                is_collecting.store(true, Ordering::Release);
                barrier.wait();
                reader.collect(&mut rm).unwrap();
In the PR description, you mentioned that you saw a major improvement when running this with the changes for #2145 locally. I'm curious: the code changes for #2145 are mostly related to reuse of collect code across instruments. How is that improving the perf numbers? Throughput perf here should mostly depend on the thread synchronization mechanism used for collect and update. Currently, it's using a …

For delta temporality I was able to reduce the amount of time a write lock is held while collecting. In any case, there are ways to improve it :), so I created this PR so we could actually measure different ideas.

These are good ideas!

Great! In that case, we should be able to test update throughput while …

I did try a sharding approach earlier in #1564 - to reduce the write-lock duration by locking only the specific shard of measurements during collection. However, I encountered some concurrency issues. It's still an approach worth revisiting at some point.

+1
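As a generic illustration of the "hold the write lock for less time" idea discussed above (this is not the SDK's actual implementation; DeltaAccumulator and its u64 stand-in for a hashed attribute set are made up for the example): with delta temporality, collect can swap the accumulated map out under a brief write lock and build the exported data outside the lock, so concurrent updates are blocked only for the swap.

// Illustrative only: a stand-in for per-attribute-set storage with delta semantics.
use std::collections::HashMap;
use std::sync::RwLock;

struct DeltaAccumulator {
    // the u64 key stands in for a hashed attribute set, the value for a running sum
    values: RwLock<HashMap<u64, u64>>,
}

impl DeltaAccumulator {
    fn update(&self, attr_set: u64, value: u64) {
        // update path: the lock is held only long enough to bump a single entry
        *self.values.write().unwrap().entry(attr_set).or_insert(0) += value;
    }

    fn collect_delta(&self) -> HashMap<u64, u64> {
        // collect path: the write lock is held only for the swap; aggregating and
        // exporting the drained map happens after the lock is released
        std::mem::take(&mut *self.values.write().unwrap())
    }
}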
                is_collecting.store(false, Ordering::Release);
            });
            barrier.wait();
            collect_handle.join().unwrap();
        });
        res.total_measurements_count += measurements_while_collecting.load(Ordering::Acquire);
        res.total_time_collecting += time_while_collecting.load(Ordering::Acquire);
    }
    res
}

struct MeasurementResults {
    total_measurements_count: usize,
    total_time_collecting: usize,
    num_iterations: usize,
}

impl MeasurementResults {
    fn print_results(&self) {
        println!(
            "{:>10.2} measurements/ms",
            self.total_measurements_count as f32 / (self.total_time_collecting as f32 / 1000.0f32)
        );
        println!(
            "{:>10.2} measurements/it",
            self.total_measurements_count as f32 / self.num_iterations as f32,
        );
        println!(
            "{:>10.2} μs/it",
            self.total_time_collecting as f32 / self.num_iterations as f32,
        );
    }
}

fn random_attribute_set3(rng: &mut SmallRng, values: &[&'static str]) -> [KeyValue; 3] {
    let len = values.len();
    unsafe {
        [
            KeyValue::new("attribute1", *values.get_unchecked(rng.gen_range(0..len))),
            KeyValue::new("attribute2", *values.get_unchecked(rng.gen_range(0..len))),
            KeyValue::new("attribute3", *values.get_unchecked(rng.gen_range(0..len))),
        ]
    }
}

// copy/paste from opentelemetry-sdk/benches/metric.rs
#[derive(Clone, Debug)]
pub struct SharedReader(Arc<dyn MetricReader>);

impl SharedReader {
    pub fn new<R>(reader: R) -> Self
    where
        R: MetricReader,
    {
        Self(Arc::new(reader))
    }
}

impl MetricReader for SharedReader {
    fn register_pipeline(&self, pipeline: Weak<Pipeline>) {
        self.0.register_pipeline(pipeline)
    }

    fn collect(&self, rm: &mut ResourceMetrics) -> MetricResult<()> {
        self.0.collect(rm)
    }

    fn force_flush(&self) -> MetricResult<()> {
        self.0.force_flush()
    }

    fn shutdown(&self) -> MetricResult<()> {
        self.0.shutdown()
    }

    fn temporality(&self, kind: InstrumentKind) -> Temporality {
        self.0.temporality(kind)
    }
}
This can be done without spawning new threads. (before you spawn threads for running collect and recording measurements)
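For example, the warm-up block at the start of calculate_measurements_during_collection could be replaced by a plain loop on the calling thread (sketch only; it reuses the items from this file and keeps the same arbitrary 1000-measurements-per-thread budget):

// Sketch: record the warm-up measurements on the current thread instead of
// spawning worker threads; collect() still sees a non-empty set of attribute sets.
for _ in 0..num_threads * 1000 {
    CURRENT_RNG.with(|rng| {
        histogram.record(
            1,
            &random_attribute_set3(rng.borrow_mut().deref_mut(), ATTRIBUTE_VALUES.as_ref()),
        );
    });
}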