Skip to content

Commit

Permalink
ValueMap interface change
Browse files Browse the repository at this point in the history
  • Loading branch information
fraillt authored and Mindaugas Vinkelis committed Sep 29, 2024
1 parent 161929d commit bf24bca
Show file tree
Hide file tree
Showing 6 changed files with 276 additions and 192 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,7 @@ impl<T: Number> ExpoHistogram<T> {
pub(crate) fn measure(&self, value: T, attrs: &[KeyValue]) {
let f_value = value.into_float();
// Ignore NaN and infinity.
// Only makes sense if T is f64, maybe this could be no-op for other cases?
if f_value.is_infinite() || f_value.is_nan() {
return;
}
Expand Down
202 changes: 123 additions & 79 deletions opentelemetry-sdk/src/metrics/internal/histogram.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,45 +7,9 @@ use crate::metrics::data::HistogramDataPoint;
use crate::metrics::data::{self, Aggregation, Temporality};
use opentelemetry::KeyValue;

use super::Number;
use super::{AtomicTracker, AtomicallyUpdate, Operation, ValueMap};
use super::ValueMap;
use super::{Aggregator, Number};

/// Zero-sized marker type that selects the histogram update operation when
/// used as the `Operation` type parameter of `ValueMap`.
struct HistogramUpdate;

/// Dispatches a measurement to the tracker's histogram-update path.
impl Operation for HistogramUpdate {
    // `index` is the bucket the value falls into; it is pre-computed by the
    // caller (via a search over the configured bounds) before reaching here.
    fn update_tracker<T: Default, AT: AtomicTracker<T>>(tracker: &AT, value: T, index: usize) {
        tracker.update_histogram(index, value);
    }
}

/// Per-attribute-set histogram state.
///
/// The bucket counters (plus count/min/max/total inside `Buckets`) sit behind
/// a `Mutex` so concurrent `measure` calls on the same attribute set are safe.
struct HistogramTracker<T> {
    // Guarded bucket state; see `update_histogram` for the locking policy.
    buckets: Mutex<Buckets<T>>,
}

impl<T: Number> AtomicTracker<T> for HistogramTracker<T> {
    /// Records `value` into bucket `index` and folds it into the running sum.
    fn update_histogram(&self, index: usize, value: T) {
        // A poisoned mutex means another thread panicked while holding it;
        // drop this measurement rather than propagate the panic.
        let mut buckets = match self.buckets.lock() {
            Ok(guard) => guard,
            Err(_) => return,
        };

        buckets.bin(index, value);
        buckets.sum(value);
    }
}

/// Allows `ValueMap` to construct histogram trackers on demand.
impl<T: Number> AtomicallyUpdate<T> for HistogramTracker<T> {
    type AtomicTracker = HistogramTracker<T>;

    /// Creates a tracker with `buckets_count` bins.
    ///
    /// Histogram trackers are always created with an explicit bucket count;
    /// a missing count is a programming error in the caller, so fail loudly
    /// with a descriptive message instead of a bare `unwrap`.
    fn new_atomic_tracker(buckets_count: Option<usize>) -> Self::AtomicTracker {
        let count = buckets_count.expect("histogram tracker requires a bucket count");
        HistogramTracker {
            buckets: Mutex::new(Buckets::<T>::new(count)),
        }
    }
}

#[derive(Default)]
struct Buckets<T> {
counts: Vec<u64>,
count: u64,
Expand All @@ -54,29 +18,17 @@ struct Buckets<T> {
max: T,
}

impl<T: Number> Buckets<T> {
/// returns buckets with `n` bins.
fn new(n: usize) -> Buckets<T> {
Buckets {
counts: vec![0; n],
impl<T> Buckets<T>
where
T: Number,
{
fn new(size: usize) -> Self {
Self {
counts: vec![0; size],
count: 0,
total: T::default(),
min: T::max(),
max: T::min(),
..Default::default()
}
}

fn sum(&mut self, value: T) {
self.total += value;
}

fn bin(&mut self, idx: usize, value: T) {
self.counts[idx] += 1;
self.count += 1;
if value < self.min {
self.min = value;
}
if value > self.max {
self.max = value
}
}

Expand All @@ -91,45 +43,72 @@ impl<T: Number> Buckets<T> {
}
}

/// Explicit-bucket histogram aggregation state, guarded by a mutex.
///
/// The bucket index is computed by the caller from the sorted bounds, so
/// `update` receives a pre-computed `(value, index)` pair.
impl<T> Aggregator<T> for Mutex<Buckets<T>>
where
    T: Number,
{
    /// Number of buckets to allocate.
    type InitConfig = usize;
    /// Value and bucket index
    type PreComputedValue = (T, usize);

    fn create(size: &usize) -> Self {
        Mutex::new(Buckets::new(*size))
    }

    fn update(&self, (value, idx): (T, usize)) {
        // A poisoned mutex means another thread panicked while holding it;
        // drop this measurement rather than propagate the panic.
        let mut buckets = match self.lock() {
            Ok(guard) => guard,
            Err(_) => return,
        };
        buckets.counts[idx] += 1;
        buckets.count += 1;
        if value < buckets.min {
            buckets.min = value;
        }
        if value > buckets.max {
            buckets.max = value;
        }
        buckets.total += value;
    }
}

/// Summarizes a set of measurements as a histogram with explicitly defined
/// buckets.
pub(crate) struct Histogram<T: Number> {
value_map: ValueMap<HistogramTracker<T>, T, HistogramUpdate>,
value_map: ValueMap<T, Mutex<Buckets<T>>>,
bounds: Vec<f64>,
record_min_max: bool,
record_sum: bool,
start: Mutex<SystemTime>,
}

impl<T: Number> Histogram<T> {
pub(crate) fn new(boundaries: Vec<f64>, record_min_max: bool, record_sum: bool) -> Self {
let buckets_count = boundaries.len() + 1;
let mut histogram = Histogram {
value_map: ValueMap::new_with_buckets_count(buckets_count),
bounds: boundaries,
pub(crate) fn new(mut bounds: Vec<f64>, record_min_max: bool, record_sum: bool) -> Self {
bounds.retain(|v| !v.is_nan());
bounds.sort_by(|a, b| a.partial_cmp(b).expect("NaNs filtered out"));
let buckets_count = bounds.len() + 1;
Self {
value_map: ValueMap::new(buckets_count),
bounds,
record_min_max,
record_sum,
start: Mutex::new(SystemTime::now()),
};

histogram.bounds.retain(|v| !v.is_nan());
histogram
.bounds
.sort_by(|a, b| a.partial_cmp(b).expect("NaNs filtered out"));

histogram
}
}

pub(crate) fn measure(&self, measurement: T, attrs: &[KeyValue]) {
let f = measurement.into_float();

// Ignore NaN and infinity.
// Only makes sense if T is f64, maybe this could be no-op for other cases?
if f.is_infinite() || f.is_nan() {
return;
}
// This search will return an index in the range `[0, bounds.len()]`, where
// it will return `bounds.len()` if value is greater than the last element
// of `bounds`. This aligns with the buckets in that the length of buckets
// is `bounds.len()+1`, with the last bucket representing:
// `(bounds[bounds.len()-1], +∞)`.
let index = self.bounds.partition_point(|&x| x < f);
self.value_map.measure(measurement, attrs, index);

self.value_map.measure((measurement, index), attrs);
}

pub(crate) fn delta(
Expand Down Expand Up @@ -167,7 +146,7 @@ impl<T: Number> Histogram<T> {
.has_no_attribute_value
.swap(false, Ordering::AcqRel)
{
if let Ok(ref mut b) = self.value_map.no_attribute_tracker.buckets.lock() {
if let Ok(ref mut b) = self.value_map.no_attribute_tracker.lock() {
h.data_points.push(HistogramDataPoint {
attributes: vec![],
start_time: start,
Expand Down Expand Up @@ -205,7 +184,7 @@ impl<T: Number> Histogram<T> {
let mut seen = HashSet::new();
for (attrs, tracker) in trackers.drain() {
if seen.insert(Arc::as_ptr(&tracker)) {
if let Ok(b) = tracker.buckets.lock() {
if let Ok(b) = tracker.lock() {
h.data_points.push(HistogramDataPoint {
attributes: attrs.clone(),
start_time: start,
Expand Down Expand Up @@ -278,7 +257,7 @@ impl<T: Number> Histogram<T> {
.has_no_attribute_value
.load(Ordering::Acquire)
{
if let Ok(b) = &self.value_map.no_attribute_tracker.buckets.lock() {
if let Ok(b) = &self.value_map.no_attribute_tracker.lock() {
h.data_points.push(HistogramDataPoint {
attributes: vec![],
start_time: start,
Expand Down Expand Up @@ -318,7 +297,7 @@ impl<T: Number> Histogram<T> {
let mut seen = HashSet::new();
for (attrs, tracker) in trackers.iter() {
if seen.insert(Arc::as_ptr(tracker)) {
if let Ok(b) = tracker.buckets.lock() {
if let Ok(b) = tracker.lock() {
h.data_points.push(HistogramDataPoint {
attributes: attrs.clone(),
start_time: start,
Expand Down Expand Up @@ -350,3 +329,68 @@ impl<T: Number> Histogram<T> {
(h.data_points.len(), new_agg.map(|a| Box::new(a) as Box<_>))
}
}

#[cfg(test)]
mod tests {

    use super::*;

    #[test]
    fn when_f64_is_nan_or_infinity_then_ignore() {
        // Each case pairs the measurements with the expected aggregate
        // state as (min, max, sum, count). NaN and +/-infinity inputs
        // must be dropped without affecting any of the aggregates.
        let cases: Vec<(Vec<f64>, (f64, f64, f64, u64))> = vec![
            (vec![2.0, 4.0, 1.0], (1.0, 4.0, 7.0, 3)),
            (vec![2.0, 4.0, 1.0, f64::INFINITY], (1.0, 4.0, 7.0, 3)),
            (vec![2.0, 4.0, 1.0, f64::NEG_INFINITY], (1.0, 4.0, 7.0, 3)),
            (vec![2.0, f64::NAN, 4.0, 1.0], (1.0, 4.0, 7.0, 3)),
            (vec![4.0, 4.0, 4.0, 2.0, 16.0, 1.0], (1.0, 16.0, 31.0, 6)),
        ];

        for (values, (min, max, sum, count)) in cases {
            let histogram = Histogram::new(vec![], true, true);
            for value in values {
                histogram.measure(value, &[]);
            }
            let buckets = histogram.value_map.no_attribute_tracker.lock().unwrap();
            assert_eq!(buckets.max, max);
            assert_eq!(buckets.min, min);
            assert_eq!(buckets.total, sum);
            assert_eq!(buckets.count, count);
        }
    }
}
46 changes: 38 additions & 8 deletions opentelemetry-sdk/src/metrics/internal/last_value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,25 +7,51 @@ use std::{
use crate::metrics::data::DataPoint;
use opentelemetry::KeyValue;

use super::{Assign, AtomicTracker, Number, ValueMap};
use super::{Aggregator, AtomicTracker, AtomicallyUpdate, Number, ValueMap};

/// this is reused by PrecomputedSum
pub(crate) struct Assign<T>
where
T: AtomicallyUpdate<T>,
{
pub(crate) value: T::AtomicTracker,
}

impl<T> Aggregator<T> for Assign<T>
where
T: Number,
{
type InitConfig = ();
type PreComputedValue = T;

fn create(_init: &()) -> Self {
Self {
value: T::new_atomic_tracker(T::default()),
}
}

fn update(&self, value: T) {
self.value.store(value)
}
}

/// Summarizes a set of measurements as the last one made.
pub(crate) struct LastValue<T: Number> {
value_map: ValueMap<T, T, Assign>,
value_map: ValueMap<T, Assign<T>>,
start: Mutex<SystemTime>,
}

impl<T: Number> LastValue<T> {
pub(crate) fn new() -> Self {
LastValue {
value_map: ValueMap::new(),
value_map: ValueMap::new(()),
start: Mutex::new(SystemTime::now()),
}
}

pub(crate) fn measure(&self, measurement: T, attrs: &[KeyValue]) {
// The argument index is not applicable to LastValue.
self.value_map.measure(measurement, attrs, 0);
self.value_map.measure(measurement, attrs);
}

pub(crate) fn compute_aggregation_delta(&self, dest: &mut Vec<DataPoint<T>>) {
Expand All @@ -49,7 +75,11 @@ impl<T: Number> LastValue<T> {
attributes: vec![],
start_time: Some(prev_start),
time: Some(t),
value: self.value_map.no_attribute_tracker.get_and_reset_value(),
value: self
.value_map
.no_attribute_tracker
.value
.get_and_reset_value(),
exemplars: vec![],
});
}
Expand All @@ -66,7 +96,7 @@ impl<T: Number> LastValue<T> {
attributes: attrs.clone(),
start_time: Some(prev_start),
time: Some(t),
value: tracker.get_value(),
value: tracker.value.get_value(),
exemplars: vec![],
});
}
Expand Down Expand Up @@ -101,7 +131,7 @@ impl<T: Number> LastValue<T> {
attributes: vec![],
start_time: Some(prev_start),
time: Some(t),
value: self.value_map.no_attribute_tracker.get_value(),
value: self.value_map.no_attribute_tracker.value.get_value(),
exemplars: vec![],
});
}
Expand All @@ -118,7 +148,7 @@ impl<T: Number> LastValue<T> {
attributes: attrs.clone(),
start_time: Some(prev_start),
time: Some(t),
value: tracker.get_value(),
value: tracker.value.get_value(),
exemplars: vec![],
});
}
Expand Down
Loading

0 comments on commit bf24bca

Please sign in to comment.