Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Metrics collect stress test #2247

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion stress/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,11 @@ version = "0.1.0"
edition = "2021"
publish = false

[[bin]]
name = "metrics_collect"
path = "src/metrics_collect.rs"
doc = false

[[bin]] # Bin to run the metrics stress tests for Counter
name = "metrics"
path = "src/metrics_counter.rs"
Expand Down Expand Up @@ -39,6 +44,7 @@ name = "random"
path = "src/random.rs"
doc = false


[dependencies]
ctrlc = "3.2.5"
lazy_static = "1.4.0"
Expand All @@ -51,6 +57,7 @@ tracing = { workspace = true, features = ["std"]}
tracing-subscriber = { workspace = true, features = ["registry", "std"] }
num-format = "0.4.4"
sysinfo = { version = "0.30.12", optional = true }
clap = { version = "4.5.20", features = ["derive"] }

[features]
stats = ["sysinfo"]
stats = ["sysinfo"]
239 changes: 239 additions & 0 deletions stress/src/metrics_collect.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,239 @@
use std::{
cell::RefCell,
ops::DerefMut,
sync::{
atomic::{AtomicBool, AtomicUsize, Ordering},
Arc, Barrier, Weak,
},
time::{Duration, Instant},
};

use lazy_static::lazy_static;
use opentelemetry::{
metrics::{Histogram, MeterProvider, MetricResult},
KeyValue,
};
use opentelemetry_sdk::{
metrics::{
data::{ResourceMetrics, Temporality},
reader::MetricReader,
InstrumentKind, ManualReader, Pipeline, SdkMeterProvider,
},
Resource,
};

use rand::{
rngs::{self, SmallRng},
Rng, SeedableRng,
};

use clap::{Parser, ValueEnum};

#[derive(Debug, Clone, Copy, ValueEnum)]
enum CliTemporality {
Cumulative,
Delta,
}

/// Simple program to greet a person
#[derive(Parser, Debug)]
#[command(
version,
about = "Measure metrics performance while collecting",
long_about = "The purpose of this test is to see how collecing interferre with measurements.\n\
Most of the test measure how fast is collecting phase, but more important is\n\
that it doesn't \"stop-the-world\" while collection phase is running."
)]
struct Cli {
/// Select collection phase temporality
temporality: CliTemporality,
}

lazy_static! {
pub static ref ATTRIBUTE_VALUES: [&'static str; 10] = [
"value1", "value2", "value3", "value4", "value5", "value6", "value7", "value8", "value9",
"value10"
];
}

thread_local! {

/// Store random number generator for each thread
pub static CURRENT_RNG: RefCell<rngs::SmallRng> = RefCell::new(rngs::SmallRng::from_entropy());
}

fn main() {
let cli = Cli::parse();
let temporality = match cli.temporality {
CliTemporality::Cumulative => Temporality::Cumulative,
CliTemporality::Delta => Temporality::Delta,
};
let reader = SharedReader::new(
ManualReader::builder()
.with_temporality(temporality)
.build(),
);
let provider = SdkMeterProvider::builder()
.with_reader(reader.clone())
.build();
// use histogram, as it is a bit more complicated during
let histogram = provider.meter("test").u64_histogram("hello").build();

calculate_measurements_during_collection(histogram, reader).print_results();
}

fn calculate_measurements_during_collection(
histogram: Histogram<u64>,
reader: SharedReader,
) -> MeasurementResults {
// we don't need to use every single CPU, better leave other CPU for operating system work,
// so our running threads could be much more stable in performance.
// just for the record, this is has HUGE effect on my machine (laptop intel i7-1355u)
let num_threads = num_cpus::get() / 2;

let mut res = MeasurementResults {
total_measurements_count: 0,
total_time_collecting: 0,
num_iterations: 0,
};
let start = Instant::now();
while start.elapsed() < Duration::from_secs(3) {
res.num_iterations += 1;
let is_collecting = AtomicBool::new(false);
let measurements_while_collecting = AtomicUsize::new(0);
let time_while_collecting = AtomicUsize::new(0);
let barrier = Barrier::new(num_threads + 1);
std::thread::scope(|s| {
// first create bunch of measurements,
// so that collection phase wouldn't be "empty"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can be done without spawning new threads. (before you spawn threads for running collect and recording measurements)

let mut handles = Vec::new();
for _t in 0..num_threads {
handles.push(s.spawn(|| {
for _i in 0..1000 {
CURRENT_RNG.with(|rng| {
histogram.record(
1,
&random_attribute_set3(
rng.borrow_mut().deref_mut(),
ATTRIBUTE_VALUES.as_ref(),
),
);
});
}
}));
}
for handle in handles {
handle.join().unwrap();
}

// simultaneously start collecting and creating more measurements
for _ in 0..num_threads - 1 {
s.spawn(|| {
barrier.wait();
let now = Instant::now();
let mut count = 0;
while is_collecting.load(Ordering::Acquire) {
Copy link
Contributor

@utpilla utpilla Oct 30, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we could have a much simpler and effective setup here. If we want to know whether running collect stops the world, it's better to spawn a thread that keeps calling reader.collect on a loop. And have other threads, record measurements simultaneously.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did that initially. The problem was that collection phase was not realistic as it had 0 measurements and basically held lock in a loop. So I decided to make it more realistic, by generating some measurements so collection wouldn't be empty.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The problem was that collection phase was not realistic as it had 0 measurements and basically held lock in a loop

I don't follow. You could always record some measurements before you call collect. My concern is around the way this test is setup. The test is using some custom "iterations" which seems unnecessary. You could simply do this instead:

  1. Emit some measurements.
  2. Spawn a thread that runs collect in a loop.
  3. Spawn additional threads that record measurements.
  4. Calculate throughput for the measurements recorded in step 3.

If doing the above, leads to zero measurements being recorded, then we have our answer: Collect is going to "stop the world" as long as it runs.

As I mentioned in this comment, unless we plan to use a more efficient synchronization mechanism in ValueMap, this stress test would not be adding any value.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

First of all, this test is adding value, as it is able to measure difference with different "collect" implementation.
Second, running collect in the loop doesn't work with delta temporality, because after first run there will be zero measurements and all other iterations will basically hold write lock in the loop. While in reality no one runs collect in loop and usually there are many measurements and its important that write lock is held as short as possible (which is not the case with current implementation, hence I was able to improve it and this test actually measures it).
I could probably have different tests for delta an cumulative temporality, but I decided to emulate realistic environment.

Copy link
Contributor

@utpilla utpilla Oct 31, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While in reality no one runs collect in loop

In reality, no one runs the collect the way it's run in this stress test either. Collect would usually be run periodically in 10/30/60 seconds intervals.

First of all, this test is adding value, as it is able to measure difference with different "collect" implementation.

usually there are many measurements and its important that write lock is held as short as possible (which is not the case with current implementation, hence I was able to improve it and this test actually measures it).

This test claims to measure update throughput "while collect is running". The actual implementation of this test however relies on squeezing in some updates before collect runs. That's not the same as testing "while collect is running".

I would love to improve collect to take the write lock for the shortest possible time, but it's better tested using a benchmark.

Copy link
Contributor Author

@fraillt fraillt Nov 1, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The actual implementation of this test however relies on squeezing in some updates before collect runs.

There's a lot of truth to this statement, but none the less it appears to measure something useful :)

temporality type of change before change (measurements/ms) after change (measurements/ms)
Cumulative changed write to read lock of hashmap 9 840
Delta reduce the amount of time write lock is held 17 56

Ideally I would like to "catch" changes to how attribute-set hashmap are locked with both: existing types of measurements (with existing attribute-set and new attribute-set combination).
Maybe we need to tests these things separately... I don't know, any ideas are welcome :)

CURRENT_RNG.with(|rng| {
histogram.record(
1,
&random_attribute_set3(
rng.borrow_mut().deref_mut(),
ATTRIBUTE_VALUES.as_ref(),
),
);
});
count += 1;
}
measurements_while_collecting.fetch_add(count, Ordering::AcqRel);
time_while_collecting
.fetch_add(now.elapsed().as_micros() as usize, Ordering::AcqRel);
});
}

let collect_handle = s.spawn(|| {
let mut rm = ResourceMetrics {
resource: Resource::empty(),
scope_metrics: Vec::new(),
};
is_collecting.store(true, Ordering::Release);
barrier.wait();
reader.collect(&mut rm).unwrap();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the PR description, you mentioned that you saw a major improvement when running this with the changes for #2145 locally. I'm curious, the code changes for #2145 are mostly related to reuse of collect code across instruments. How is that improving the perf numbers?

Throughput perf here should mostly depend on the thread synchronization mechanism used for collect and update. Currently, it's using a RwLock in ValueMap and the collect operation requires a write lock which would "stop the world" until collect has the write lock. Unless you change that, I wouldn't expect any significant improvement in throughput here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For delta temporality I was able to reduce the amount of time a write lock is held while collecting.
For cumulative temporality I simply changed to read lock (currently there is write lock, although cumulative temporality doesn't modify anything, and IMO locking individual measurements is sufficient and much better, then "stop the world until all measurements are read" approach). Although read lock is better, but measurements with new attribute-sets will still be locked. I see two more alternatives:

  • clone entire attribute-set hashmap (acquire read lock, clone hashmap, release lock, iterate individual measurements)
  • implement some sort of sharding (more throughtput at the cost of single measurement performance)

In any case, there are ways to improve it:) so I created this PR so we could actually measure different ideas.

Copy link
Contributor

@utpilla utpilla Oct 31, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These are good ideas!

For cumulative temporality I simply changed to read lock (currently there is write lock, although cumulative temporality doesn't modify anything,

Great! In that case, we should be able test update throughput while collect runs in a loop for Cumulative and a use a simple benchmark for Delta. 🙂

Copy link
Member

@lalitb lalitb Oct 31, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

implement some sort of sharding (more throughtput at the cost of single measurement performance)

I did try sharding approach earlier in #1564 - to reduce the write-lock duration by locking only the specific shard of measurements during collection. However, I encountered some concurrency issues. It’s still an approach worth revisiting at some point.

Great! In that case, we should be able test update throughput while collect runs in a loop for Cumulative and a use a simple benchmark for Delta. 🙂

+1

is_collecting.store(false, Ordering::Release);
});
barrier.wait();
collect_handle.join().unwrap();
});
res.total_measurements_count += measurements_while_collecting.load(Ordering::Acquire);
res.total_time_collecting += time_while_collecting.load(Ordering::Acquire);
}
res
}

struct MeasurementResults {
total_measurements_count: usize,
total_time_collecting: usize,
num_iterations: usize,
}

impl MeasurementResults {
fn print_results(&self) {
println!(
"{:>10.2} measurements/ms",
self.total_measurements_count as f32 / (self.total_time_collecting as f32 / 1000.0f32)
);
println!(
"{:>10.2} measurements/it",
self.total_measurements_count as f32 / self.num_iterations as f32,
);
println!(
"{:>10.2} μs/it",
self.total_time_collecting as f32 / self.num_iterations as f32,
);
}
}

fn random_attribute_set3(rng: &mut SmallRng, values: &[&'static str]) -> [KeyValue; 3] {
let len = values.len();
unsafe {
[
KeyValue::new("attribute1", *values.get_unchecked(rng.gen_range(0..len))),
KeyValue::new("attribute2", *values.get_unchecked(rng.gen_range(0..len))),
KeyValue::new("attribute3", *values.get_unchecked(rng.gen_range(0..len))),
]
}
}

// copy/paste from opentelemetry-sdk/benches/metric.rs
#[derive(Clone, Debug)]
pub struct SharedReader(Arc<dyn MetricReader>);

impl SharedReader {
pub fn new<R>(reader: R) -> Self
where
R: MetricReader,
{
Self(Arc::new(reader))
}
}

impl MetricReader for SharedReader {
fn register_pipeline(&self, pipeline: Weak<Pipeline>) {
self.0.register_pipeline(pipeline)
}

fn collect(&self, rm: &mut ResourceMetrics) -> MetricResult<()> {
self.0.collect(rm)
}

fn force_flush(&self) -> MetricResult<()> {
self.0.force_flush()
}

fn shutdown(&self) -> MetricResult<()> {
self.0.shutdown()
}

fn temporality(&self, kind: InstrumentKind) -> Temporality {
self.0.temporality(kind)
}
}
Loading