diff --git a/opentelemetry-otlp/examples/basic-otlp-http/otel-collector-config.yaml b/opentelemetry-otlp/examples/basic-otlp-http/otel-collector-config.yaml index 0735e7c69a..1c6d258426 100644 --- a/opentelemetry-otlp/examples/basic-otlp-http/otel-collector-config.yaml +++ b/opentelemetry-otlp/examples/basic-otlp-http/otel-collector-config.yaml @@ -11,17 +11,17 @@ receivers: http: exporters: - logging: - loglevel: debug + debug: + verbosity: detailed service: pipelines: traces: receivers: [otlp] - exporters: [logging] + exporters: [debug] metrics: receivers: [otlp] - exporters: [logging] + exporters: [debug] logs: receivers: [otlp] - exporters: [logging] \ No newline at end of file + exporters: [debug] \ No newline at end of file diff --git a/opentelemetry-otlp/examples/basic-otlp/otel-collector-config.yaml b/opentelemetry-otlp/examples/basic-otlp/otel-collector-config.yaml index 54e90b8123..51ef89550f 100644 --- a/opentelemetry-otlp/examples/basic-otlp/otel-collector-config.yaml +++ b/opentelemetry-otlp/examples/basic-otlp/otel-collector-config.yaml @@ -11,17 +11,17 @@ receivers: http: exporters: - logging: - loglevel: debug + debug: + verbosity: detailed service: pipelines: traces: receivers: [otlp] - exporters: [logging] + exporters: [debug] metrics: receivers: [otlp] - exporters: [logging] + exporters: [debug] logs: receivers: [otlp] - exporters: [logging] + exporters: [debug] diff --git a/opentelemetry-otlp/tests/integration_test/otel-collector-config.yaml b/opentelemetry-otlp/tests/integration_test/otel-collector-config.yaml index 39d755cc9a..2c5a321993 100644 --- a/opentelemetry-otlp/tests/integration_test/otel-collector-config.yaml +++ b/opentelemetry-otlp/tests/integration_test/otel-collector-config.yaml @@ -5,8 +5,6 @@ receivers: http: exporters: - logging: - loglevel: debug file: path: /testresults/result.json diff --git a/opentelemetry-sdk/src/logs/log_emitter.rs b/opentelemetry-sdk/src/logs/log_emitter.rs index 6ed216427e..22f5824b3a 100644 --- a/opentelemetry-sdk/src/logs/log_emitter.rs +++ b/opentelemetry-sdk/src/logs/log_emitter.rs @@ -77,7 +77,7 @@ impl opentelemetry::logs::LoggerProvider for LoggerProvider { fn library_logger(&self, library: Arc) -> Self::Logger { // If the provider is shutdown, new logger will refer a no-op logger provider. - if self.is_shutdown.load(std::sync::atomic::Ordering::Relaxed) { + if self.is_shutdown.load(Ordering::Relaxed) { return Logger::new(library, NOOP_LOGGER_PROVIDER.clone()); } Logger::new(library, self.clone()) diff --git a/opentelemetry-sdk/src/trace/provider.rs b/opentelemetry-sdk/src/trace/provider.rs index 701a784f90..9550ce11d2 100644 --- a/opentelemetry-sdk/src/trace/provider.rs +++ b/opentelemetry-sdk/src/trace/provider.rs @@ -9,18 +9,37 @@ //! not duplicate this data to avoid that different [`Tracer`] instances //! of the [`TracerProvider`] have different versions of these data. use crate::runtime::RuntimeChannel; -use crate::trace::{BatchSpanProcessor, SimpleSpanProcessor, Tracer}; +use crate::trace::{ + BatchSpanProcessor, Config, RandomIdGenerator, Sampler, SimpleSpanProcessor, SpanLimits, Tracer, +}; use crate::{export::trace::SpanExporter, trace::SpanProcessor}; use crate::{InstrumentationLibrary, Resource}; -use once_cell::sync::OnceCell; +use once_cell::sync::{Lazy, OnceCell}; +use opentelemetry::trace::TraceError; use opentelemetry::{global, trace::TraceResult}; use std::borrow::Cow; +use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; /// Default tracer name if empty string is provided. const DEFAULT_COMPONENT_NAME: &str = "rust.opentelemetry.io/sdk/tracer"; static PROVIDER_RESOURCE: OnceCell = OnceCell::new(); +// a no nop tracer provider used as placeholder when the provider is shutdown +static NOOP_TRACER_PROVIDER: Lazy = Lazy::new(|| TracerProvider { + inner: Arc::new(TracerProviderInner { + processors: Vec::new(), + config: Config { + // cannot use default here as the default resource is not empty + sampler: Box::new(Sampler::ParentBased(Box::new(Sampler::AlwaysOn))), + id_generator: Box::::default(), + span_limits: SpanLimits::default(), + resource: Cow::Owned(Resource::empty()), + }, + }), + is_shutdown: Arc::new(AtomicBool::new(true)), +}); + /// TracerProvider inner type #[derive(Debug)] pub(crate) struct TracerProviderInner { @@ -39,9 +58,14 @@ impl Drop for TracerProviderInner { } /// Creator and registry of named [`Tracer`] instances. +/// +/// `TracerProvider` is lightweight container holding pointers to `SpanProcessor` and other components. +/// Cloning and dropping them will not stop the span processing. To stop span processing, users +/// must either call `shutdown` method explicitly, or drop every clone of `TracerProvider`. #[derive(Clone, Debug)] pub struct TracerProvider { inner: Arc, + is_shutdown: Arc, } impl Default for TracerProvider { @@ -52,8 +76,11 @@ impl Default for TracerProvider { impl TracerProvider { /// Build a new tracer provider - pub(crate) fn new(inner: Arc) -> Self { - TracerProvider { inner } + pub(crate) fn new(inner: TracerProviderInner) -> Self { + TracerProvider { + inner: Arc::new(inner), + is_shutdown: Arc::new(AtomicBool::new(false)), + } } /// Create a new [`TracerProvider`] builder. @@ -71,6 +98,12 @@ impl TracerProvider { &self.inner.config } + /// true if the provider has been shutdown + /// Don't start span or export spans when provider is shutdown + pub(crate) fn is_shutdown(&self) -> bool { + self.is_shutdown.load(Ordering::Relaxed) + } + /// Force flush all remaining spans in span processors and return results. /// /// # Examples @@ -114,11 +147,41 @@ impl TracerProvider { .map(|processor| processor.force_flush()) .collect() } + + /// Shuts down the current `TracerProvider`. + /// + /// Note that shut down doesn't means the TracerProvider has dropped + pub fn shutdown(&self) -> TraceResult<()> { + if self + .is_shutdown + .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst) + .is_ok() + { + // propagate the shutdown signal to processors + // it's up to the processor to properly block new spans after shutdown + let mut errs = vec![]; + for processor in &self.inner.processors { + if let Err(err) = processor.shutdown() { + errs.push(err); + } + } + + if errs.is_empty() { + Ok(()) + } else { + Err(TraceError::Other(format!("{errs:?}").into())) + } + } else { + Err(TraceError::Other( + "tracer provider already shut down".into(), + )) + } + } } impl opentelemetry::trace::TracerProvider for TracerProvider { /// This implementation of `TracerProvider` produces `Tracer` instances. - type Tracer = crate::trace::Tracer; + type Tracer = Tracer; /// Create a new versioned `Tracer` instance. fn versioned_tracer( @@ -152,7 +215,10 @@ impl opentelemetry::trace::TracerProvider for TracerProvider { } fn library_tracer(&self, library: Arc) -> Self::Tracer { - Tracer::new(library, Arc::downgrade(&self.inner)) + if self.is_shutdown.load(Ordering::Relaxed) { + return Tracer::new(library, NOOP_TRACER_PROVIDER.clone()); + } + Tracer::new(library, self.clone()) } } @@ -226,9 +292,7 @@ impl Builder { p.set_resource(config.resource.as_ref()); } - TracerProvider { - inner: Arc::new(TracerProviderInner { processors, config }), - } + TracerProvider::new(TracerProviderInner { processors, config }) } } @@ -241,24 +305,59 @@ mod tests { use crate::trace::provider::TracerProviderInner; use crate::trace::{Config, Span, SpanProcessor}; use crate::Resource; - use opentelemetry::trace::{TraceError, TraceResult}; + use opentelemetry::trace::{TraceError, TraceResult, Tracer, TracerProvider}; use opentelemetry::{Context, Key, KeyValue, Value}; use std::borrow::Cow; use std::env; + use std::sync::atomic::{AtomicBool, AtomicU32, Ordering}; use std::sync::Arc; + // fields below is wrapped with Arc so we can assert it + #[derive(Default, Debug)] + struct AssertInfo { + started_span: AtomicU32, + is_shutdown: AtomicBool, + } + + #[derive(Default, Debug, Clone)] + struct SharedAssertInfo(Arc); + + impl SharedAssertInfo { + fn started_span_count(&self, count: u32) -> bool { + self.0.started_span.load(Ordering::SeqCst) == count + } + } + #[derive(Debug)] struct TestSpanProcessor { success: bool, + assert_info: SharedAssertInfo, + } + + impl TestSpanProcessor { + fn new(success: bool) -> TestSpanProcessor { + TestSpanProcessor { + success, + assert_info: SharedAssertInfo::default(), + } + } + + // get handle to assert info + fn assert_info(&self) -> SharedAssertInfo { + self.assert_info.clone() + } } impl SpanProcessor for TestSpanProcessor { fn on_start(&self, _span: &mut Span, _cx: &Context) { - unimplemented!() + self.assert_info + .0 + .started_span + .fetch_add(1, Ordering::SeqCst); } fn on_end(&self, _span: SpanData) { - unimplemented!() + // ignore } fn force_flush(&self) -> TraceResult<()> { @@ -270,19 +369,29 @@ mod tests { } fn shutdown(&self) -> TraceResult<()> { - self.force_flush() + if self.assert_info.0.is_shutdown.load(Ordering::SeqCst) { + Ok(()) + } else { + let _ = self.assert_info.0.is_shutdown.compare_exchange( + false, + true, + Ordering::SeqCst, + Ordering::SeqCst, + ); + self.force_flush() + } } } #[test] fn test_force_flush() { - let tracer_provider = super::TracerProvider::new(Arc::from(TracerProviderInner { + let tracer_provider = super::TracerProvider::new(TracerProviderInner { processors: vec![ - Box::from(TestSpanProcessor { success: true }), - Box::from(TestSpanProcessor { success: false }), + Box::from(TestSpanProcessor::new(true)), + Box::from(TestSpanProcessor::new(false)), ], config: Default::default(), - })); + }); let results = tracer_provider.force_flush(); assert_eq!(results.len(), 2); @@ -417,4 +526,42 @@ mod tests { assert_eq!(no_service_name.config().resource.len(), 0) } + + #[test] + fn test_shutdown_noops() { + let processor = TestSpanProcessor::new(false); + let assert_handle = processor.assert_info(); + let tracer_provider = super::TracerProvider::new(TracerProviderInner { + processors: vec![Box::from(processor)], + config: Default::default(), + }); + + let test_tracer_1 = tracer_provider.tracer("test1"); + let _ = test_tracer_1.start("test"); + + assert!(assert_handle.started_span_count(1)); + + let _ = test_tracer_1.start("test"); + + assert!(assert_handle.started_span_count(2)); + + let shutdown = |tracer_provider: super::TracerProvider| { + let _ = tracer_provider.shutdown(); // shutdown once + }; + + // assert tracer provider can be shutdown using on a cloned version + shutdown(tracer_provider.clone()); + + // after shutdown we should get noop tracer + let noop_tracer = tracer_provider.tracer("noop"); + // noop tracer cannot start anything + let _ = noop_tracer.start("test"); + assert!(assert_handle.started_span_count(2)); + // noop tracer's tracer provider should be shutdown + assert!(noop_tracer.provider().is_shutdown.load(Ordering::SeqCst)); + + // existing tracer becomes noops after shutdown + let _ = test_tracer_1.start("test"); + assert!(assert_handle.started_span_count(2)); + } } diff --git a/opentelemetry-sdk/src/trace/span.rs b/opentelemetry-sdk/src/trace/span.rs index d672348885..ea03d9ab53 100644 --- a/opentelemetry-sdk/src/trace/span.rs +++ b/opentelemetry-sdk/src/trace/span.rs @@ -204,11 +204,11 @@ impl Span { None => return, }; + let provider = self.tracer.provider(); // skip if provider has been shut down - let provider = match self.tracer.provider() { - Some(provider) => provider, - None => return, - }; + if provider.is_shutdown() { + return; + } // ensure end time is set via explicit end or implicitly on drop if let Some(timestamp) = timestamp { @@ -719,7 +719,7 @@ mod tests { let exported_data = span.exported_data(); assert!(exported_data.is_some()); - drop(provider); + provider.shutdown().expect("shutdown panicked"); let dropped_span = tracer.start("span_with_dropped_provider"); // return none if the provider has already been dropped assert!(dropped_span.exported_data().is_none()); diff --git a/opentelemetry-sdk/src/trace/span_processor.rs b/opentelemetry-sdk/src/trace/span_processor.rs index f9b634e850..214c0e5768 100644 --- a/opentelemetry-sdk/src/trace/span_processor.rs +++ b/opentelemetry-sdk/src/trace/span_processor.rs @@ -92,6 +92,8 @@ pub trait SpanProcessor: Send + Sync + std::fmt::Debug { fn force_flush(&self) -> TraceResult<()>; /// Shuts down the processor. Called when SDK is shut down. This is an /// opportunity for processors to do any cleanup required. + /// + /// Implementation should make sure shutdown can be called multiple times. fn shutdown(&self) -> TraceResult<()>; /// Set the resource for the log processor. fn set_resource(&mut self, _resource: &Resource) {} diff --git a/opentelemetry-sdk/src/trace/tracer.rs b/opentelemetry-sdk/src/trace/tracer.rs index 0749937cd7..ea56725d15 100644 --- a/opentelemetry-sdk/src/trace/tracer.rs +++ b/opentelemetry-sdk/src/trace/tracer.rs @@ -9,7 +9,7 @@ //! Docs: use crate::{ trace::{ - provider::{TracerProvider, TracerProviderInner}, + provider::TracerProvider, span::{Span, SpanData}, SpanLimits, SpanLinks, }, @@ -20,7 +20,7 @@ use opentelemetry::{ Context, KeyValue, }; use std::fmt; -use std::sync::{Arc, Weak}; +use std::sync::Arc; use super::SpanEvents; @@ -28,7 +28,7 @@ use super::SpanEvents; #[derive(Clone)] pub struct Tracer { instrumentation_lib: Arc, - provider: Weak, + provider: TracerProvider, } impl fmt::Debug for Tracer { @@ -46,7 +46,7 @@ impl Tracer { /// Create a new tracer (used internally by `TracerProvider`s). pub(crate) fn new( instrumentation_lib: Arc, - provider: Weak, + provider: TracerProvider, ) -> Self { Tracer { instrumentation_lib, @@ -55,8 +55,8 @@ impl Tracer { } /// TracerProvider associated with this tracer. - pub(crate) fn provider(&self) -> Option { - self.provider.upgrade().map(TracerProvider::new) + pub(crate) fn provider(&self) -> &TracerProvider { + &self.provider } /// Instrumentation library information of this tracer. @@ -175,7 +175,8 @@ impl opentelemetry::trace::Tracer for Tracer { /// spans in the trace. fn build_with_context(&self, mut builder: SpanBuilder, parent_cx: &Context) -> Self::Span { let provider = self.provider(); - if provider.is_none() { + // no point start a span if the tracer provider has already being shutdown + if provider.is_shutdown() { return Span::new( SpanContext::empty_context(), None, @@ -184,7 +185,6 @@ impl opentelemetry::trace::Tracer for Tracer { ); } - let provider = provider.unwrap(); let config = provider.config(); let span_id = builder .span_id