Skip to content

Commit

Permalink
Unify Histogram and ExpHistogram aggregation
Browse files Browse the repository at this point in the history
  • Loading branch information
fraillt authored and Mindaugas Vinkelis committed Sep 13, 2024
1 parent ff9d50b commit 706adea
Show file tree
Hide file tree
Showing 7 changed files with 516 additions and 474 deletions.
46 changes: 30 additions & 16 deletions opentelemetry-sdk/benches/metric.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::sync::{Arc, Weak};

use criterion::{criterion_group, criterion_main, Bencher, Criterion};
use opentelemetry::{
global,
metrics::{Counter, Histogram, MeterProvider as _, Result},
Key, KeyValue,
};
Expand Down Expand Up @@ -344,7 +345,7 @@ fn counters(c: &mut Criterion) {
const MAX_BOUND: usize = 100000;

fn bench_histogram(bound_count: usize) -> (SharedReader, Histogram<u64>) {
let mut bounds = vec![0; bound_count];
let mut bounds: Vec<usize> = vec![0; bound_count];
#[allow(clippy::needless_range_loop)]
for i in 0..bounds.len() {
bounds[i] = i * MAX_BOUND / bound_count
Expand Down Expand Up @@ -394,22 +395,35 @@ fn histograms(c: &mut Criterion) {
);
}
}
group.bench_function("CollectOne", |b| benchmark_collect_histogram(b, 1));
group.bench_function("CollectFive", |b| benchmark_collect_histogram(b, 5));
group.bench_function("CollectTen", |b| benchmark_collect_histogram(b, 10));
group.bench_function("CollectTwentyFive", |b| benchmark_collect_histogram(b, 25));
for metrics_size in [1, 5, 20] {
for attr_sets in [1, 5, 20, 200] {
group.bench_function(
format!("Collect{metrics_size}Metric{attr_sets}AttrSets"),
|b| benchmark_collect_histogram(b, metrics_size, attr_sets),
);
}
}
}

fn benchmark_collect_histogram(b: &mut Bencher, n: usize) {
fn benchmark_collect_histogram(b: &mut Bencher, metrics_size: usize, attr_sets: usize) {
let r = SharedReader(Arc::new(ManualReader::default()));
let mtr = SdkMeterProvider::builder()
.with_reader(r.clone())
.build()
.meter("sdk/metric/bench/histogram");

for i in 0..n {
let h = mtr.u64_histogram(format!("fake_data_{i}")).init();
h.record(1, &[]);
let provider = SdkMeterProvider::builder().with_reader(r.clone()).build();
let mtr = provider.meter("sdk/metric/bench/histogram");
global::set_meter_provider(provider);

let mut rng = rand::thread_rng();
for m in 0..metrics_size {
let h = mtr.u64_histogram(format!("fake_data_{m}")).init();
for _att in 0..attr_sets {
let mut attributes: Vec<KeyValue> = Vec::new();
for _i in 0..rng.gen_range(0..3) {
attributes.push(KeyValue::new(
format!("K{}", rng.gen_range::<i32, _>(0..10)),
format!("V{}", rng.gen_range::<i32, _>(0..10)),
))
}
h.record(1, &attributes)
}
}

let mut rm = ResourceMetrics {
Expand All @@ -418,8 +432,8 @@ fn benchmark_collect_histogram(b: &mut Bencher, n: usize) {
};

b.iter(|| {
let _ = r.collect(&mut rm);
assert_eq!(rm.scope_metrics[0].metrics.len(), n);
r.collect(&mut rm).unwrap();
assert_eq!(rm.scope_metrics[0].metrics.len(), metrics_size);
})
}

Expand Down
183 changes: 183 additions & 0 deletions opentelemetry-sdk/src/metrics/internal/attribute_set_aggregation.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
use std::{
collections::HashMap,
fmt::Debug,
ops::Deref,
sync::{Arc, Mutex, RwLock},
};

use opentelemetry::{global, metrics::MetricsError, KeyValue};

use crate::metrics::AttributeSet;

use super::{
aggregate::is_under_cardinality_limit, Number, STREAM_OVERFLOW_ATTRIBUTES,
STREAM_OVERFLOW_ATTRIBUTES_ERR,
};

/// Aggregator interface
pub(crate) trait Aggregator<T>: Debug
where
T: Number,
{
/// A static configuration that is needed by configurators.
/// E.g. bucket_size at creation time and buckets list at aggregator update.
type Config;

/// Called everytime a new attribute-set is stored.
fn create(init: &Self::Config) -> Self;

/// Called for each measurement.
fn update(&mut self, config: &Self::Config, measurement: T);
}

/// hashing and sorting is expensive, so we have two lists
/// sorted list is mainly needed for fast collection phase
struct WithAttribsAggregators<A> {
// put all attribute combinations in this list
all: HashMap<Vec<KeyValue>, Arc<Mutex<A>>>,
sorted: HashMap<Vec<KeyValue>, Arc<Mutex<A>>>,
}

/// This class is responsible for two things:
/// * send measurement information for specific aggregator (per attribute-set)
/// * collect all attribute-sets + aggregators (either readonly OR reset)
///
/// Even though it's simple to understand it's responsibility,
/// implementation is a lot more complex to make it very performant.
pub(crate) struct AttributeSetAggregation<T, A>
where
T: Number,
A: Aggregator<T>,
{
/// Aggregator for values with no attributes attached.
no_attribs: Mutex<Option<A>>,
list: RwLock<WithAttribsAggregators<A>>,
/// Configuration required to create and update the [`Aggregator`]
config: A::Config,
}

impl<T, A> AttributeSetAggregation<T, A>
where
T: Number,
A: Aggregator<T>,
{
/// Initiate aggregators by specifing [`Aggregator`] configuration.
pub(crate) fn new(init_data: A::Config) -> Self {
Self {
no_attribs: Mutex::new(None),
list: RwLock::new(WithAttribsAggregators {
all: Default::default(),
sorted: Default::default(),
}),
config: init_data,
}
}

/// Update specific aggregator depending on provided attributes.
pub(crate) fn measure(&self, attrs: &[KeyValue], measurement: T) {
if attrs.is_empty() {
if let Ok(mut aggr) = self.no_attribs.lock() {
aggr.get_or_insert_with(|| A::create(&self.config))
.update(&self.config, measurement);
}
return;
}
let Ok(list) = self.list.read() else {
return;

Check warning on line 86 in opentelemetry-sdk/src/metrics/internal/attribute_set_aggregation.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/metrics/internal/attribute_set_aggregation.rs#L86

Added line #L86 was not covered by tests
};
if let Some(aggr) = list.all.get(attrs) {
if let Ok(mut aggr) = aggr.lock() {
aggr.update(&self.config, measurement);
}
return;
}
drop(list);
let Ok(mut list) = self.list.write() else {
return;

Check warning on line 96 in opentelemetry-sdk/src/metrics/internal/attribute_set_aggregation.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/metrics/internal/attribute_set_aggregation.rs#L96

Added line #L96 was not covered by tests
};

// Recheck again in case another thread already inserted
if let Some(aggr) = list.all.get(attrs) {
if let Ok(mut aggr) = aggr.lock() {
aggr.update(&self.config, measurement);
}

Check warning on line 103 in opentelemetry-sdk/src/metrics/internal/attribute_set_aggregation.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/metrics/internal/attribute_set_aggregation.rs#L101-L103

Added lines #L101 - L103 were not covered by tests
} else if is_under_cardinality_limit(list.all.len()) {
let mut aggr = A::create(&self.config);
aggr.update(&self.config, measurement);
let aggr = Arc::new(Mutex::new(aggr));
list.all.insert(attrs.into(), aggr.clone());
let sorted_attribs = AttributeSet::from(attrs).into_vec();
list.sorted.insert(sorted_attribs, aggr);
} else if let Some(aggr) = list.sorted.get(STREAM_OVERFLOW_ATTRIBUTES.deref()) {
if let Ok(mut aggr) = aggr.lock() {
aggr.update(&self.config, measurement);
}
} else {
let mut aggr = A::create(&self.config);
aggr.update(&self.config, measurement);
list.sorted.insert(
STREAM_OVERFLOW_ATTRIBUTES.clone(),
Arc::new(Mutex::new(aggr)),
);
global::handle_error(MetricsError::Other(STREAM_OVERFLOW_ATTRIBUTES_ERR.into()));
}

Check warning on line 123 in opentelemetry-sdk/src/metrics/internal/attribute_set_aggregation.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/metrics/internal/attribute_set_aggregation.rs#L112-L123

Added lines #L112 - L123 were not covered by tests
}

/// Iterate through all attribute sets and populate `DataPoints`in readonly mode.
pub(crate) fn collect_readonly<Res, MapFn>(&self, dest: &mut Vec<Res>, mut map_fn: MapFn)
where
MapFn: FnMut(Vec<KeyValue>, &A) -> Res,
{
let Ok(list) = self.list.read() else {
return;

Check warning on line 132 in opentelemetry-sdk/src/metrics/internal/attribute_set_aggregation.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/metrics/internal/attribute_set_aggregation.rs#L132

Added line #L132 was not covered by tests
};
prepare_data(dest, list.sorted.len());
if let Ok(aggr) = self.no_attribs.lock() {
if let Some(aggr) = aggr.deref() {
dest.push(map_fn(Default::default(), aggr));
}
};

Check warning on line 139 in opentelemetry-sdk/src/metrics/internal/attribute_set_aggregation.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/metrics/internal/attribute_set_aggregation.rs#L139

Added line #L139 was not covered by tests
dest.extend(
list.sorted
.iter()
.filter_map(|(k, v)| v.lock().ok().map(|v| map_fn(k.clone(), &v))),
)
}

/// Iterate through all attribute sets and populate `DataPoints`, while also consuming (reseting) aggregators
pub(crate) fn collect_and_reset<Res, MapFn>(&self, dest: &mut Vec<Res>, mut map_fn: MapFn)
where
MapFn: FnMut(Vec<KeyValue>, A) -> Res,
{
let Ok(mut list) = self.list.write() else {
return;

Check warning on line 153 in opentelemetry-sdk/src/metrics/internal/attribute_set_aggregation.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/metrics/internal/attribute_set_aggregation.rs#L153

Added line #L153 was not covered by tests
};
prepare_data(dest, list.sorted.len());
if let Ok(mut aggr) = self.no_attribs.lock() {
if let Some(aggr) = aggr.take() {
dest.push(map_fn(Default::default(), aggr));
}
};

Check warning on line 160 in opentelemetry-sdk/src/metrics/internal/attribute_set_aggregation.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/metrics/internal/attribute_set_aggregation.rs#L160

Added line #L160 was not covered by tests
list.all.clear();
dest.extend(list.sorted.drain().filter_map(|(k, v)| {
Arc::try_unwrap(v)
.expect("this is last instance, so we cannot fail to get it")
.into_inner()
.ok()
.map(|v| map_fn(k, v))
}));
}

pub(crate) fn config(&self) -> &A::Config {
&self.config
}
}

/// Clear and allocate exactly required amount of space for all attribute-sets
fn prepare_data<T>(data: &mut Vec<T>, list_len: usize) {
data.clear();
let total_len = list_len + 1; // to account for no_attributes case
if total_len > data.capacity() {
data.reserve_exact(total_len - data.capacity());
}
}
Loading

0 comments on commit 706adea

Please sign in to comment.