Skip to content

Commit

Permalink
ValueMap interface change
Browse files Browse the repository at this point in the history
  • Loading branch information
fraillt authored and Mindaugas Vinkelis committed Sep 18, 2024
1 parent 3976f3d commit 9f87703
Show file tree
Hide file tree
Showing 7 changed files with 235 additions and 227 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -341,12 +341,6 @@ impl<T: Number> ExpoHistogram<T> {
}

pub(crate) fn measure(&self, value: T, attrs: &[KeyValue]) {
let f_value = value.into_float();
// Ignore NaN and infinity.
if f_value.is_infinite() || f_value.is_nan() {
return;
}

let attrs: AttributeSet = attrs.into();
if let Ok(mut values) = self.values.lock() {
let v = values.entry(attrs).or_insert_with(|| {
Expand Down
194 changes: 89 additions & 105 deletions opentelemetry-sdk/src/metrics/internal/histogram.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,79 +7,41 @@ use crate::metrics::data::HistogramDataPoint;
use crate::metrics::data::{self, Aggregation, Temporality};
use opentelemetry::KeyValue;

use super::Number;
use super::{AtomicTracker, AtomicallyUpdate, Operation, ValueMap};
use super::ValueMap;
use super::{Aggregator, Number};

struct HistogramUpdate;

impl Operation for HistogramUpdate {
fn update_tracker<T: Default, AT: AtomicTracker<T>>(tracker: &AT, value: T, index: usize) {
tracker.update_histogram(index, value);
}
}

struct HistogramTracker<T> {
buckets: Mutex<Buckets<T>>,
}

impl<T: Number> AtomicTracker<T> for HistogramTracker<T> {
fn update_histogram(&self, index: usize, value: T) {
let mut buckets = match self.buckets.lock() {
Ok(guard) => guard,
Err(_) => return,
};

buckets.bin(index, value);
buckets.sum(value);
}
}

impl<T: Number> AtomicallyUpdate<T> for HistogramTracker<T> {
type AtomicTracker = HistogramTracker<T>;

fn new_atomic_tracker(buckets_count: Option<usize>) -> Self::AtomicTracker {
let count = buckets_count.unwrap();
HistogramTracker {
buckets: Mutex::new(Buckets::<T>::new(count)),
}
}
struct BucketsConfig {
bounds: Vec<f64>,
record_min_max: bool,
record_sum: bool,
}

#[derive(Default)]
struct Buckets<T> {
#[derive(Default, Debug, Clone)]
struct BucketsData<T> {
counts: Vec<u64>,
count: u64,
total: T,
min: T,
max: T,
}

impl<T: Number> Buckets<T> {
/// returns buckets with `n` bins.
fn new(n: usize) -> Buckets<T> {
Buckets {
counts: vec![0; n],
struct Buckets<T> {
data: Mutex<BucketsData<T>>,
}

impl<T> BucketsData<T>
where
T: Number,
{
fn new(size: usize) -> Self {
Self {
counts: vec![0; size],
min: T::max(),
max: T::min(),
..Default::default()
}
}

fn sum(&mut self, value: T) {
self.total += value;
}

fn bin(&mut self, idx: usize, value: T) {
self.counts[idx] += 1;
self.count += 1;
if value < self.min {
self.min = value;
}
if value > self.max {
self.max = value
}
}

fn reset(&mut self) {
for item in &mut self.counts {
*item = 0;
Expand All @@ -91,45 +53,67 @@ impl<T: Number> Buckets<T> {
}
}

impl<T> Aggregator<T> for Buckets<T>
where
T: Number,
{
type Config = BucketsConfig;

fn create(config: &BucketsConfig) -> Self {
let size = config.bounds.len() + 1;
Buckets {
data: Mutex::new(BucketsData::new(size)),
}
}

fn update(&self, config: &BucketsConfig, measurement: T) {
let f_value = measurement.into_float();
// This search will return an index in the range `[0, bounds.len()]`, where
// it will return `bounds.len()` if value is greater than the last element
// of `bounds`. This aligns with the buckets in that the length of buckets
// is `bounds.len()+1`, with the last bucket representing:
// `(bounds[bounds.len()-1], +∞)`.
let idx = config.bounds.partition_point(|&x| x < f_value);
if let Ok(mut data) = self.data.lock() {
data.counts[idx] += 1;
data.count += 1;
if config.record_min_max {
if measurement < data.min {
data.min = measurement;
}
if measurement > data.max {
data.max = measurement
}
}
// it's very cheap to update it, even if it is not configured to record_sum
data.total += measurement;
}
}
}

/// Summarizes a set of measurements as a histogram with explicitly defined
/// buckets.
pub(crate) struct Histogram<T: Number> {
value_map: ValueMap<HistogramTracker<T>, T, HistogramUpdate>,
bounds: Vec<f64>,
record_min_max: bool,
record_sum: bool,
value_map: ValueMap<T, Buckets<T>>,
start: Mutex<SystemTime>,
}

impl<T: Number> Histogram<T> {
pub(crate) fn new(boundaries: Vec<f64>, record_min_max: bool, record_sum: bool) -> Self {
let buckets_count = boundaries.len() + 1;
let mut histogram = Histogram {
value_map: ValueMap::new_with_buckets_count(buckets_count),
bounds: boundaries,
record_min_max,
record_sum,
pub(crate) fn new(mut bounds: Vec<f64>, record_min_max: bool, record_sum: bool) -> Self {
bounds.retain(|v| !v.is_nan());
bounds.sort_by(|a, b| a.partial_cmp(b).expect("NaNs filtered out"));
Self {
value_map: ValueMap::new(BucketsConfig {
record_min_max,
record_sum,
bounds,
}),
start: Mutex::new(SystemTime::now()),
};

histogram.bounds.retain(|v| !v.is_nan());
histogram
.bounds
.sort_by(|a, b| a.partial_cmp(b).expect("NaNs filtered out"));

histogram
}
}

pub(crate) fn measure(&self, measurement: T, attrs: &[KeyValue]) {
let f = measurement.into_float();

// This search will return an index in the range `[0, bounds.len()]`, where
// it will return `bounds.len()` if value is greater than the last element
// of `bounds`. This aligns with the buckets in that the length of buckets
// is `bounds.len()+1`, with the last bucket representing:
// `(bounds[bounds.len()-1], +∞)`.
let index = self.bounds.partition_point(|&x| x < f);
self.value_map.measure(measurement, attrs, index);
self.value_map.measure(measurement, attrs);
}

pub(crate) fn delta(
Expand Down Expand Up @@ -167,25 +151,25 @@ impl<T: Number> Histogram<T> {
.has_no_attribute_value
.swap(false, Ordering::AcqRel)
{
if let Ok(ref mut b) = self.value_map.no_attribute_tracker.buckets.lock() {
if let Ok(ref mut b) = self.value_map.no_attribute_tracker.data.lock() {
h.data_points.push(HistogramDataPoint {
attributes: vec![],
start_time: start,
time: t,
count: b.count,
bounds: self.bounds.clone(),
bounds: self.value_map.config.bounds.clone(),
bucket_counts: b.counts.clone(),
sum: if self.record_sum {
sum: if self.value_map.config.record_sum {
b.total
} else {
T::default()
},
min: if self.record_min_max {
min: if self.value_map.config.record_min_max {
Some(b.min)
} else {
None
},
max: if self.record_min_max {
max: if self.value_map.config.record_min_max {
Some(b.max)
} else {
None
Expand All @@ -205,25 +189,25 @@ impl<T: Number> Histogram<T> {
let mut seen = HashSet::new();
for (attrs, tracker) in trackers.drain() {
if seen.insert(Arc::as_ptr(&tracker)) {
if let Ok(b) = tracker.buckets.lock() {
if let Ok(b) = tracker.data.lock() {
h.data_points.push(HistogramDataPoint {
attributes: attrs.clone(),
start_time: start,
time: t,
count: b.count,
bounds: self.bounds.clone(),
bounds: self.value_map.config.bounds.clone(),
bucket_counts: b.counts.clone(),
sum: if self.record_sum {
sum: if self.value_map.config.record_sum {
b.total
} else {
T::default()
},
min: if self.record_min_max {
min: if self.value_map.config.record_min_max {
Some(b.min)
} else {
None
},
max: if self.record_min_max {
max: if self.value_map.config.record_min_max {
Some(b.max)
} else {
None
Expand Down Expand Up @@ -278,25 +262,25 @@ impl<T: Number> Histogram<T> {
.has_no_attribute_value
.load(Ordering::Acquire)
{
if let Ok(b) = &self.value_map.no_attribute_tracker.buckets.lock() {
if let Ok(b) = &self.value_map.no_attribute_tracker.data.lock() {
h.data_points.push(HistogramDataPoint {
attributes: vec![],
start_time: start,
time: t,
count: b.count,
bounds: self.bounds.clone(),
bounds: self.value_map.config.bounds.clone(),
bucket_counts: b.counts.clone(),
sum: if self.record_sum {
sum: if self.value_map.config.record_sum {
b.total
} else {
T::default()
},
min: if self.record_min_max {
min: if self.value_map.config.record_min_max {
Some(b.min)
} else {
None
},
max: if self.record_min_max {
max: if self.value_map.config.record_min_max {
Some(b.max)
} else {
None
Expand All @@ -318,25 +302,25 @@ impl<T: Number> Histogram<T> {
let mut seen = HashSet::new();
for (attrs, tracker) in trackers.iter() {
if seen.insert(Arc::as_ptr(tracker)) {
if let Ok(b) = tracker.buckets.lock() {
if let Ok(b) = tracker.data.lock() {
h.data_points.push(HistogramDataPoint {
attributes: attrs.clone(),
start_time: start,
time: t,
count: b.count,
bounds: self.bounds.clone(),
bounds: self.value_map.config.bounds.clone(),
bucket_counts: b.counts.clone(),
sum: if self.record_sum {
sum: if self.value_map.config.record_sum {
b.total
} else {
T::default()
},
min: if self.record_min_max {
min: if self.value_map.config.record_min_max {
Some(b.min)
} else {
None
},
max: if self.record_min_max {
max: if self.value_map.config.record_min_max {
Some(b.max)
} else {
None
Expand Down
Loading

0 comments on commit 9f87703

Please sign in to comment.