Skip to content

Commit

Permalink
Implement cardinality limits for metrics streams #1065
Browse files Browse the repository at this point in the history
Signed-off-by: Benjamin Coenen <[email protected]>
  • Loading branch information
bnjjj committed Jun 18, 2024
1 parent da368d4 commit 166d217
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 41 deletions.
51 changes: 30 additions & 21 deletions opentelemetry-sdk/src/metrics/internal/aggregate.rs
Original file line number Diff line number Diff line change
@@ -1,30 +1,35 @@
use std::{marker, sync::Arc};
use std::marker;
use std::sync::atomic::AtomicU64;
use std::sync::Arc;

use once_cell::sync::Lazy;
use opentelemetry::KeyValue;

use crate::{
metrics::data::{Aggregation, Gauge, Temporality},
metrics::AttributeSet,
};

use super::{
exponential_histogram::ExpoHistogram,
histogram::Histogram,
last_value::LastValue,
sum::{PrecomputedSum, Sum},
Number,
};

const STREAM_CARDINALITY_LIMIT: u32 = 2000;
use super::exponential_histogram::ExpoHistogram;
use super::histogram::Histogram;
use super::last_value::LastValue;
use super::sum::PrecomputedSum;
use super::sum::Sum;
use super::Number;
use crate::metrics::data::Aggregation;
use crate::metrics::data::Gauge;
use crate::metrics::data::Temporality;
use crate::metrics::AttributeSet;

static STREAM_CARDINALITY_LIMIT: AtomicU64 = AtomicU64::new(2000);
pub(crate) static STREAM_OVERFLOW_ATTRIBUTE_SET: Lazy<AttributeSet> = Lazy::new(|| {
let key_values: [KeyValue; 1] = [KeyValue::new("otel.metric.overflow", "true")];
AttributeSet::from(&key_values[..])
});

/// Checks whether aggregator has hit cardinality limit for metric streams
pub(crate) fn is_under_cardinality_limit(size: usize) -> bool {
size < STREAM_CARDINALITY_LIMIT as usize
size < STREAM_CARDINALITY_LIMIT.load(std::sync::atomic::Ordering::Relaxed) as usize
}

/// Set cardinality limit for metric streams
pub fn set_stream_cardinality_limit(size: u64) {
STREAM_CARDINALITY_LIMIT.store(size, std::sync::atomic::Ordering::Relaxed)

Check warning on line 32 in opentelemetry-sdk/src/metrics/internal/aggregate.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/metrics/internal/aggregate.rs#L31-L32

Added lines #L31 - L32 were not covered by tests
}

/// Receives measurements to be aggregated.
Expand Down Expand Up @@ -213,13 +218,17 @@ impl<T: Number<T>> AggregateBuilder<T> {

#[cfg(test)]
mod tests {
use crate::metrics::data::{
DataPoint, ExponentialBucket, ExponentialHistogram, ExponentialHistogramDataPoint,
Histogram, HistogramDataPoint, Sum,
};
use std::{time::SystemTime, vec};
use std::time::SystemTime;
use std::vec;

use super::*;
use crate::metrics::data::DataPoint;
use crate::metrics::data::ExponentialBucket;
use crate::metrics::data::ExponentialHistogram;
use crate::metrics::data::ExponentialHistogramDataPoint;
use crate::metrics::data::Histogram;
use crate::metrics::data::HistogramDataPoint;
use crate::metrics::data::Sum;

#[test]
fn last_value_aggregation() {
Expand Down
16 changes: 12 additions & 4 deletions opentelemetry-sdk/src/metrics/internal/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,20 @@ mod last_value;
mod sum;

use core::fmt;
use std::ops::{Add, AddAssign, Sub};
use std::sync::atomic::{AtomicI64, AtomicU64, Ordering};
use std::ops::Add;
use std::ops::AddAssign;
use std::ops::Sub;
use std::sync::atomic::AtomicI64;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering;
use std::sync::Mutex;

pub(crate) use aggregate::{AggregateBuilder, ComputeAggregation, Measure};
pub(crate) use exponential_histogram::{EXPO_MAX_SCALE, EXPO_MIN_SCALE};
pub use aggregate::set_stream_cardinality_limit;
pub(crate) use aggregate::AggregateBuilder;
pub(crate) use aggregate::ComputeAggregation;
pub(crate) use aggregate::Measure;
pub(crate) use exponential_histogram::EXPO_MAX_SCALE;
pub(crate) use exponential_histogram::EXPO_MIN_SCALE;

/// Marks a type that can have a value added and retrieved atomically. Required since
/// different types have different backing atomic mechanisms
Expand Down
47 changes: 31 additions & 16 deletions opentelemetry-sdk/src/metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,21 +52,24 @@ pub(crate) mod pipeline;
pub mod reader;
pub(crate) mod view;

use std::collections::hash_map::DefaultHasher;
use std::collections::HashSet;
use std::hash::Hash;
use std::hash::Hasher;

pub use aggregation::*;
pub use instrument::*;
pub use internal::set_stream_cardinality_limit;
pub use manual_reader::*;
pub use meter::*;
pub use meter_provider::*;
use opentelemetry::Key;
use opentelemetry::KeyValue;
use opentelemetry::Value;
pub use periodic_reader::*;
pub use pipeline::Pipeline;
pub use view::*;

use std::collections::hash_map::DefaultHasher;
use std::collections::HashSet;
use std::hash::{Hash, Hasher};

use opentelemetry::{Key, KeyValue, Value};

/// A unique set of attributes that can be used as instrument identifiers.
///
/// This must implement [Hash], [PartialEq], and [Eq] so it may be used as
Expand Down Expand Up @@ -139,20 +142,32 @@ impl Hash for AttributeSet {

#[cfg(all(test, feature = "testing"))]
mod tests {
use self::data::{DataPoint, HistogramDataPoint, ScopeMetrics};
use super::*;
use crate::metrics::data::{ResourceMetrics, Temporality};
use crate::metrics::reader::TemporalitySelector;
use crate::testing::metrics::InMemoryMetricsExporterBuilder;
use crate::{runtime, testing::metrics::InMemoryMetricsExporter};
use opentelemetry::metrics::{Counter, Meter, UpDownCounter};
use opentelemetry::{metrics::MeterProvider as _, KeyValue};
use rand::{rngs, Rng, SeedableRng};
use std::borrow::Cow;
use std::sync::{Arc, Mutex};
use std::sync::Arc;
use std::sync::Mutex;
use std::thread;
use std::time::Duration;

use opentelemetry::metrics::Counter;
use opentelemetry::metrics::Meter;
use opentelemetry::metrics::MeterProvider as _;
use opentelemetry::metrics::UpDownCounter;
use opentelemetry::KeyValue;
use rand::rngs;
use rand::Rng;
use rand::SeedableRng;

use self::data::DataPoint;
use self::data::HistogramDataPoint;
use self::data::ScopeMetrics;
use super::*;
use crate::metrics::data::ResourceMetrics;
use crate::metrics::data::Temporality;
use crate::metrics::reader::TemporalitySelector;
use crate::runtime;
use crate::testing::metrics::InMemoryMetricsExporter;
use crate::testing::metrics::InMemoryMetricsExporterBuilder;

// Run all tests in this mod
// cargo test metrics::tests --features=testing
// Note for all tests from this point onwards in this mod:
Expand Down

0 comments on commit 166d217

Please sign in to comment.