Fix metrics dedup/sort bug #2093
@@ -125,6 +125,47 @@
    }
}

#[allow(dead_code)]
pub(crate) struct KeyValueHelper;

impl KeyValueHelper {
    #[allow(dead_code)]
    pub(crate) fn dedup_and_sort_attributes(attributes: &[KeyValue]) -> Vec<KeyValue> {
        // Check whether the attributes contain any duplicate keys.
        let mut has_duplicates = false;
        let mut keys_set: HashSet<Key> = HashSet::with_capacity(attributes.len());
        for kv in attributes {
            if !keys_set.insert(kv.key.clone()) {
                has_duplicates = true;
                break;
            }
        }

        if has_duplicates {
            // Dedup the attributes and sort them. Iterating in reverse and keeping
            // only the first occurrence per key means the *last* value supplied for
            // a duplicate key wins.
            keys_set.clear();
            let mut vec = attributes
                .iter()
                .rev()
                .filter_map(|kv| {
                    if keys_set.insert(kv.key.clone()) {
                        Some(kv.clone())
                    } else {
                        None
                    }
                })
                .collect::<Vec<_>>();
            vec.sort_unstable();
            vec
        } else {
            // Attributes are already deduped; only sorting is needed.
            let mut vec = attributes.to_vec();
            vec.sort_unstable();
            vec
        }
    }
}

#[cfg(all(test, feature = "testing"))]
mod tests {
    use self::data::{DataPoint, HistogramDataPoint, ScopeMetrics};
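For illustration only (not part of the diff; the attribute values are made up, and this assumes KeyValue's ordering sorts by key), a sketch of what the helper should return for a slice containing a duplicate key — the last value for a repeated key wins and the output comes back sorted:

// Hypothetical in-crate usage (the helper is pub(crate), so it is only callable from inside the SDK crate).
let attributes = [
    KeyValue::new("b", "1"),
    KeyValue::new("a", "first"),
    KeyValue::new("a", "last"), // duplicate key: the later value should win
];
let deduped = KeyValueHelper::dedup_and_sort_attributes(&attributes);
// Expected: [KeyValue::new("a", "last"), KeyValue::new("b", "1")]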
@@ -245,6 +286,14 @@
        counter_aggregation_attribute_order_helper(Temporality::Cumulative, false);
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn counter_aggregation_duplicate_attribute_keys() {
This test doesn't test the actual issue (it succeeds on main). I have written a test that actually fails on main every time* (*it has a ~1% chance to succeed, due to the default Hasher being unpredictable), but succeeds on your PR. Here's the test's content:

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn aggregation_attribute_keys_are_deduplicated_and_sorted() {
    // Run this test with stdout enabled to see output.
    // cargo test aggregation_attribute_keys_are_deduplicated_and_sorted --features=testing -- --nocapture

    // Attributes are stored in a HashMap, but the default Hasher has an unpredictable
    // order every time; to reduce test flakiness, create many combinations.
    for combination in 0..10 {
        aggregation_attribute_keys_are_deduplicated_and_sorted_helper(combination, Temporality::Delta);
        aggregation_attribute_keys_are_deduplicated_and_sorted_helper(
            combination,
            Temporality::Cumulative,
        );
    }
}

fn aggregation_attribute_keys_are_deduplicated_and_sorted_helper(
    combination: i32,
    temporality: Temporality,
) {
    // Arrange
    let mut test_context = TestContext::new(temporality);
    let counter = test_context.u64_counter("test", "my_counter", None);

    // duplicate zero (0)
    let keys = vec![0, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9];
    let mut rng = thread_rng();

    // create 10 measurements with shuffled keys
    for _ in 0..10 {
        let mut shuffled = keys.clone();
        shuffled.shuffle(&mut rng);
        let shuffled: Vec<_> = shuffled
            .into_iter()
            .scan(false, |zero_seen, value| {
                // Make sure that for the duplicated key (value = 0) the first
                // occurrence gets 100 and the last occurrence gets 0.
                Some(KeyValue::new(
                    format!("k{combination}.{value}"),
                    if value == 0 {
                        if *zero_seen {
                            // the collector should always return the last value (0)
                            value
                        } else {
                            // the first time we see the zero key, set its value to 100;
                            // the last value will always be 0
                            *zero_seen = true;
                            100
                        }
                    } else {
                        value
                    },
                ))
            })
            .collect();
        counter.add(1, &shuffled);
    }

    test_context.flush_metrics();

    // Assert
    let sum = test_context.get_aggregation::<data::Sum<u64>>("my_counter", None);
    assert_eq!(sum.data_points.len(), 1);

    let expected: Vec<_> = (0..10)
        .map(|v| KeyValue::new(format!("k{combination}.{v}"), v))
        .collect();

    let dp = sum.data_points.iter().next().unwrap();
    assert_eq!(dp.value, 10);
    assert_eq!(dp.attributes, expected);
}

Reply: The test does fail on main. I suspect that you ran the test without updating …
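For context on the flakiness argument above, here is a minimal standalone sketch (not part of this PR) showing that iteration order over a HashMap with the default hasher is not stable across runs:

use std::collections::HashMap;

fn main() {
    // The default hasher (RandomState) is randomly seeded, so iterating the
    // same set of keys can yield a different order on every program run.
    let map: HashMap<String, i32> = (0..10).map(|i| (format!("k{i}"), i)).collect();
    let order: Vec<&String> = map.keys().collect();
    println!("{order:?}"); // typically differs between runs
}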
        // Run this test with stdout enabled to see output.
        // cargo test counter_aggregation_duplicate_attribute_keys --features=testing -- --nocapture
        counter_aggregation_duplicate_attribute_keys_helper(Temporality::Delta);
        counter_aggregation_duplicate_attribute_keys_helper(Temporality::Cumulative);
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn histogram_aggregation_cumulative() {
        // Run this test with stdout enabled to see output.
@@ -1987,6 +2036,46 @@
        }
    }

    fn counter_aggregation_duplicate_attribute_keys_helper(temporality: Temporality) {
        // Arrange
        let mut test_context = TestContext::new(temporality);
        let counter = test_context.u64_counter("test", "my_counter", None);

        let attributes = [
            KeyValue::new("key1", "value1"),
            KeyValue::new("key1", "value2"),
        ];

        // Act
        counter.add(1, &attributes);
        counter.add(1, &attributes);
        counter.add(1, &attributes);
        counter.add(1, &attributes);
        counter.add(1, &attributes);

        test_context.flush_metrics();

        // Assert
        let sum = test_context.get_aggregation::<data::Sum<u64>>("my_counter", None);
        // Expecting a single time-series, since the duplicate "key1" entries
        // should collapse into one attribute set.
        assert_eq!(sum.data_points.len(), 1);
        assert!(sum.is_monotonic, "Counter should produce monotonic.");
        if let Temporality::Cumulative = temporality {
            assert_eq!(
                sum.temporality,
                Temporality::Cumulative,
                "Should produce cumulative"
            );
        } else {
            assert_eq!(sum.temporality, Temporality::Delta, "Should produce delta");
        }

        // Find and validate the key1=value2 datapoint: the last value supplied
        // for the duplicate key is the one that should be kept.
        let data_point = find_datapoint_with_key_value(&sum.data_points, "key1", "value2")
            .expect("datapoint with key1=value2 expected");
        assert_eq!(data_point.value, 5);
    }

    fn counter_aggregation_overflow_helper(temporality: Temporality) {
        // Arrange
        let mut test_context = TestContext::new(temporality);
@@ -2195,12 +2284,38 @@
     key: &str,
     value: &str,
 ) -> Option<&'a DataPoint<T>> {
-    data_points.iter().find(|&datapoint| {
-        datapoint
-            .attributes
-            .iter()
-            .any(|kv| kv.key.as_str() == key && kv.value.as_str() == value)
-    })
+    for data_point in data_points {
+        let mut count = 0;
+        let mut result: Option<&'a DataPoint<T>> = None;
+        for kv in &data_point.attributes {
+            if kv.key.as_str() == key {
+                count += 1;
+
+                if kv.value.as_str() == value {
+                    result = Some(data_point);
+                }
+            }
+        }
+
+        match count {
+            // The input key was not found in the attributes of this DataPoint.
+            0 => {}
+
+            // Return the result only if the key was found exactly once in the attributes of the DataPoint and the value also matched.
+            1 => {
+                if result.is_some() {
+                    return result;
+                }
+            }
+
+            _ => panic!(
+                "Found more than one occurrence of key={} within the same DataPoint",
+                key
+            ),
+        }
+    }
+
+    None
 }

 fn find_datapoint_with_no_attributes<T>(data_points: &[DataPoint<T>]) -> Option<&DataPoint<T>> {
Can we define this as a trait? It's easier to use IMO, as you can just do attr.dedup_and_sort_attributes(). I asked chatGPT to write a simple example here.
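A rough sketch of the trait-based shape being suggested (the trait name and placement are illustrative, not an actual API in this PR):

// Hypothetical extension trait so callers can write attrs.dedup_and_sort_attributes().
pub(crate) trait DedupAndSortAttributes {
    fn dedup_and_sort_attributes(&self) -> Vec<KeyValue>;
}

impl DedupAndSortAttributes for [KeyValue] {
    fn dedup_and_sort_attributes(&self) -> Vec<KeyValue> {
        // Delegate to the helper introduced in this PR.
        KeyValueHelper::dedup_and_sort_attributes(self)
    }
}

// Usage: works on &[KeyValue], Vec<KeyValue>, and fixed-size arrays via method resolution:
// let sorted = attributes.dedup_and_sort_attributes();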