Skip to content

Commit

Permalink
Merge branch 'main' into http-json-example
Browse files Browse the repository at this point in the history
  • Loading branch information
ramgdev committed Jun 12, 2024
2 parents 6580f3e + b933bdd commit c51d0eb
Show file tree
Hide file tree
Showing 8 changed files with 190 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,17 @@ receivers:
http:

exporters:
logging:
loglevel: debug
debug:
verbosity: detailed

service:
pipelines:
traces:
receivers: [otlp]
exporters: [logging]
exporters: [debug]
metrics:
receivers: [otlp]
exporters: [logging]
exporters: [debug]
logs:
receivers: [otlp]
exporters: [logging]
exporters: [debug]
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,17 @@ receivers:
http:

exporters:
logging:
loglevel: debug
debug:
verbosity: detailed

service:
pipelines:
traces:
receivers: [otlp]
exporters: [logging]
exporters: [debug]
metrics:
receivers: [otlp]
exporters: [logging]
exporters: [debug]
logs:
receivers: [otlp]
exporters: [logging]
exporters: [debug]
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ receivers:
http:

exporters:
logging:
loglevel: debug
file:
path: /testresults/result.json

Expand Down
2 changes: 1 addition & 1 deletion opentelemetry-sdk/src/logs/log_emitter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ impl opentelemetry::logs::LoggerProvider for LoggerProvider {

fn library_logger(&self, library: Arc<InstrumentationLibrary>) -> Self::Logger {
// If the provider is shutdown, new logger will refer a no-op logger provider.
if self.is_shutdown.load(std::sync::atomic::Ordering::Relaxed) {
if self.is_shutdown.load(Ordering::Relaxed) {
return Logger::new(library, NOOP_LOGGER_PROVIDER.clone());
}
Logger::new(library, self.clone())
Expand Down
181 changes: 164 additions & 17 deletions opentelemetry-sdk/src/trace/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,37 @@
//! not duplicate this data to avoid that different [`Tracer`] instances
//! of the [`TracerProvider`] have different versions of these data.
use crate::runtime::RuntimeChannel;
use crate::trace::{BatchSpanProcessor, SimpleSpanProcessor, Tracer};
use crate::trace::{
BatchSpanProcessor, Config, RandomIdGenerator, Sampler, SimpleSpanProcessor, SpanLimits, Tracer,
};
use crate::{export::trace::SpanExporter, trace::SpanProcessor};
use crate::{InstrumentationLibrary, Resource};
use once_cell::sync::OnceCell;
use once_cell::sync::{Lazy, OnceCell};
use opentelemetry::trace::TraceError;
use opentelemetry::{global, trace::TraceResult};
use std::borrow::Cow;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

/// Default tracer name if empty string is provided.
const DEFAULT_COMPONENT_NAME: &str = "rust.opentelemetry.io/sdk/tracer";
static PROVIDER_RESOURCE: OnceCell<Resource> = OnceCell::new();

// a no nop tracer provider used as placeholder when the provider is shutdown
static NOOP_TRACER_PROVIDER: Lazy<TracerProvider> = Lazy::new(|| TracerProvider {
inner: Arc::new(TracerProviderInner {
processors: Vec::new(),
config: Config {
// cannot use default here as the default resource is not empty
sampler: Box::new(Sampler::ParentBased(Box::new(Sampler::AlwaysOn))),
id_generator: Box::<RandomIdGenerator>::default(),
span_limits: SpanLimits::default(),
resource: Cow::Owned(Resource::empty()),
},
}),
is_shutdown: Arc::new(AtomicBool::new(true)),
});

/// TracerProvider inner type
#[derive(Debug)]
pub(crate) struct TracerProviderInner {
Expand All @@ -39,9 +58,14 @@ impl Drop for TracerProviderInner {
}

/// Creator and registry of named [`Tracer`] instances.
///
/// `TracerProvider` is lightweight container holding pointers to `SpanProcessor` and other components.
/// Cloning and dropping them will not stop the span processing. To stop span processing, users
/// must either call `shutdown` method explicitly, or drop every clone of `TracerProvider`.
#[derive(Clone, Debug)]
pub struct TracerProvider {
inner: Arc<TracerProviderInner>,
is_shutdown: Arc<AtomicBool>,
}

impl Default for TracerProvider {
Expand All @@ -52,8 +76,11 @@ impl Default for TracerProvider {

impl TracerProvider {
/// Build a new tracer provider
pub(crate) fn new(inner: Arc<TracerProviderInner>) -> Self {
TracerProvider { inner }
pub(crate) fn new(inner: TracerProviderInner) -> Self {
TracerProvider {
inner: Arc::new(inner),
is_shutdown: Arc::new(AtomicBool::new(false)),
}
}

/// Create a new [`TracerProvider`] builder.
Expand All @@ -71,6 +98,12 @@ impl TracerProvider {
&self.inner.config
}

/// true if the provider has been shutdown
/// Don't start span or export spans when provider is shutdown
pub(crate) fn is_shutdown(&self) -> bool {
self.is_shutdown.load(Ordering::Relaxed)
}

/// Force flush all remaining spans in span processors and return results.
///
/// # Examples
Expand Down Expand Up @@ -114,11 +147,41 @@ impl TracerProvider {
.map(|processor| processor.force_flush())
.collect()
}

/// Shuts down the current `TracerProvider`.
///
/// Note that shut down doesn't means the TracerProvider has dropped
pub fn shutdown(&self) -> TraceResult<()> {
if self
.is_shutdown
.compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
.is_ok()
{
// propagate the shutdown signal to processors
// it's up to the processor to properly block new spans after shutdown
let mut errs = vec![];
for processor in &self.inner.processors {
if let Err(err) = processor.shutdown() {
errs.push(err);
}
}

if errs.is_empty() {
Ok(())
} else {
Err(TraceError::Other(format!("{errs:?}").into()))
}
} else {
Err(TraceError::Other(
"tracer provider already shut down".into(),
))
}
}
}

impl opentelemetry::trace::TracerProvider for TracerProvider {
/// This implementation of `TracerProvider` produces `Tracer` instances.
type Tracer = crate::trace::Tracer;
type Tracer = Tracer;

/// Create a new versioned `Tracer` instance.
fn versioned_tracer(
Expand Down Expand Up @@ -152,7 +215,10 @@ impl opentelemetry::trace::TracerProvider for TracerProvider {
}

fn library_tracer(&self, library: Arc<InstrumentationLibrary>) -> Self::Tracer {
Tracer::new(library, Arc::downgrade(&self.inner))
if self.is_shutdown.load(Ordering::Relaxed) {
return Tracer::new(library, NOOP_TRACER_PROVIDER.clone());
}
Tracer::new(library, self.clone())
}
}

Expand Down Expand Up @@ -226,9 +292,7 @@ impl Builder {
p.set_resource(config.resource.as_ref());
}

TracerProvider {
inner: Arc::new(TracerProviderInner { processors, config }),
}
TracerProvider::new(TracerProviderInner { processors, config })
}
}

Expand All @@ -241,24 +305,59 @@ mod tests {
use crate::trace::provider::TracerProviderInner;
use crate::trace::{Config, Span, SpanProcessor};
use crate::Resource;
use opentelemetry::trace::{TraceError, TraceResult};
use opentelemetry::trace::{TraceError, TraceResult, Tracer, TracerProvider};
use opentelemetry::{Context, Key, KeyValue, Value};
use std::borrow::Cow;
use std::env;
use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
use std::sync::Arc;

// fields below is wrapped with Arc so we can assert it
#[derive(Default, Debug)]
struct AssertInfo {
started_span: AtomicU32,
is_shutdown: AtomicBool,
}

#[derive(Default, Debug, Clone)]
struct SharedAssertInfo(Arc<AssertInfo>);

impl SharedAssertInfo {
fn started_span_count(&self, count: u32) -> bool {
self.0.started_span.load(Ordering::SeqCst) == count
}
}

#[derive(Debug)]
struct TestSpanProcessor {
success: bool,
assert_info: SharedAssertInfo,
}

impl TestSpanProcessor {
fn new(success: bool) -> TestSpanProcessor {
TestSpanProcessor {
success,
assert_info: SharedAssertInfo::default(),
}
}

// get handle to assert info
fn assert_info(&self) -> SharedAssertInfo {
self.assert_info.clone()
}
}

impl SpanProcessor for TestSpanProcessor {
fn on_start(&self, _span: &mut Span, _cx: &Context) {
unimplemented!()
self.assert_info
.0
.started_span
.fetch_add(1, Ordering::SeqCst);
}

fn on_end(&self, _span: SpanData) {
unimplemented!()
// ignore
}

fn force_flush(&self) -> TraceResult<()> {
Expand All @@ -270,19 +369,29 @@ mod tests {
}

fn shutdown(&self) -> TraceResult<()> {
self.force_flush()
if self.assert_info.0.is_shutdown.load(Ordering::SeqCst) {
Ok(())
} else {
let _ = self.assert_info.0.is_shutdown.compare_exchange(
false,
true,
Ordering::SeqCst,
Ordering::SeqCst,
);
self.force_flush()
}
}
}

#[test]
fn test_force_flush() {
let tracer_provider = super::TracerProvider::new(Arc::from(TracerProviderInner {
let tracer_provider = super::TracerProvider::new(TracerProviderInner {
processors: vec![
Box::from(TestSpanProcessor { success: true }),
Box::from(TestSpanProcessor { success: false }),
Box::from(TestSpanProcessor::new(true)),
Box::from(TestSpanProcessor::new(false)),
],
config: Default::default(),
}));
});

let results = tracer_provider.force_flush();
assert_eq!(results.len(), 2);
Expand Down Expand Up @@ -417,4 +526,42 @@ mod tests {

assert_eq!(no_service_name.config().resource.len(), 0)
}

#[test]
fn test_shutdown_noops() {
let processor = TestSpanProcessor::new(false);
let assert_handle = processor.assert_info();
let tracer_provider = super::TracerProvider::new(TracerProviderInner {
processors: vec![Box::from(processor)],
config: Default::default(),
});

let test_tracer_1 = tracer_provider.tracer("test1");
let _ = test_tracer_1.start("test");

assert!(assert_handle.started_span_count(1));

let _ = test_tracer_1.start("test");

assert!(assert_handle.started_span_count(2));

let shutdown = |tracer_provider: super::TracerProvider| {
let _ = tracer_provider.shutdown(); // shutdown once
};

// assert tracer provider can be shutdown using on a cloned version
shutdown(tracer_provider.clone());

// after shutdown we should get noop tracer
let noop_tracer = tracer_provider.tracer("noop");
// noop tracer cannot start anything
let _ = noop_tracer.start("test");
assert!(assert_handle.started_span_count(2));
// noop tracer's tracer provider should be shutdown
assert!(noop_tracer.provider().is_shutdown.load(Ordering::SeqCst));

// existing tracer becomes noops after shutdown
let _ = test_tracer_1.start("test");
assert!(assert_handle.started_span_count(2));
}
}
10 changes: 5 additions & 5 deletions opentelemetry-sdk/src/trace/span.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,11 +204,11 @@ impl Span {
None => return,
};

let provider = self.tracer.provider();
// skip if provider has been shut down
let provider = match self.tracer.provider() {
Some(provider) => provider,
None => return,
};
if provider.is_shutdown() {
return;
}

// ensure end time is set via explicit end or implicitly on drop
if let Some(timestamp) = timestamp {
Expand Down Expand Up @@ -719,7 +719,7 @@ mod tests {
let exported_data = span.exported_data();
assert!(exported_data.is_some());

drop(provider);
provider.shutdown().expect("shutdown panicked");
let dropped_span = tracer.start("span_with_dropped_provider");
// return none if the provider has already been dropped
assert!(dropped_span.exported_data().is_none());
Expand Down
2 changes: 2 additions & 0 deletions opentelemetry-sdk/src/trace/span_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@ pub trait SpanProcessor: Send + Sync + std::fmt::Debug {
fn force_flush(&self) -> TraceResult<()>;
/// Shuts down the processor. Called when SDK is shut down. This is an
/// opportunity for processors to do any cleanup required.
///
/// Implementation should make sure shutdown can be called multiple times.
fn shutdown(&self) -> TraceResult<()>;
/// Set the resource for the log processor.
fn set_resource(&mut self, _resource: &Resource) {}
Expand Down
Loading

0 comments on commit c51d0eb

Please sign in to comment.