Skip to content

Commit

Permalink
ValueMap interface change
Browse files Browse the repository at this point in the history
  • Loading branch information
fraillt authored and Mindaugas Vinkelis committed Sep 14, 2024
1 parent 7ab5e0f commit 825d181
Show file tree
Hide file tree
Showing 5 changed files with 231 additions and 214 deletions.
185 changes: 85 additions & 100 deletions opentelemetry-sdk/src/metrics/internal/histogram.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,45 +7,16 @@ use crate::metrics::data::HistogramDataPoint;
use crate::metrics::data::{self, Aggregation, Temporality};
use opentelemetry::KeyValue;

use super::Number;
use super::{AtomicTracker, AtomicallyUpdate, Operation, ValueMap};
use super::ValueMap;
use super::{Aggregator, Number};

struct HistogramUpdate;

impl Operation for HistogramUpdate {
fn update_tracker<T: Default, AT: AtomicTracker<T>>(tracker: &AT, value: T, index: usize) {
tracker.update_histogram(index, value);
}
}

struct HistogramTracker<T> {
buckets: Mutex<Buckets<T>>,
}

impl<T: Number> AtomicTracker<T> for HistogramTracker<T> {
fn update_histogram(&self, index: usize, value: T) {
let mut buckets = match self.buckets.lock() {
Ok(guard) => guard,
Err(_) => return,
};

buckets.bin(index, value);
buckets.sum(value);
}
}

impl<T: Number> AtomicallyUpdate<T> for HistogramTracker<T> {
type AtomicTracker = HistogramTracker<T>;

fn new_atomic_tracker(buckets_count: Option<usize>) -> Self::AtomicTracker {
let count = buckets_count.unwrap();
HistogramTracker {
buckets: Mutex::new(Buckets::<T>::new(count)),
}
}
struct BucketsConfig {
bounds: Vec<f64>,
record_min_max: bool,
record_sum: bool,
}

#[derive(Default)]
#[derive(Default, Debug, Clone)]
struct Buckets<T> {
counts: Vec<u64>,
count: u64,
Expand All @@ -54,30 +25,37 @@ struct Buckets<T> {
max: T,
}

impl<T: Number> Buckets<T> {
/// returns buckets with `n` bins.
fn new(n: usize) -> Buckets<T> {
Buckets {
counts: vec![0; n],
impl<T> Buckets<T>
where
T: Number,
{
fn new(size: usize) -> Self {
Self {
counts: vec![0; size],
min: T::max(),
max: T::min(),
..Default::default()
}
}

fn sum(&mut self, value: T) {
self.total += value;
}

fn bin(&mut self, idx: usize, value: T) {
fn update(&mut self, config: &BucketsConfig, f_value: f64, measurement: T) {
// This search will return an index in the range `[0, bounds.len()]`, where
// it will return `bounds.len()` if value is greater than the last element
// of `bounds`. This aligns with the buckets in that the length of buckets
// is `bounds.len()+1`, with the last bucket representing:
// `(bounds[bounds.len()-1], +∞)`.
let idx = config.bounds.partition_point(|&x| x < f_value);
self.counts[idx] += 1;
self.count += 1;
if value < self.min {
self.min = value;
}
if value > self.max {
self.max = value
if config.record_min_max {
if measurement < self.min {
self.min = measurement;
}
if measurement > self.max {
self.max = measurement
}
}
// it's very cheap to update it, even if it is not configured to record_sum
self.total += measurement;
}

fn reset(&mut self) {
Expand All @@ -91,45 +69,52 @@ impl<T: Number> Buckets<T> {
}
}

impl<T> Aggregator<T> for Mutex<Buckets<T>>
where
T: Number,
{
type Config = BucketsConfig;

fn create(config: &BucketsConfig) -> Self {
let size = config.bounds.len() + 1;
Mutex::new(Buckets::new(size))
}

fn update(&self, config: &BucketsConfig, measurement: T) {
let f_value = measurement.into_float();
// Ignore NaN and infinity.
if f_value.is_infinite() || f_value.is_nan() {
return;

Check warning on line 87 in opentelemetry-sdk/src/metrics/internal/histogram.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/metrics/internal/histogram.rs#L87

Added line #L87 was not covered by tests
}
if let Ok(mut this) = self.lock() {
this.update(config, f_value, measurement);
}
}
}

/// Summarizes a set of measurements as a histogram with explicitly defined
/// buckets.
pub(crate) struct Histogram<T: Number> {
value_map: ValueMap<HistogramTracker<T>, T, HistogramUpdate>,
bounds: Vec<f64>,
record_min_max: bool,
record_sum: bool,
value_map: ValueMap<T, Mutex<Buckets<T>>>,
start: Mutex<SystemTime>,
}

impl<T: Number> Histogram<T> {
pub(crate) fn new(boundaries: Vec<f64>, record_min_max: bool, record_sum: bool) -> Self {
let buckets_count = boundaries.len() + 1;
let mut histogram = Histogram {
value_map: ValueMap::new_with_buckets_count(buckets_count),
bounds: boundaries,
record_min_max,
record_sum,
pub(crate) fn new(mut bounds: Vec<f64>, record_min_max: bool, record_sum: bool) -> Self {
bounds.retain(|v| !v.is_nan());
bounds.sort_by(|a, b| a.partial_cmp(b).expect("NaNs filtered out"));
Self {
value_map: ValueMap::new(BucketsConfig {
record_min_max,
record_sum,
bounds,
}),
start: Mutex::new(SystemTime::now()),
};

histogram.bounds.retain(|v| !v.is_nan());
histogram
.bounds
.sort_by(|a, b| a.partial_cmp(b).expect("NaNs filtered out"));

histogram
}
}

pub(crate) fn measure(&self, measurement: T, attrs: &[KeyValue]) {
let f = measurement.into_float();

// This search will return an index in the range `[0, bounds.len()]`, where
// it will return `bounds.len()` if value is greater than the last element
// of `bounds`. This aligns with the buckets in that the length of buckets
// is `bounds.len()+1`, with the last bucket representing:
// `(bounds[bounds.len()-1], +∞)`.
let index = self.bounds.partition_point(|&x| x < f);
self.value_map.measure(measurement, attrs, index);
self.value_map.measure(measurement, attrs);
}

pub(crate) fn delta(
Expand Down Expand Up @@ -167,25 +152,25 @@ impl<T: Number> Histogram<T> {
.has_no_attribute_value
.swap(false, Ordering::AcqRel)
{
if let Ok(ref mut b) = self.value_map.no_attribute_tracker.buckets.lock() {
if let Ok(ref mut b) = self.value_map.no_attribute_tracker.lock() {
h.data_points.push(HistogramDataPoint {
attributes: vec![],
start_time: start,
time: t,
count: b.count,
bounds: self.bounds.clone(),
bounds: self.value_map.config.bounds.clone(),
bucket_counts: b.counts.clone(),
sum: if self.record_sum {
sum: if self.value_map.config.record_sum {
b.total
} else {
T::default()
},
min: if self.record_min_max {
min: if self.value_map.config.record_min_max {
Some(b.min)
} else {
None
},
max: if self.record_min_max {
max: if self.value_map.config.record_min_max {
Some(b.max)
} else {
None
Expand All @@ -205,25 +190,25 @@ impl<T: Number> Histogram<T> {
let mut seen = HashSet::new();
for (attrs, tracker) in trackers.drain() {
if seen.insert(Arc::as_ptr(&tracker)) {
if let Ok(b) = tracker.buckets.lock() {
if let Ok(b) = tracker.lock() {
h.data_points.push(HistogramDataPoint {
attributes: attrs.clone(),
start_time: start,
time: t,
count: b.count,
bounds: self.bounds.clone(),
bounds: self.value_map.config.bounds.clone(),
bucket_counts: b.counts.clone(),
sum: if self.record_sum {
sum: if self.value_map.config.record_sum {
b.total
} else {
T::default()
},
min: if self.record_min_max {
min: if self.value_map.config.record_min_max {
Some(b.min)
} else {
None
},
max: if self.record_min_max {
max: if self.value_map.config.record_min_max {
Some(b.max)
} else {
None
Expand Down Expand Up @@ -278,25 +263,25 @@ impl<T: Number> Histogram<T> {
.has_no_attribute_value
.load(Ordering::Acquire)
{
if let Ok(b) = &self.value_map.no_attribute_tracker.buckets.lock() {
if let Ok(b) = &self.value_map.no_attribute_tracker.lock() {
h.data_points.push(HistogramDataPoint {
attributes: vec![],
start_time: start,
time: t,
count: b.count,
bounds: self.bounds.clone(),
bounds: self.value_map.config.bounds.clone(),
bucket_counts: b.counts.clone(),
sum: if self.record_sum {
sum: if self.value_map.config.record_sum {
b.total
} else {
T::default()
},
min: if self.record_min_max {
min: if self.value_map.config.record_min_max {
Some(b.min)
} else {
None
},
max: if self.record_min_max {
max: if self.value_map.config.record_min_max {
Some(b.max)
} else {
None
Expand All @@ -318,25 +303,25 @@ impl<T: Number> Histogram<T> {
let mut seen = HashSet::new();
for (attrs, tracker) in trackers.iter() {
if seen.insert(Arc::as_ptr(tracker)) {
if let Ok(b) = tracker.buckets.lock() {
if let Ok(b) = tracker.lock() {
h.data_points.push(HistogramDataPoint {
attributes: attrs.clone(),
start_time: start,
time: t,
count: b.count,
bounds: self.bounds.clone(),
bounds: self.value_map.config.bounds.clone(),
bucket_counts: b.counts.clone(),
sum: if self.record_sum {
sum: if self.value_map.config.record_sum {
b.total
} else {
T::default()
},
min: if self.record_min_max {
min: if self.value_map.config.record_min_max {
Some(b.min)
} else {
None
},
max: if self.record_min_max {
max: if self.value_map.config.record_min_max {
Some(b.max)
} else {
None
Expand Down
Loading

0 comments on commit 825d181

Please sign in to comment.