Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix metrics dedup/sort bug #2093

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions opentelemetry-sdk/src/metrics/internal/histogram.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::{sync::Mutex, time::SystemTime};

use crate::metrics::data::HistogramDataPoint;
use crate::metrics::data::{self, Aggregation, Temporality};
use crate::metrics::KeyValueHelper;
use opentelemetry::KeyValue;

use super::Number;
Expand Down Expand Up @@ -207,7 +208,7 @@ impl<T: Number> Histogram<T> {
if seen.insert(Arc::as_ptr(&tracker)) {
if let Ok(b) = tracker.buckets.lock() {
h.data_points.push(HistogramDataPoint {
attributes: attrs.clone(),
attributes: KeyValueHelper::dedup_and_sort_attributes(&attrs),
start_time: start,
time: t,
count: b.count,
Expand Down Expand Up @@ -320,7 +321,7 @@ impl<T: Number> Histogram<T> {
if seen.insert(Arc::as_ptr(tracker)) {
if let Ok(b) = tracker.buckets.lock() {
h.data_points.push(HistogramDataPoint {
attributes: attrs.clone(),
attributes: KeyValueHelper::dedup_and_sort_attributes(attrs),
start_time: start,
time: t,
count: b.count,
Expand Down
6 changes: 3 additions & 3 deletions opentelemetry-sdk/src/metrics/internal/last_value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::{
time::SystemTime,
};

use crate::metrics::data::DataPoint;
use crate::metrics::{data::DataPoint, KeyValueHelper};
use opentelemetry::KeyValue;

use super::{Assign, AtomicTracker, Number, ValueMap};
Expand Down Expand Up @@ -63,7 +63,7 @@ impl<T: Number> LastValue<T> {
for (attrs, tracker) in trackers.drain() {
if seen.insert(Arc::as_ptr(&tracker)) {
dest.push(DataPoint {
attributes: attrs.clone(),
attributes: KeyValueHelper::dedup_and_sort_attributes(&attrs),
start_time: Some(prev_start),
time: Some(t),
value: tracker.get_value(),
Expand Down Expand Up @@ -115,7 +115,7 @@ impl<T: Number> LastValue<T> {
for (attrs, tracker) in trackers.iter() {
if seen.insert(Arc::as_ptr(tracker)) {
dest.push(DataPoint {
attributes: attrs.clone(),
attributes: KeyValueHelper::dedup_and_sort_attributes(attrs),
start_time: Some(prev_start),
time: Some(t),
value: tracker.get_value(),
Expand Down
9 changes: 6 additions & 3 deletions opentelemetry-sdk/src/metrics/internal/precomputed_sum.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
use opentelemetry::KeyValue;

use crate::metrics::data::{self, Aggregation, DataPoint, Temporality};
use crate::metrics::{
data::{self, Aggregation, DataPoint, Temporality},
KeyValueHelper,
};

use super::{Assign, AtomicTracker, Number, ValueMap};
use std::{
Expand Down Expand Up @@ -98,7 +101,7 @@ impl<T: Number> PrecomputedSum<T> {
let delta = value - *reported.get(&attrs).unwrap_or(&T::default());
new_reported.insert(attrs.clone(), value);
s_data.data_points.push(DataPoint {
attributes: attrs.clone(),
attributes: KeyValueHelper::dedup_and_sort_attributes(&attrs),
start_time: Some(prev_start),
time: Some(t),
value: delta,
Expand Down Expand Up @@ -176,7 +179,7 @@ impl<T: Number> PrecomputedSum<T> {
for (attrs, tracker) in trackers.iter() {
if seen.insert(Arc::as_ptr(tracker)) {
s_data.data_points.push(DataPoint {
attributes: attrs.clone(),
attributes: KeyValueHelper::dedup_and_sort_attributes(attrs),
start_time: Some(prev_start),
time: Some(t),
value: tracker.get_value(),
Expand Down
5 changes: 3 additions & 2 deletions opentelemetry-sdk/src/metrics/internal/sum.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::vec;
use std::{sync::Mutex, time::SystemTime};

use crate::metrics::data::{self, Aggregation, DataPoint, Temporality};
use crate::metrics::KeyValueHelper;
use opentelemetry::KeyValue;

use super::{AtomicTracker, Number};
Expand Down Expand Up @@ -90,7 +91,7 @@ impl<T: Number> Sum<T> {
for (attrs, tracker) in trackers.drain() {
if seen.insert(Arc::as_ptr(&tracker)) {
s_data.data_points.push(DataPoint {
attributes: attrs.clone(),
attributes: KeyValueHelper::dedup_and_sort_attributes(&attrs),
start_time: Some(prev_start),
time: Some(t),
value: tracker.get_value(),
Expand Down Expand Up @@ -170,7 +171,7 @@ impl<T: Number> Sum<T> {
for (attrs, tracker) in trackers.iter() {
if seen.insert(Arc::as_ptr(tracker)) {
s_data.data_points.push(DataPoint {
attributes: attrs.clone(),
attributes: KeyValueHelper::dedup_and_sort_attributes(attrs),
start_time: Some(prev_start),
time: Some(t),
value: tracker.get_value(),
Expand Down
127 changes: 121 additions & 6 deletions opentelemetry-sdk/src/metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,47 @@
}
}

#[allow(dead_code)]
pub(crate) struct KeyValueHelper;

impl KeyValueHelper {
    /// Returns a copy of `attributes` with duplicate keys removed and the
    /// result sorted.
    ///
    /// When a key occurs more than once, the value of the *last* occurrence
    /// wins (matching the measurement-recording semantics, where later values
    /// override earlier ones).
    ///
    /// The common case — no duplicate keys — is detected with a single pass
    /// first, so it pays only for one `to_vec` plus the sort. Keys are held
    /// by reference in the detection set, so no `Key` is cloned.
    #[allow(dead_code)]
    pub(crate) fn dedup_and_sort_attributes(attributes: &[KeyValue]) -> Vec<KeyValue> {
        // First pass: detect duplicates without cloning any keys.
        // `any` short-circuits on the first duplicate, like the original loop.
        let mut seen_keys: HashSet<&Key> = HashSet::with_capacity(attributes.len());
        let has_duplicates = attributes.iter().any(|kv| !seen_keys.insert(&kv.key));

        let mut vec = if has_duplicates {
            // Iterate in reverse so that, for each key, the LAST occurrence
            // is the one kept.
            seen_keys.clear();
            attributes
                .iter()
                .rev()
                .filter(|kv| seen_keys.insert(&kv.key))
                .cloned()
                .collect::<Vec<_>>()
        } else {
            // Attributes are already deduped; only a sorted copy is needed.
            attributes.to_vec()
        };
        vec.sort_unstable();
        vec
    }
}
Comment on lines +128 to +167
Copy link
Contributor

@TommyCpp TommyCpp Sep 16, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we define this as a trait? It's easier to use IMO as you can just do attr.dedup_and_sort_attributes(). I asked chatGPT to write a simple example here


#[cfg(all(test, feature = "testing"))]
mod tests {
use self::data::{DataPoint, HistogramDataPoint, ScopeMetrics};
Expand Down Expand Up @@ -245,6 +286,14 @@
counter_aggregation_attribute_order_helper(Temporality::Cumulative, false);
}

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn counter_aggregation_duplicate_attribute_keys() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test doesn't test actual issue (it succeeds on main every single time, even though main has a bug).

I have written a test that actually fails on main every time* (it has a 1% chance of succeeding, because the Hasher's order is unpredictable), but succeeds on your PR.
So your fix works :)

Here's the tests content.

    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn aggregation_attribute_keys_are_deduplicated_and_sorted() {
        // Run this test with stdout enabled to see output.
        // cargo test counter_aggregation_duplicate_attribute_keys --features=testing -- --nocapture

        // attributes are stored in HashMap, but default Hasher has unpredictable order everytime
        // to reduce test flakiness create many combinations
        for combination in 0..10 {
            aggregation_attribute_keys_are_deduplicated_and_sorted_helper(combination, Temporality::Delta);
            aggregation_attribute_keys_are_deduplicated_and_sorted_helper(
                combination,
                Temporality::Cumulative,
            );
        }
    }

    fn aggregation_attribute_keys_are_deduplicated_and_sorted_helper(
        combination: i32,
        temporality: Temporality,
    ) {
        // Arrange
        let mut test_context = TestContext::new(temporality);
        let counter = test_context.u64_counter("test", "my_counter", None);

        // duplicate zero(0)
        let keys = vec![0, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9];
        let mut rng = thread_rng();

        // create 10 measurement with shuffled keys
        for _ in 0..10 {
            let mut shuffled = keys.clone();
            shuffled.shuffle(&mut rng);
            let shuffled: Vec<_> = shuffled
                .into_iter()
                .scan(false, |zero_seen, value| {
                    // make sure that for duplicated key (value=0) first time it will be 100, last value will be 0
                    Some(KeyValue::new(
                        format!("k{combination}.{value}"),
                        if value == 0 {
                            if *zero_seen {
                                // collector should always return last value (0)
                                value
                            } else {
                                // for zero value, if we see it for the first time, set it to 100,
                                // last value will always be 0,
                                *zero_seen = true;
                                100
                            }
                        } else {
                            value
                        },
                    ))
                })
                .collect();
            counter.add(1, &shuffled);
        }

        test_context.flush_metrics();

        // Assert
        let sum = test_context.get_aggregation::<data::Sum<u64>>("my_counter", None);
        assert_eq!(sum.data_points.len(), 1);
        
        let expected: Vec<_> = (0..10)
            .into_iter()
            .map(|v| KeyValue::new(format!("k{combination}.{v}"), v))
            .collect();
        let dp = sum.data_points.iter().next().unwrap();
        assert_eq!(dp.value, 10);
        assert_eq!(dp.attributes, expected);
    }

Copy link
Contributor Author

@utpilla utpilla Oct 24, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test doesn't test actual issue (it succeeds on main every single time, even though main has a bug).

The test does fail on main. I suspect that you ran the test without updating fn find_datapoint_with_key_value<'a, T>.

// Run this test with stdout enabled to see output.
// cargo test counter_aggregation_duplicate_attribute_keys --features=testing -- --nocapture
counter_aggregation_duplicate_attribute_keys_helper(Temporality::Delta);
counter_aggregation_duplicate_attribute_keys_helper(Temporality::Cumulative);
}

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn histogram_aggregation_cumulative() {
// Run this test with stdout enabled to see output.
Expand Down Expand Up @@ -1987,6 +2036,46 @@
}
}

/// Verifies that measurements recorded with a duplicated attribute key are
/// collapsed into a single time-series, and that the exported attributes
/// keep only the last value recorded for the duplicated key.
fn counter_aggregation_duplicate_attribute_keys_helper(temporality: Temporality) {
    // Arrange
    let mut test_context = TestContext::new(temporality);
    let counter = test_context.u64_counter("test", "my_counter", None);

    // "key1" appears twice; dedup must keep the last occurrence ("value2").
    let attributes = [
        KeyValue::new("key1", "value1"),
        KeyValue::new("key1", "value2"),
    ];

    // Act
    counter.add(1, &attributes);
    counter.add(1, &attributes);
    counter.add(1, &attributes);
    counter.add(1, &attributes);
    counter.add(1, &attributes);

    test_context.flush_metrics();

    // Assert
    let sum = test_context.get_aggregation::<data::Sum<u64>>("my_counter", None);
    // Expecting exactly 1 time-series: both entries dedup to the same
    // attribute set, so all five adds land on one data point.
    assert_eq!(sum.data_points.len(), 1);
    assert!(sum.is_monotonic, "Counter should produce monotonic.");
    if let Temporality::Cumulative = temporality {
        assert_eq!(
            sum.temporality,
            Temporality::Cumulative,
            "Should produce cumulative"
        );
    } else {
        assert_eq!(sum.temporality, Temporality::Delta, "Should produce delta");
    }

    // Find and validate the key1=value2 datapoint (last value wins after dedup).
    let data_point = find_datapoint_with_key_value(&sum.data_points, "key1", "value2")
        .expect("datapoint with key1=value2 expected");
    assert_eq!(data_point.value, 5);
}

fn counter_aggregation_overflow_helper(temporality: Temporality) {
// Arrange
let mut test_context = TestContext::new(temporality);
Expand Down Expand Up @@ -2195,12 +2284,38 @@
key: &str,
value: &str,
) -> Option<&'a DataPoint<T>> {
data_points.iter().find(|&datapoint| {
datapoint
.attributes
.iter()
.any(|kv| kv.key.as_str() == key && kv.value.as_str() == value)
})
for data_point in data_points {
let mut count = 0;
let mut result: Option<&'a DataPoint<T>> = None;
for kv in &data_point.attributes {
if kv.key.as_str() == key {
count += 1;

if kv.value.as_str() == value {
result = Some(data_point);
}
}
}

match count {
// The input key was not found in the attributes of this DataPoint.
0 => {}

// Return the result only if the key was found exactly once in the attributes of the DataPoint and the value also matched.
1 => {
if result.is_some() {
return result;
}
}

_ => panic!(
"Found more than one occurence of key={} within the same DataPoint",
key
),

Check warning on line 2314 in opentelemetry-sdk/src/metrics/mod.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/metrics/mod.rs#L2311-L2314

Added lines #L2311 - L2314 were not covered by tests
}
}

None

Check warning on line 2318 in opentelemetry-sdk/src/metrics/mod.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/metrics/mod.rs#L2318

Added line #L2318 was not covered by tests
}

fn find_datapoint_with_no_attributes<T>(data_points: &[DataPoint<T>]) -> Option<&DataPoint<T>> {
Expand Down
Loading