Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update CHANGELOG #2124

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions opentelemetry-sdk/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

## vNext

- **BREAKING** Public API changes:
- **Removed**: `SdkMeter` struct [#2113](https://github.com/open-telemetry/opentelemetry-rust/pull/2113)
- **Removed**: `AggregationSelector` trait and `DefaultAggregationSelector` struct [#2085](https://github.com/open-telemetry/opentelemetry-rust/pull/2085)

- Update `async-std` dependency version to 1.13
- *Breaking* - Remove support for `MetricProducer` which allowed metrics from
external sources to be sent through OpenTelemetry.
Expand Down
185 changes: 85 additions & 100 deletions opentelemetry-sdk/src/metrics/internal/histogram.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,45 +7,16 @@ use crate::metrics::data::HistogramDataPoint;
use crate::metrics::data::{self, Aggregation, Temporality};
use opentelemetry::KeyValue;

use super::Number;
use super::{AtomicTracker, AtomicallyUpdate, Operation, ValueMap};
use super::ValueMap;
use super::{Aggregator, Number};

struct HistogramUpdate;

impl Operation for HistogramUpdate {
fn update_tracker<T: Default, AT: AtomicTracker<T>>(tracker: &AT, value: T, index: usize) {
tracker.update_histogram(index, value);
}
}

struct HistogramTracker<T> {
buckets: Mutex<Buckets<T>>,
}

impl<T: Number> AtomicTracker<T> for HistogramTracker<T> {
fn update_histogram(&self, index: usize, value: T) {
let mut buckets = match self.buckets.lock() {
Ok(guard) => guard,
Err(_) => return,
};

buckets.bin(index, value);
buckets.sum(value);
}
}

impl<T: Number> AtomicallyUpdate<T> for HistogramTracker<T> {
type AtomicTracker = HistogramTracker<T>;

fn new_atomic_tracker(buckets_count: Option<usize>) -> Self::AtomicTracker {
let count = buckets_count.unwrap();
HistogramTracker {
buckets: Mutex::new(Buckets::<T>::new(count)),
}
}
struct BucketsConfig {
bounds: Vec<f64>,
record_min_max: bool,
record_sum: bool,
}

#[derive(Default)]
#[derive(Default, Debug, Clone)]
struct Buckets<T> {
counts: Vec<u64>,
count: u64,
Expand All @@ -54,30 +25,37 @@ struct Buckets<T> {
max: T,
}

impl<T: Number> Buckets<T> {
/// returns buckets with `n` bins.
fn new(n: usize) -> Buckets<T> {
Buckets {
counts: vec![0; n],
impl<T> Buckets<T>
where
T: Number,
{
fn new(size: usize) -> Self {
Self {
counts: vec![0; size],
min: T::max(),
max: T::min(),
..Default::default()
}
}

fn sum(&mut self, value: T) {
self.total += value;
}

fn bin(&mut self, idx: usize, value: T) {
fn update(&mut self, config: &BucketsConfig, f_value: f64, measurement: T) {
// This search will return an index in the range `[0, bounds.len()]`, where
// it will return `bounds.len()` if value is greater than the last element
// of `bounds`. This aligns with the buckets in that the length of buckets
// is `bounds.len()+1`, with the last bucket representing:
// `(bounds[bounds.len()-1], +∞)`.
let idx = config.bounds.partition_point(|&x| x < f_value);
self.counts[idx] += 1;
self.count += 1;
if value < self.min {
self.min = value;
}
if value > self.max {
self.max = value
if config.record_min_max {
if measurement < self.min {
self.min = measurement;
}
if measurement > self.max {
self.max = measurement
}
}
// it's very cheap to update it, even if it is not configured to record_sum
self.total += measurement;
}

fn reset(&mut self) {
Expand All @@ -91,45 +69,52 @@ impl<T: Number> Buckets<T> {
}
}

impl<T> Aggregator<T> for Mutex<Buckets<T>>
where
T: Number,
{
type Config = BucketsConfig;

fn create(config: &BucketsConfig) -> Self {
let size = config.bounds.len() + 1;
Mutex::new(Buckets::new(size))
}

fn update(&self, config: &BucketsConfig, measurement: T) {
let f_value = measurement.into_float();
// Ignore NaN and infinity.
if f_value.is_infinite() || f_value.is_nan() {
return;
}
if let Ok(mut this) = self.lock() {
this.update(config, f_value, measurement);
}
}
}

/// Summarizes a set of measurements as a histogram with explicitly defined
/// buckets.
pub(crate) struct Histogram<T: Number> {
value_map: ValueMap<HistogramTracker<T>, T, HistogramUpdate>,
bounds: Vec<f64>,
record_min_max: bool,
record_sum: bool,
value_map: ValueMap<T, Mutex<Buckets<T>>>,
start: Mutex<SystemTime>,
}

impl<T: Number> Histogram<T> {
pub(crate) fn new(boundaries: Vec<f64>, record_min_max: bool, record_sum: bool) -> Self {
let buckets_count = boundaries.len() + 1;
let mut histogram = Histogram {
value_map: ValueMap::new_with_buckets_count(buckets_count),
bounds: boundaries,
record_min_max,
record_sum,
pub(crate) fn new(mut bounds: Vec<f64>, record_min_max: bool, record_sum: bool) -> Self {
bounds.retain(|v| !v.is_nan());
bounds.sort_by(|a, b| a.partial_cmp(b).expect("NaNs filtered out"));
Self {
value_map: ValueMap::new(BucketsConfig {
record_min_max,
record_sum,
bounds,
}),
start: Mutex::new(SystemTime::now()),
};

histogram.bounds.retain(|v| !v.is_nan());
histogram
.bounds
.sort_by(|a, b| a.partial_cmp(b).expect("NaNs filtered out"));

histogram
}
}

pub(crate) fn measure(&self, measurement: T, attrs: &[KeyValue]) {
let f = measurement.into_float();

// This search will return an index in the range `[0, bounds.len()]`, where
// it will return `bounds.len()` if value is greater than the last element
// of `bounds`. This aligns with the buckets in that the length of buckets
// is `bounds.len()+1`, with the last bucket representing:
// `(bounds[bounds.len()-1], +∞)`.
let index = self.bounds.partition_point(|&x| x < f);
self.value_map.measure(measurement, attrs, index);
self.value_map.measure(measurement, attrs);
}

pub(crate) fn delta(
Expand Down Expand Up @@ -167,25 +152,25 @@ impl<T: Number> Histogram<T> {
.has_no_attribute_value
.swap(false, Ordering::AcqRel)
{
if let Ok(ref mut b) = self.value_map.no_attribute_tracker.buckets.lock() {
if let Ok(ref mut b) = self.value_map.no_attribute_tracker.lock() {
h.data_points.push(HistogramDataPoint {
attributes: vec![],
start_time: start,
time: t,
count: b.count,
bounds: self.bounds.clone(),
bounds: self.value_map.config.bounds.clone(),
bucket_counts: b.counts.clone(),
sum: if self.record_sum {
sum: if self.value_map.config.record_sum {
b.total
} else {
T::default()
},
min: if self.record_min_max {
min: if self.value_map.config.record_min_max {
Some(b.min)
} else {
None
},
max: if self.record_min_max {
max: if self.value_map.config.record_min_max {
Some(b.max)
} else {
None
Expand All @@ -205,25 +190,25 @@ impl<T: Number> Histogram<T> {
let mut seen = HashSet::new();
for (attrs, tracker) in trackers.drain() {
if seen.insert(Arc::as_ptr(&tracker)) {
if let Ok(b) = tracker.buckets.lock() {
if let Ok(b) = tracker.lock() {
h.data_points.push(HistogramDataPoint {
attributes: attrs.clone(),
start_time: start,
time: t,
count: b.count,
bounds: self.bounds.clone(),
bounds: self.value_map.config.bounds.clone(),
bucket_counts: b.counts.clone(),
sum: if self.record_sum {
sum: if self.value_map.config.record_sum {
b.total
} else {
T::default()
},
min: if self.record_min_max {
min: if self.value_map.config.record_min_max {
Some(b.min)
} else {
None
},
max: if self.record_min_max {
max: if self.value_map.config.record_min_max {
Some(b.max)
} else {
None
Expand Down Expand Up @@ -278,25 +263,25 @@ impl<T: Number> Histogram<T> {
.has_no_attribute_value
.load(Ordering::Acquire)
{
if let Ok(b) = &self.value_map.no_attribute_tracker.buckets.lock() {
if let Ok(b) = &self.value_map.no_attribute_tracker.lock() {
h.data_points.push(HistogramDataPoint {
attributes: vec![],
start_time: start,
time: t,
count: b.count,
bounds: self.bounds.clone(),
bounds: self.value_map.config.bounds.clone(),
bucket_counts: b.counts.clone(),
sum: if self.record_sum {
sum: if self.value_map.config.record_sum {
b.total
} else {
T::default()
},
min: if self.record_min_max {
min: if self.value_map.config.record_min_max {
Some(b.min)
} else {
None
},
max: if self.record_min_max {
max: if self.value_map.config.record_min_max {
Some(b.max)
} else {
None
Expand All @@ -318,25 +303,25 @@ impl<T: Number> Histogram<T> {
let mut seen = HashSet::new();
for (attrs, tracker) in trackers.iter() {
if seen.insert(Arc::as_ptr(tracker)) {
if let Ok(b) = tracker.buckets.lock() {
if let Ok(b) = tracker.lock() {
h.data_points.push(HistogramDataPoint {
attributes: attrs.clone(),
start_time: start,
time: t,
count: b.count,
bounds: self.bounds.clone(),
bounds: self.value_map.config.bounds.clone(),
bucket_counts: b.counts.clone(),
sum: if self.record_sum {
sum: if self.value_map.config.record_sum {
b.total
} else {
T::default()
},
min: if self.record_min_max {
min: if self.value_map.config.record_min_max {
Some(b.min)
} else {
None
},
max: if self.record_min_max {
max: if self.value_map.config.record_min_max {
Some(b.max)
} else {
None
Expand Down
Loading
Loading