diff --git a/opentelemetry-sdk/src/metrics/internal/exponential_histogram.rs b/opentelemetry-sdk/src/metrics/internal/exponential_histogram.rs index c23b441663..a877e22354 100644 --- a/opentelemetry-sdk/src/metrics/internal/exponential_histogram.rs +++ b/opentelemetry-sdk/src/metrics/internal/exponential_histogram.rs @@ -343,6 +343,7 @@ impl ExpoHistogram { pub(crate) fn measure(&self, value: T, attrs: &[KeyValue]) { let f_value = value.into_float(); // Ignore NaN and infinity. + // Only makes sense if T is f64, maybe this could be no-op for other cases? if f_value.is_infinite() || f_value.is_nan() { return; } diff --git a/opentelemetry-sdk/src/metrics/internal/histogram.rs b/opentelemetry-sdk/src/metrics/internal/histogram.rs index 089415ba7c..7e873995ef 100644 --- a/opentelemetry-sdk/src/metrics/internal/histogram.rs +++ b/opentelemetry-sdk/src/metrics/internal/histogram.rs @@ -7,45 +7,9 @@ use crate::metrics::data::HistogramDataPoint; use crate::metrics::data::{self, Aggregation, Temporality}; use opentelemetry::KeyValue; -use super::Number; -use super::{AtomicTracker, AtomicallyUpdate, Operation, ValueMap}; +use super::ValueMap; +use super::{Aggregator, Number}; -struct HistogramUpdate; - -impl Operation for HistogramUpdate { - fn update_tracker>(tracker: &AT, value: T, index: usize) { - tracker.update_histogram(index, value); - } -} - -struct HistogramTracker { - buckets: Mutex>, -} - -impl AtomicTracker for HistogramTracker { - fn update_histogram(&self, index: usize, value: T) { - let mut buckets = match self.buckets.lock() { - Ok(guard) => guard, - Err(_) => return, - }; - - buckets.bin(index, value); - buckets.sum(value); - } -} - -impl AtomicallyUpdate for HistogramTracker { - type AtomicTracker = HistogramTracker; - - fn new_atomic_tracker(buckets_count: Option) -> Self::AtomicTracker { - let count = buckets_count.unwrap(); - HistogramTracker { - buckets: Mutex::new(Buckets::::new(count)), - } - } -} - -#[derive(Default)] struct Buckets { counts: Vec, count: u64, @@ -54,29 +18,17 @@ struct Buckets { max: T, } -impl Buckets { - /// returns buckets with `n` bins. - fn new(n: usize) -> Buckets { - Buckets { - counts: vec![0; n], +impl Buckets +where + T: Number, +{ + fn new(size: usize) -> Self { + Self { + counts: vec![0; size], + count: 0, + total: T::default(), min: T::max(), max: T::min(), - ..Default::default() - } - } - - fn sum(&mut self, value: T) { - self.total += value; - } - - fn bin(&mut self, idx: usize, value: T) { - self.counts[idx] += 1; - self.count += 1; - if value < self.min { - self.min = value; - } - if value > self.max { - self.max = value } } @@ -91,10 +43,37 @@ impl Buckets { } } +impl Aggregator for Mutex> +where + T: Number, +{ + type InitConfig = usize; + /// Value and bucket index + type PreComputedValue = (T, usize); + + fn create(size: &usize) -> Self { + Mutex::new(Buckets::new(*size)) + } + + fn update(&self, (value, idx): (T, usize)) { + if let Ok(mut this) = self.lock() { + this.counts[idx] += 1; + this.count += 1; + if value < this.min { + this.min = value; + } + if value > this.max { + this.max = value + } + this.total += value; + } + } +} + /// Summarizes a set of measurements as a histogram with explicitly defined /// buckets. pub(crate) struct Histogram { - value_map: ValueMap, T, HistogramUpdate>, + value_map: ValueMap>>, bounds: Vec, record_min_max: bool, record_sum: bool, @@ -102,34 +81,34 @@ pub(crate) struct Histogram { } impl Histogram { - pub(crate) fn new(boundaries: Vec, record_min_max: bool, record_sum: bool) -> Self { - let buckets_count = boundaries.len() + 1; - let mut histogram = Histogram { - value_map: ValueMap::new_with_buckets_count(buckets_count), - bounds: boundaries, + pub(crate) fn new(mut bounds: Vec, record_min_max: bool, record_sum: bool) -> Self { + bounds.retain(|v| !v.is_nan()); + bounds.sort_by(|a, b| a.partial_cmp(b).expect("NaNs filtered out")); + let buckets_count = bounds.len() + 1; + Self { + value_map: ValueMap::new(buckets_count), + bounds, record_min_max, record_sum, start: Mutex::new(SystemTime::now()), - }; - - histogram.bounds.retain(|v| !v.is_nan()); - histogram - .bounds - .sort_by(|a, b| a.partial_cmp(b).expect("NaNs filtered out")); - - histogram + } } pub(crate) fn measure(&self, measurement: T, attrs: &[KeyValue]) { let f = measurement.into_float(); - + // Ignore NaN and infinity. + // Only makes sense if T is f64, maybe this could be no-op for other cases? + if f.is_infinite() || f.is_nan() { + return; + } // This search will return an index in the range `[0, bounds.len()]`, where // it will return `bounds.len()` if value is greater than the last element // of `bounds`. This aligns with the buckets in that the length of buckets // is `bounds.len()+1`, with the last bucket representing: // `(bounds[bounds.len()-1], +∞)`. let index = self.bounds.partition_point(|&x| x < f); - self.value_map.measure(measurement, attrs, index); + + self.value_map.measure((measurement, index), attrs); } pub(crate) fn delta( @@ -167,7 +146,7 @@ impl Histogram { .has_no_attribute_value .swap(false, Ordering::AcqRel) { - if let Ok(ref mut b) = self.value_map.no_attribute_tracker.buckets.lock() { + if let Ok(ref mut b) = self.value_map.no_attribute_tracker.lock() { h.data_points.push(HistogramDataPoint { attributes: vec![], start_time: start, @@ -205,7 +184,7 @@ impl Histogram { let mut seen = HashSet::new(); for (attrs, tracker) in trackers.drain() { if seen.insert(Arc::as_ptr(&tracker)) { - if let Ok(b) = tracker.buckets.lock() { + if let Ok(b) = tracker.lock() { h.data_points.push(HistogramDataPoint { attributes: attrs.clone(), start_time: start, @@ -278,7 +257,7 @@ impl Histogram { .has_no_attribute_value .load(Ordering::Acquire) { - if let Ok(b) = &self.value_map.no_attribute_tracker.buckets.lock() { + if let Ok(b) = &self.value_map.no_attribute_tracker.lock() { h.data_points.push(HistogramDataPoint { attributes: vec![], start_time: start, @@ -318,7 +297,7 @@ impl Histogram { let mut seen = HashSet::new(); for (attrs, tracker) in trackers.iter() { if seen.insert(Arc::as_ptr(tracker)) { - if let Ok(b) = tracker.buckets.lock() { + if let Ok(b) = tracker.lock() { h.data_points.push(HistogramDataPoint { attributes: attrs.clone(), start_time: start, @@ -350,3 +329,68 @@ impl Histogram { (h.data_points.len(), new_agg.map(|a| Box::new(a) as Box<_>)) } } + +#[cfg(test)] +mod tests { + + use super::*; + + #[test] + fn when_f64_is_nan_or_infinity_then_ignore() { + struct Expected { + min: f64, + max: f64, + sum: f64, + count: u64, + } + impl Expected { + fn new(min: f64, max: f64, sum: f64, count: u64) -> Self { + Expected { + min, + max, + sum, + count, + } + } + } + struct TestCase { + values: Vec, + expected: Expected, + } + + let test_cases = vec![ + TestCase { + values: vec![2.0, 4.0, 1.0], + expected: Expected::new(1.0, 4.0, 7.0, 3), + }, + TestCase { + values: vec![2.0, 4.0, 1.0, f64::INFINITY], + expected: Expected::new(1.0, 4.0, 7.0, 3), + }, + TestCase { + values: vec![2.0, 4.0, 1.0, -f64::INFINITY], + expected: Expected::new(1.0, 4.0, 7.0, 3), + }, + TestCase { + values: vec![2.0, f64::NAN, 4.0, 1.0], + expected: Expected::new(1.0, 4.0, 7.0, 3), + }, + TestCase { + values: vec![4.0, 4.0, 4.0, 2.0, 16.0, 1.0], + expected: Expected::new(1.0, 16.0, 31.0, 6), + }, + ]; + + for test in test_cases { + let h = Histogram::new(vec![], true, true); + for v in test.values { + h.measure(v, &[]); + } + let res = h.value_map.no_attribute_tracker.lock().unwrap(); + assert_eq!(test.expected.max, res.max); + assert_eq!(test.expected.min, res.min); + assert_eq!(test.expected.sum, res.total); + assert_eq!(test.expected.count, res.count); + } + } +} diff --git a/opentelemetry-sdk/src/metrics/internal/last_value.rs b/opentelemetry-sdk/src/metrics/internal/last_value.rs index d1eab4fada..e4c9433f9a 100644 --- a/opentelemetry-sdk/src/metrics/internal/last_value.rs +++ b/opentelemetry-sdk/src/metrics/internal/last_value.rs @@ -7,25 +7,51 @@ use std::{ use crate::metrics::data::DataPoint; use opentelemetry::KeyValue; -use super::{Assign, AtomicTracker, Number, ValueMap}; +use super::{Aggregator, AtomicTracker, AtomicallyUpdate, Number, ValueMap}; + +/// this is reused by PrecomputedSum +pub(crate) struct Assign +where + T: AtomicallyUpdate, +{ + pub(crate) value: T::AtomicTracker, +} + +impl Aggregator for Assign +where + T: Number, +{ + type InitConfig = (); + type PreComputedValue = T; + + fn create(_init: &()) -> Self { + Self { + value: T::new_atomic_tracker(T::default()), + } + } + + fn update(&self, value: T) { + self.value.store(value) + } +} /// Summarizes a set of measurements as the last one made. pub(crate) struct LastValue { - value_map: ValueMap, + value_map: ValueMap>, start: Mutex, } impl LastValue { pub(crate) fn new() -> Self { LastValue { - value_map: ValueMap::new(), + value_map: ValueMap::new(()), start: Mutex::new(SystemTime::now()), } } pub(crate) fn measure(&self, measurement: T, attrs: &[KeyValue]) { // The argument index is not applicable to LastValue. - self.value_map.measure(measurement, attrs, 0); + self.value_map.measure(measurement, attrs); } pub(crate) fn compute_aggregation_delta(&self, dest: &mut Vec>) { @@ -49,7 +75,11 @@ impl LastValue { attributes: vec![], start_time: Some(prev_start), time: Some(t), - value: self.value_map.no_attribute_tracker.get_and_reset_value(), + value: self + .value_map + .no_attribute_tracker + .value + .get_and_reset_value(), exemplars: vec![], }); } @@ -66,7 +96,7 @@ impl LastValue { attributes: attrs.clone(), start_time: Some(prev_start), time: Some(t), - value: tracker.get_value(), + value: tracker.value.get_value(), exemplars: vec![], }); } @@ -101,7 +131,7 @@ impl LastValue { attributes: vec![], start_time: Some(prev_start), time: Some(t), - value: self.value_map.no_attribute_tracker.get_value(), + value: self.value_map.no_attribute_tracker.value.get_value(), exemplars: vec![], }); } @@ -118,7 +148,7 @@ impl LastValue { attributes: attrs.clone(), start_time: Some(prev_start), time: Some(t), - value: tracker.get_value(), + value: tracker.value.get_value(), exemplars: vec![], }); } diff --git a/opentelemetry-sdk/src/metrics/internal/mod.rs b/opentelemetry-sdk/src/metrics/internal/mod.rs index abc691b2fc..409e6ad84b 100644 --- a/opentelemetry-sdk/src/metrics/internal/mod.rs +++ b/opentelemetry-sdk/src/metrics/internal/mod.rs @@ -7,7 +7,6 @@ mod sum; use core::fmt; use std::collections::HashMap; -use std::marker::PhantomData; use std::ops::{Add, AddAssign, Sub}; use std::sync::atomic::{AtomicBool, AtomicI64, AtomicU64, AtomicUsize, Ordering}; use std::sync::{Arc, RwLock}; @@ -24,79 +23,65 @@ use crate::metrics::AttributeSet; pub(crate) static STREAM_OVERFLOW_ATTRIBUTES: Lazy> = Lazy::new(|| vec![KeyValue::new("otel.metric.overflow", "true")]); -/// Abstracts the update operation for a measurement. -pub(crate) trait Operation { - fn update_tracker>(tracker: &AT, value: T, index: usize); -} - -struct Increment; +pub(crate) trait Aggregator +where + T: Number, +{ + /// A static configuration that is needed in order to initialize aggregator. + /// E.g. bucket_size at creation time . + type InitConfig; -impl Operation for Increment { - fn update_tracker>(tracker: &AT, value: T, _: usize) { - tracker.add(value); - } -} + /// Some aggregators can do some computations before updating aggregator. + /// This helps to reduce contention for aggregators because it makes + /// [`Aggregator::update`] as short as possible. + type PreComputedValue; -struct Assign; + /// Called everytime a new attribute-set is stored. + fn create(init: &Self::InitConfig) -> Self; -impl Operation for Assign { - fn update_tracker>(tracker: &AT, value: T, _: usize) { - tracker.store(value); - } + /// Called for each measurement. + fn update(&self, value: Self::PreComputedValue); } /// The storage for sums. /// /// This structure is parametrized by an `Operation` that indicates how /// updates to the underlying value trackers should be performed. -pub(crate) struct ValueMap, T: Number, O> { +pub(crate) struct ValueMap +where + T: Number, + A: Aggregator, +{ /// Trackers store the values associated with different attribute sets. - trackers: RwLock, Arc>>, + trackers: RwLock, Arc>>, /// Number of different attribute set stored in the `trackers` map. count: AtomicUsize, /// Indicates whether a value with no attributes has been stored. has_no_attribute_value: AtomicBool, /// Tracker for values with no attributes attached. - no_attribute_tracker: AU::AtomicTracker, - /// Buckets Count is only used by Histogram. - buckets_count: Option, - phantom: PhantomData, + no_attribute_tracker: A, + /// Configuration for an Aggregator + config: A::InitConfig, } -impl, T: Number, O> Default for ValueMap { - fn default() -> Self { - ValueMap::new() - } -} - -impl, T: Number, O> ValueMap { - fn new() -> Self { - ValueMap { - trackers: RwLock::new(HashMap::new()), - has_no_attribute_value: AtomicBool::new(false), - no_attribute_tracker: AU::new_atomic_tracker(None), - count: AtomicUsize::new(0), - buckets_count: None, - phantom: PhantomData, - } - } - - fn new_with_buckets_count(buckets_count: usize) -> Self { +impl ValueMap +where + T: Number, + A: Aggregator, +{ + fn new(config: A::InitConfig) -> Self { ValueMap { trackers: RwLock::new(HashMap::new()), has_no_attribute_value: AtomicBool::new(false), - no_attribute_tracker: AU::new_atomic_tracker(Some(buckets_count)), + no_attribute_tracker: A::create(&config), count: AtomicUsize::new(0), - buckets_count: Some(buckets_count), - phantom: PhantomData, + config, } } -} -impl, T: Number, O: Operation> ValueMap { - fn measure(&self, measurement: T, attributes: &[KeyValue], index: usize) { + fn measure(&self, value: A::PreComputedValue, attributes: &[KeyValue]) { if attributes.is_empty() { - O::update_tracker(&self.no_attribute_tracker, measurement, index); + self.no_attribute_tracker.update(value); self.has_no_attribute_value.store(true, Ordering::Release); return; } @@ -107,14 +92,14 @@ impl, T: Number, O: Operation> ValueMap { // Try to retrieve and update the tracker with the attributes in the provided order first if let Some(tracker) = trackers.get(attributes) { - O::update_tracker(&**tracker, measurement, index); + tracker.update(value); return; } // Try to retrieve and update the tracker with the attributes sorted. let sorted_attrs = AttributeSet::from(attributes).into_vec(); if let Some(tracker) = trackers.get(sorted_attrs.as_slice()) { - O::update_tracker(&**tracker, measurement, index); + tracker.update(value); return; } @@ -128,12 +113,12 @@ impl, T: Number, O: Operation> ValueMap { // Recheck both the provided and sorted orders after acquiring the write lock // in case another thread has pushed an update in the meantime. if let Some(tracker) = trackers.get(attributes) { - O::update_tracker(&**tracker, measurement, index); + tracker.update(value); } else if let Some(tracker) = trackers.get(sorted_attrs.as_slice()) { - O::update_tracker(&**tracker, measurement, index); + tracker.update(value); } else if is_under_cardinality_limit(self.count.load(Ordering::SeqCst)) { - let new_tracker = Arc::new(AU::new_atomic_tracker(self.buckets_count)); - O::update_tracker(&*new_tracker, measurement, index); + let new_tracker = Arc::new(A::create(&self.config)); + new_tracker.update(value); // Insert tracker with the attributes in the provided and sorted orders trackers.insert(attributes.to_vec(), new_tracker.clone()); @@ -141,10 +126,10 @@ impl, T: Number, O: Operation> ValueMap { self.count.fetch_add(1, Ordering::SeqCst); } else if let Some(overflow_value) = trackers.get(STREAM_OVERFLOW_ATTRIBUTES.as_slice()) { - O::update_tracker(&**overflow_value, measurement, index); + overflow_value.update(value); } else { - let new_tracker = AU::new_atomic_tracker(self.buckets_count); - O::update_tracker(&new_tracker, measurement, index); + let new_tracker = A::create(&self.config); + new_tracker.update(value); trackers.insert(STREAM_OVERFLOW_ATTRIBUTES.clone(), Arc::new(new_tracker)); global::handle_error(MetricsError::Other("Warning: Maximum data points for metric stream exceeded. Entry added to overflow. Subsequent overflows to same metric until next collect will not be logged.".into())); } @@ -153,22 +138,17 @@ impl, T: Number, O: Operation> ValueMap { /// Marks a type that can have a value added and retrieved atomically. Required since /// different types have different backing atomic mechanisms -pub(crate) trait AtomicTracker: Sync + Send + 'static { - fn store(&self, _value: T) {} - fn add(&self, _value: T) {} - fn get_value(&self) -> T { - T::default() - } - fn get_and_reset_value(&self) -> T { - T::default() - } - fn update_histogram(&self, _index: usize, _value: T) {} +pub(crate) trait AtomicTracker: Sync + Send + 'static { + fn store(&self, _value: T); + fn add(&self, _value: T); + fn get_value(&self) -> T; + fn get_and_reset_value(&self) -> T; } /// Marks a type that can have an atomic tracker generated for it -pub(crate) trait AtomicallyUpdate { +pub(crate) trait AtomicallyUpdate { type AtomicTracker: AtomicTracker; - fn new_atomic_tracker(buckets_count: Option) -> Self::AtomicTracker; + fn new_atomic_tracker(init: T) -> Self::AtomicTracker; } pub(crate) trait Number: @@ -255,8 +235,8 @@ impl AtomicTracker for AtomicU64 { impl AtomicallyUpdate for u64 { type AtomicTracker = AtomicU64; - fn new_atomic_tracker(_: Option) -> Self::AtomicTracker { - AtomicU64::new(0) + fn new_atomic_tracker(init: u64) -> Self::AtomicTracker { + AtomicU64::new(init) } } @@ -281,8 +261,8 @@ impl AtomicTracker for AtomicI64 { impl AtomicallyUpdate for i64 { type AtomicTracker = AtomicI64; - fn new_atomic_tracker(_: Option) -> Self::AtomicTracker { - AtomicI64::new(0) + fn new_atomic_tracker(init: i64) -> Self::AtomicTracker { + AtomicI64::new(init) } } @@ -291,10 +271,10 @@ pub(crate) struct F64AtomicTracker { } impl F64AtomicTracker { - fn new() -> Self { - let zero_as_u64 = 0.0_f64.to_bits(); + fn new(init: f64) -> Self { + let value_as_u64 = init.to_bits(); F64AtomicTracker { - inner: AtomicU64::new(zero_as_u64), + inner: AtomicU64::new(value_as_u64), } } } @@ -343,8 +323,8 @@ impl AtomicTracker for F64AtomicTracker { impl AtomicallyUpdate for f64 { type AtomicTracker = F64AtomicTracker; - fn new_atomic_tracker(_: Option) -> Self::AtomicTracker { - F64AtomicTracker::new() + fn new_atomic_tracker(init: f64) -> Self::AtomicTracker { + F64AtomicTracker::new(init) } } @@ -354,7 +334,7 @@ mod tests { #[test] fn can_store_u64_atomic_value() { - let atomic = u64::new_atomic_tracker(None); + let atomic = u64::new_atomic_tracker(0); let atomic_tracker = &atomic as &dyn AtomicTracker; let value = atomic.get_value(); @@ -367,7 +347,7 @@ mod tests { #[test] fn can_add_and_get_u64_atomic_value() { - let atomic = u64::new_atomic_tracker(None); + let atomic = u64::new_atomic_tracker(0); atomic.add(15); atomic.add(10); @@ -377,7 +357,7 @@ mod tests { #[test] fn can_reset_u64_atomic_value() { - let atomic = u64::new_atomic_tracker(None); + let atomic = u64::new_atomic_tracker(0); atomic.add(15); let value = atomic.get_and_reset_value(); @@ -389,7 +369,7 @@ mod tests { #[test] fn can_store_i64_atomic_value() { - let atomic = i64::new_atomic_tracker(None); + let atomic = i64::new_atomic_tracker(0); let atomic_tracker = &atomic as &dyn AtomicTracker; let value = atomic.get_value(); @@ -406,7 +386,7 @@ mod tests { #[test] fn can_add_and_get_i64_atomic_value() { - let atomic = i64::new_atomic_tracker(None); + let atomic = i64::new_atomic_tracker(0); atomic.add(15); atomic.add(-10); @@ -416,7 +396,7 @@ mod tests { #[test] fn can_reset_i64_atomic_value() { - let atomic = i64::new_atomic_tracker(None); + let atomic = i64::new_atomic_tracker(0); atomic.add(15); let value = atomic.get_and_reset_value(); @@ -428,7 +408,7 @@ mod tests { #[test] fn can_store_f64_atomic_value() { - let atomic = f64::new_atomic_tracker(None); + let atomic = f64::new_atomic_tracker(0.0); let atomic_tracker = &atomic as &dyn AtomicTracker; let value = atomic.get_value(); @@ -445,7 +425,7 @@ mod tests { #[test] fn can_add_and_get_f64_atomic_value() { - let atomic = f64::new_atomic_tracker(None); + let atomic = f64::new_atomic_tracker(0.0); atomic.add(15.3); atomic.add(10.4); @@ -456,7 +436,7 @@ mod tests { #[test] fn can_reset_f64_atomic_value() { - let atomic = f64::new_atomic_tracker(None); + let atomic = f64::new_atomic_tracker(0.0); atomic.add(15.5); let value = atomic.get_and_reset_value(); diff --git a/opentelemetry-sdk/src/metrics/internal/precomputed_sum.rs b/opentelemetry-sdk/src/metrics/internal/precomputed_sum.rs index 060c7baaa6..f08f70b73e 100644 --- a/opentelemetry-sdk/src/metrics/internal/precomputed_sum.rs +++ b/opentelemetry-sdk/src/metrics/internal/precomputed_sum.rs @@ -2,7 +2,7 @@ use opentelemetry::KeyValue; use crate::metrics::data::{self, Aggregation, DataPoint, Temporality}; -use super::{Assign, AtomicTracker, Number, ValueMap}; +use super::{last_value::Assign, AtomicTracker, Number, ValueMap}; use std::{ collections::{HashMap, HashSet}, sync::{atomic::Ordering, Arc, Mutex}, @@ -11,7 +11,7 @@ use std::{ /// Summarizes a set of pre-computed sums as their arithmetic sum. pub(crate) struct PrecomputedSum { - value_map: ValueMap, + value_map: ValueMap>, monotonic: bool, start: Mutex, reported: Mutex, T>>, @@ -20,7 +20,7 @@ pub(crate) struct PrecomputedSum { impl PrecomputedSum { pub(crate) fn new(monotonic: bool) -> Self { PrecomputedSum { - value_map: ValueMap::new(), + value_map: ValueMap::new(()), monotonic, start: Mutex::new(SystemTime::now()), reported: Mutex::new(Default::default()), @@ -29,7 +29,7 @@ impl PrecomputedSum { pub(crate) fn measure(&self, measurement: T, attrs: &[KeyValue]) { // The argument index is not applicable to PrecomputedSum. - self.value_map.measure(measurement, attrs, 0); + self.value_map.measure(measurement, attrs); } pub(crate) fn delta( @@ -73,7 +73,7 @@ impl PrecomputedSum { .has_no_attribute_value .swap(false, Ordering::AcqRel) { - let value = self.value_map.no_attribute_tracker.get_value(); + let value = self.value_map.no_attribute_tracker.value.get_value(); let delta = value - *reported.get(&vec![]).unwrap_or(&T::default()); new_reported.insert(vec![], value); @@ -94,7 +94,7 @@ impl PrecomputedSum { let mut seen = HashSet::new(); for (attrs, tracker) in trackers.drain() { if seen.insert(Arc::as_ptr(&tracker)) { - let value = tracker.get_value(); + let value = tracker.value.get_value(); let delta = value - *reported.get(&attrs).unwrap_or(&T::default()); new_reported.insert(attrs.clone(), value); s_data.data_points.push(DataPoint { @@ -162,7 +162,7 @@ impl PrecomputedSum { attributes: vec![], start_time: Some(prev_start), time: Some(t), - value: self.value_map.no_attribute_tracker.get_value(), + value: self.value_map.no_attribute_tracker.value.get_value(), exemplars: vec![], }); } @@ -179,7 +179,7 @@ impl PrecomputedSum { attributes: attrs.clone(), start_time: Some(prev_start), time: Some(t), - value: tracker.get_value(), + value: tracker.value.get_value(), exemplars: vec![], }); } diff --git a/opentelemetry-sdk/src/metrics/internal/sum.rs b/opentelemetry-sdk/src/metrics/internal/sum.rs index 66af75734d..17d81ca262 100644 --- a/opentelemetry-sdk/src/metrics/internal/sum.rs +++ b/opentelemetry-sdk/src/metrics/internal/sum.rs @@ -7,12 +7,37 @@ use std::{sync::Mutex, time::SystemTime}; use crate::metrics::data::{self, Aggregation, DataPoint, Temporality}; use opentelemetry::KeyValue; -use super::{AtomicTracker, Number}; -use super::{Increment, ValueMap}; +use super::{Aggregator, AtomicTracker, Number}; +use super::{AtomicallyUpdate, ValueMap}; + +struct Increment +where + T: AtomicallyUpdate, +{ + value: T::AtomicTracker, +} + +impl Aggregator for Increment +where + T: Number, +{ + type InitConfig = (); + type PreComputedValue = T; + + fn create(_init: &()) -> Self { + Self { + value: T::new_atomic_tracker(T::default()), + } + } + + fn update(&self, value: T) { + self.value.add(value) + } +} /// Summarizes a set of measurements made as their arithmetic sum. pub(crate) struct Sum { - value_map: ValueMap, + value_map: ValueMap>, monotonic: bool, start: Mutex, } @@ -25,7 +50,7 @@ impl Sum { /// were made in. pub(crate) fn new(monotonic: bool) -> Self { Sum { - value_map: ValueMap::new(), + value_map: ValueMap::new(()), monotonic, start: Mutex::new(SystemTime::now()), } @@ -33,7 +58,7 @@ impl Sum { pub(crate) fn measure(&self, measurement: T, attrs: &[KeyValue]) { // The argument index is not applicable to Sum. - self.value_map.measure(measurement, attrs, 0); + self.value_map.measure(measurement, attrs); } pub(crate) fn delta( @@ -76,7 +101,11 @@ impl Sum { attributes: vec![], start_time: Some(prev_start), time: Some(t), - value: self.value_map.no_attribute_tracker.get_and_reset_value(), + value: self + .value_map + .no_attribute_tracker + .value + .get_and_reset_value(), exemplars: vec![], }); } @@ -93,7 +122,7 @@ impl Sum { attributes: attrs.clone(), start_time: Some(prev_start), time: Some(t), - value: tracker.get_value(), + value: tracker.value.get_value(), exemplars: vec![], }); } @@ -152,7 +181,7 @@ impl Sum { attributes: vec![], start_time: Some(prev_start), time: Some(t), - value: self.value_map.no_attribute_tracker.get_value(), + value: self.value_map.no_attribute_tracker.value.get_value(), exemplars: vec![], }); } @@ -173,7 +202,7 @@ impl Sum { attributes: attrs.clone(), start_time: Some(prev_start), time: Some(t), - value: tracker.get_value(), + value: tracker.value.get_value(), exemplars: vec![], }); }