From 166d21718759b84cea20b3c7cc59715ba40f4378 Mon Sep 17 00:00:00 2001 From: Benjamin Coenen <5719034+bnjjj@users.noreply.github.com> Date: Tue, 18 Jun 2024 14:22:02 +0200 Subject: [PATCH] Implement cardinality limits for metrics streams #1065 Signed-off-by: Benjamin Coenen <5719034+bnjjj@users.noreply.github.com> --- .../src/metrics/internal/aggregate.rs | 51 +++++++++++-------- opentelemetry-sdk/src/metrics/internal/mod.rs | 16 ++++-- opentelemetry-sdk/src/metrics/mod.rs | 47 +++++++++++------ 3 files changed, 73 insertions(+), 41 deletions(-) diff --git a/opentelemetry-sdk/src/metrics/internal/aggregate.rs b/opentelemetry-sdk/src/metrics/internal/aggregate.rs index 93fcc6a69c..6d7c91ee13 100644 --- a/opentelemetry-sdk/src/metrics/internal/aggregate.rs +++ b/opentelemetry-sdk/src/metrics/internal/aggregate.rs @@ -1,22 +1,22 @@ -use std::{marker, sync::Arc}; +use std::marker; +use std::sync::atomic::AtomicU64; +use std::sync::Arc; use once_cell::sync::Lazy; use opentelemetry::KeyValue; -use crate::{ - metrics::data::{Aggregation, Gauge, Temporality}, - metrics::AttributeSet, -}; - -use super::{ - exponential_histogram::ExpoHistogram, - histogram::Histogram, - last_value::LastValue, - sum::{PrecomputedSum, Sum}, - Number, -}; - -const STREAM_CARDINALITY_LIMIT: u32 = 2000; +use super::exponential_histogram::ExpoHistogram; +use super::histogram::Histogram; +use super::last_value::LastValue; +use super::sum::PrecomputedSum; +use super::sum::Sum; +use super::Number; +use crate::metrics::data::Aggregation; +use crate::metrics::data::Gauge; +use crate::metrics::data::Temporality; +use crate::metrics::AttributeSet; + +static STREAM_CARDINALITY_LIMIT: AtomicU64 = AtomicU64::new(2000); pub(crate) static STREAM_OVERFLOW_ATTRIBUTE_SET: Lazy = Lazy::new(|| { let key_values: [KeyValue; 1] = [KeyValue::new("otel.metric.overflow", "true")]; AttributeSet::from(&key_values[..]) @@ -24,7 +24,12 @@ pub(crate) static STREAM_OVERFLOW_ATTRIBUTE_SET: Lazy = Lazy::new( /// Checks whether aggregator has hit cardinality limit for metric streams pub(crate) fn is_under_cardinality_limit(size: usize) -> bool { - size < STREAM_CARDINALITY_LIMIT as usize + size < STREAM_CARDINALITY_LIMIT.load(std::sync::atomic::Ordering::Relaxed) as usize +} + +/// Set cardinality limit for metric streams +pub fn set_stream_cardinality_limit(size: u64) { + STREAM_CARDINALITY_LIMIT.store(size, std::sync::atomic::Ordering::Relaxed) } /// Receives measurements to be aggregated. @@ -213,13 +218,17 @@ impl> AggregateBuilder { #[cfg(test)] mod tests { - use crate::metrics::data::{ - DataPoint, ExponentialBucket, ExponentialHistogram, ExponentialHistogramDataPoint, - Histogram, HistogramDataPoint, Sum, - }; - use std::{time::SystemTime, vec}; + use std::time::SystemTime; + use std::vec; use super::*; + use crate::metrics::data::DataPoint; + use crate::metrics::data::ExponentialBucket; + use crate::metrics::data::ExponentialHistogram; + use crate::metrics::data::ExponentialHistogramDataPoint; + use crate::metrics::data::Histogram; + use crate::metrics::data::HistogramDataPoint; + use crate::metrics::data::Sum; #[test] fn last_value_aggregation() { diff --git a/opentelemetry-sdk/src/metrics/internal/mod.rs b/opentelemetry-sdk/src/metrics/internal/mod.rs index 92bc3d947f..238aa955e2 100644 --- a/opentelemetry-sdk/src/metrics/internal/mod.rs +++ b/opentelemetry-sdk/src/metrics/internal/mod.rs @@ -5,12 +5,20 @@ mod last_value; mod sum; use core::fmt; -use std::ops::{Add, AddAssign, Sub}; -use std::sync::atomic::{AtomicI64, AtomicU64, Ordering}; +use std::ops::Add; +use std::ops::AddAssign; +use std::ops::Sub; +use std::sync::atomic::AtomicI64; +use std::sync::atomic::AtomicU64; +use std::sync::atomic::Ordering; use std::sync::Mutex; -pub(crate) use aggregate::{AggregateBuilder, ComputeAggregation, Measure}; -pub(crate) use exponential_histogram::{EXPO_MAX_SCALE, EXPO_MIN_SCALE}; +pub use aggregate::set_stream_cardinality_limit; +pub(crate) use aggregate::AggregateBuilder; +pub(crate) use aggregate::ComputeAggregation; +pub(crate) use aggregate::Measure; +pub(crate) use exponential_histogram::EXPO_MAX_SCALE; +pub(crate) use exponential_histogram::EXPO_MIN_SCALE; /// Marks a type that can have a value added and retrieved atomically. Required since /// different types have different backing atomic mechanisms diff --git a/opentelemetry-sdk/src/metrics/mod.rs b/opentelemetry-sdk/src/metrics/mod.rs index 2c6b709fef..b49c0aef7d 100644 --- a/opentelemetry-sdk/src/metrics/mod.rs +++ b/opentelemetry-sdk/src/metrics/mod.rs @@ -52,21 +52,24 @@ pub(crate) mod pipeline; pub mod reader; pub(crate) mod view; +use std::collections::hash_map::DefaultHasher; +use std::collections::HashSet; +use std::hash::Hash; +use std::hash::Hasher; + pub use aggregation::*; pub use instrument::*; +pub use internal::set_stream_cardinality_limit; pub use manual_reader::*; pub use meter::*; pub use meter_provider::*; +use opentelemetry::Key; +use opentelemetry::KeyValue; +use opentelemetry::Value; pub use periodic_reader::*; pub use pipeline::Pipeline; pub use view::*; -use std::collections::hash_map::DefaultHasher; -use std::collections::HashSet; -use std::hash::{Hash, Hasher}; - -use opentelemetry::{Key, KeyValue, Value}; - /// A unique set of attributes that can be used as instrument identifiers. /// /// This must implement [Hash], [PartialEq], and [Eq] so it may be used as @@ -139,20 +142,32 @@ impl Hash for AttributeSet { #[cfg(all(test, feature = "testing"))] mod tests { - use self::data::{DataPoint, HistogramDataPoint, ScopeMetrics}; - use super::*; - use crate::metrics::data::{ResourceMetrics, Temporality}; - use crate::metrics::reader::TemporalitySelector; - use crate::testing::metrics::InMemoryMetricsExporterBuilder; - use crate::{runtime, testing::metrics::InMemoryMetricsExporter}; - use opentelemetry::metrics::{Counter, Meter, UpDownCounter}; - use opentelemetry::{metrics::MeterProvider as _, KeyValue}; - use rand::{rngs, Rng, SeedableRng}; use std::borrow::Cow; - use std::sync::{Arc, Mutex}; + use std::sync::Arc; + use std::sync::Mutex; use std::thread; use std::time::Duration; + use opentelemetry::metrics::Counter; + use opentelemetry::metrics::Meter; + use opentelemetry::metrics::MeterProvider as _; + use opentelemetry::metrics::UpDownCounter; + use opentelemetry::KeyValue; + use rand::rngs; + use rand::Rng; + use rand::SeedableRng; + + use self::data::DataPoint; + use self::data::HistogramDataPoint; + use self::data::ScopeMetrics; + use super::*; + use crate::metrics::data::ResourceMetrics; + use crate::metrics::data::Temporality; + use crate::metrics::reader::TemporalitySelector; + use crate::runtime; + use crate::testing::metrics::InMemoryMetricsExporter; + use crate::testing::metrics::InMemoryMetricsExporterBuilder; + // Run all tests in this mod // cargo test metrics::tests --features=testing // Note for all tests from this point onwards in this mod: