Skip to content

Commit

Permalink
set_resource status, and call synchronously
Browse files Browse the repository at this point in the history
  • Loading branch information
lalitb committed Jun 27, 2024
1 parent 4df74e8 commit 161f087
Show file tree
Hide file tree
Showing 16 changed files with 116 additions and 50 deletions.
3 changes: 2 additions & 1 deletion opentelemetry-otlp/src/exporter/http/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ impl LogExporter for OtlpHttpClient {
let _ = self.client.lock().map(|mut c| c.take());
}

fn set_resource(&mut self, resource: &opentelemetry_sdk::Resource) {
fn set_resource(&mut self, resource: &opentelemetry_sdk::Resource) -> LogResult<()> {

Check warning on line 60 in opentelemetry-otlp/src/exporter/http/logs.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/exporter/http/logs.rs#L60

Added line #L60 was not covered by tests
self.resource = resource.into();
Ok(())

Check warning on line 62 in opentelemetry-otlp/src/exporter/http/logs.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/exporter/http/logs.rs#L62

Added line #L62 was not covered by tests
}
}
3 changes: 2 additions & 1 deletion opentelemetry-otlp/src/exporter/http/trace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,8 @@ impl SpanExporter for OtlpHttpClient {
let _ = self.client.lock().map(|mut c| c.take());
}

fn set_resource(&mut self, resource: &opentelemetry_sdk::Resource) {
fn set_resource(&mut self, resource: &opentelemetry_sdk::Resource) -> ExportResult {

Check warning on line 70 in opentelemetry-otlp/src/exporter/http/trace.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/exporter/http/trace.rs#L70

Added line #L70 was not covered by tests
self.resource = resource.into();
Ok(())

Check warning on line 72 in opentelemetry-otlp/src/exporter/http/trace.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/exporter/http/trace.rs#L72

Added line #L72 was not covered by tests
}
}
3 changes: 2 additions & 1 deletion opentelemetry-otlp/src/exporter/tonic/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,8 @@ impl LogExporter for TonicLogsClient {
let _ = self.inner.take();
}

fn set_resource(&mut self, resource: &opentelemetry_sdk::Resource) {
fn set_resource(&mut self, resource: &opentelemetry_sdk::Resource) -> LogResult<()> {

Check warning on line 94 in opentelemetry-otlp/src/exporter/tonic/logs.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/exporter/tonic/logs.rs#L94

Added line #L94 was not covered by tests
self.resource = resource.into();
Ok(())

Check warning on line 96 in opentelemetry-otlp/src/exporter/tonic/logs.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/exporter/tonic/logs.rs#L96

Added line #L96 was not covered by tests
}
}
3 changes: 2 additions & 1 deletion opentelemetry-otlp/src/exporter/tonic/trace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,8 @@ impl SpanExporter for TonicTracesClient {
let _ = self.inner.take();
}

fn set_resource(&mut self, resource: &opentelemetry_sdk::Resource) {
fn set_resource(&mut self, resource: &opentelemetry_sdk::Resource) -> ExportResult {
self.resource = resource.into();
Ok(())
}
}
7 changes: 5 additions & 2 deletions opentelemetry-otlp/src/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,11 @@ impl opentelemetry_sdk::export::logs::LogExporter for LogExporter {
self.client.export(batch).await
}

fn set_resource(&mut self, resource: &opentelemetry_sdk::Resource) {
self.client.set_resource(resource);
fn set_resource(
&mut self,
resource: &opentelemetry_sdk::Resource,
) -> opentelemetry::logs::LogResult<()> {
self.client.set_resource(resource)

Check warning on line 112 in opentelemetry-otlp/src/logs.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/logs.rs#L108-L112

Added lines #L108 - L112 were not covered by tests
}
}

Expand Down
4 changes: 2 additions & 2 deletions opentelemetry-otlp/src/span.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ impl opentelemetry_sdk::export::trace::SpanExporter for SpanExporter {
self.0.export(batch)
}

fn set_resource(&mut self, resource: &opentelemetry_sdk::Resource) {
self.0.set_resource(resource);
fn set_resource(&mut self, resource: &opentelemetry_sdk::Resource) -> ExportResult {
self.0.set_resource(resource)
}
}
20 changes: 18 additions & 2 deletions opentelemetry-sdk/src/export/logs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,18 @@ use std::fmt::Debug;
/// `LogExporter` defines the interface that log exporters should implement.
#[async_trait]
pub trait LogExporter: Send + Sync + Debug {
/// Exports a batch of [`LogData`].
/// Exports a batch of readable logs. Protocol exporters that will
/// implement this function are typically expected to serialize and transmit
/// the data to the destination.
///
/// This function will never be called concurrently for the same exporter
/// instance. It can be called again only after the current call returns.
///
/// This function must not block indefinitely, there must be a reasonable
/// upper limit after which the call must time out with an error result.
///
/// Any retry logic that is required by the exporter is the responsibility
/// of the exporter.
async fn export<'a>(&mut self, batch: Vec<Cow<'a, LogData>>) -> LogResult<()>;
/// Shuts down the exporter.
fn shutdown(&mut self) {}
Expand All @@ -24,7 +35,12 @@ pub trait LogExporter: Send + Sync + Debug {
true
}
/// Set the resource for the exporter.
fn set_resource(&mut self, _resource: &Resource) {}
/// This function SHOULD only be called once during the initialization of the exporter.
/// This function SHOULD complete or abort within some timeout. This function SHOULD be
/// implemented as a blocking API
fn set_resource(&mut self, _resource: &Resource) -> LogResult<()> {
Ok(())
}

Check warning on line 43 in opentelemetry-sdk/src/export/logs/mod.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/export/logs/mod.rs#L41-L43

Added lines #L41 - L43 were not covered by tests
}

/// `LogData` represents a single log event without resource context.
Expand Down
7 changes: 6 additions & 1 deletion opentelemetry-sdk/src/export/trace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,12 @@ pub trait SpanExporter: Send + Sync + Debug {
}

/// Set the resource for the exporter.
fn set_resource(&mut self, _resource: &Resource) {}
/// This function SHOULD only be called once during the initialization of the exporter.
/// This function SHOULD complete or abort within some timeout. This function SHOULD be
/// implemented as a blocking API
fn set_resource(&mut self, _resource: &Resource) -> ExportResult {
Ok(())
}
}

/// `SpanData` contains all the information collected by a `Span` and can be used
Expand Down
4 changes: 3 additions & 1 deletion opentelemetry-sdk/src/logs/log_emitter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,9 @@ impl Builder {

// invoke set_resource on all the processors
for processor in logger_provider.log_processors() {
processor.set_resource(logger_provider.resource());
if let Err(err) = processor.set_resource(logger_provider.resource()) {
global::handle_error(err);

Check warning on line 209 in opentelemetry-sdk/src/logs/log_emitter.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/logs/log_emitter.rs#L209

Added line #L209 was not covered by tests
}
}
logger_provider
}
Expand Down
52 changes: 35 additions & 17 deletions opentelemetry-sdk/src/logs/log_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,9 @@ pub trait LogProcessor: Send + Sync + Debug {
fn event_enabled(&self, level: Severity, target: &str, name: &str) -> bool;

/// Set the resource for the log processor.
fn set_resource(&self, _resource: &Resource) {}
fn set_resource(&self, _resource: &Resource) -> LogResult<()> {
Ok(())
}
}

/// A [LogProcessor] that passes logs to the configured `LogExporter`, as soon
Expand Down Expand Up @@ -125,9 +127,13 @@ impl LogProcessor for SimpleLogProcessor {
}
}

fn set_resource(&self, resource: &Resource) {
fn set_resource(&self, resource: &Resource) -> LogResult<()> {
if let Ok(mut exporter) = self.exporter.lock() {
exporter.set_resource(resource);
exporter.set_resource(resource)
} else {
Err(LogError::Other(
"simple logprocessor mutex poison during set_resource".into(),
))

Check warning on line 136 in opentelemetry-sdk/src/logs/log_processor.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/logs/log_processor.rs#L134-L136

Added lines #L134 - L136 were not covered by tests
}
}

Expand Down Expand Up @@ -189,11 +195,16 @@ impl<R: RuntimeChannel> LogProcessor for BatchLogProcessor<R> {
.and_then(std::convert::identity)
}

fn set_resource(&self, resource: &Resource) {
fn set_resource(&self, resource: &Resource) -> LogResult<()> {
let (res_sender, res_receiver) = oneshot::channel();
let resource = Arc::new(resource.clone());
let _ = self
.message_sender
.try_send(BatchMessage::SetResource(resource));
self.message_sender
.try_send(BatchMessage::SetResource(resource, res_sender))
.map_err(|err| LogError::Other(err.into()))?;

futures_executor::block_on(res_receiver)
.map_err(|err| LogError::Other(err.into()))
.and_then(std::convert::identity)
}
}

Expand Down Expand Up @@ -275,8 +286,14 @@ impl<R: RuntimeChannel> BatchLogProcessor<R> {
}

// propagate the resource
BatchMessage::SetResource(resource) => {
exporter.set_resource(&resource);
BatchMessage::SetResource(resource, res_sender) => {
let result = exporter.set_resource(&resource);
if let Err(result) = res_sender.send(result) {
global::handle_error(LogError::from(format!(
"failed to send set resource result: {:?}",
result
)));

Check warning on line 295 in opentelemetry-sdk/src/logs/log_processor.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/logs/log_processor.rs#L292-L295

Added lines #L292 - L295 were not covered by tests
}
}
}
}
Expand Down Expand Up @@ -500,7 +517,7 @@ enum BatchMessage {
/// Shut down the worker thread, push all logs in buffer to the backend.
Shutdown(oneshot::Sender<ExportResult>),
/// Set the resource for the exporter.
SetResource(Arc<Resource>),
SetResource(Arc<Resource>, oneshot::Sender<LogResult<()>>),
}

#[cfg(all(test, feature = "testing", feature = "logs"))]
Expand All @@ -527,7 +544,7 @@ mod tests {
use opentelemetry::logs::AnyValue;
#[cfg(feature = "logs_level_enabled")]
use opentelemetry::logs::Severity;
use opentelemetry::logs::{Logger, LoggerProvider as _};
use opentelemetry::logs::{LogError, Logger, LoggerProvider as _};
use opentelemetry::Key;
use opentelemetry::{logs::LogResult, KeyValue};
use std::borrow::Cow;
Expand All @@ -547,13 +564,14 @@ mod tests {

fn shutdown(&mut self) {}

fn set_resource(&mut self, resource: &Resource) {
self.resource
.lock()
.map(|mut res_opt| {
fn set_resource(&mut self, resource: &Resource) -> LogResult<()> {
match self.resource.lock() {
Ok(mut res_opt) => {
res_opt.replace(resource.clone());
})
.expect("mock log exporter shouldn't error when setting resource");
Ok(())
}
Err(_) => Err(LogError::Other("mock log exporter mutex poison".into())),

Check warning on line 573 in opentelemetry-sdk/src/logs/log_processor.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/logs/log_processor.rs#L573

Added line #L573 was not covered by tests
}
}
}

Expand Down
5 changes: 3 additions & 2 deletions opentelemetry-sdk/src/testing/logs/in_memory_exporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,8 +189,9 @@ impl LogExporter for InMemoryLogsExporter {
}
}

fn set_resource(&mut self, resource: &Resource) {
let mut res_guard = self.resource.lock().expect("Resource lock poisoned");
fn set_resource(&mut self, resource: &Resource) -> LogResult<()> {
let mut res_guard = self.resource.lock().map_err(LogError::from)?;
*res_guard = resource.clone();
Ok(())
}
}
9 changes: 4 additions & 5 deletions opentelemetry-sdk/src/testing/trace/in_memory_exporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,10 +146,9 @@ impl SpanExporter for InMemorySpanExporter {
self.reset()
}

fn set_resource(&mut self, resource: &Resource) {
self.resource
.lock()
.map(|mut res_guard| *res_guard = resource.clone())
.expect("Resource lock poisoned");
fn set_resource(&mut self, resource: &Resource) -> ExportResult {
let mut res_guard = self.resource.lock().map_err(TraceError::from)?;
*res_guard = resource.clone();
Ok(())
}
}
4 changes: 3 additions & 1 deletion opentelemetry-sdk/src/trace/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,9 @@ impl Builder {

// Set the resource for each processor
for p in &mut processors {
p.set_resource(config.resource.as_ref());
if let Err(err) = p.set_resource(config.resource.as_ref()) {
global::handle_error(err);

Check warning on line 293 in opentelemetry-sdk/src/trace/provider.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/trace/provider.rs#L293

Added line #L293 was not covered by tests
}
}

TracerProvider::new(TracerProviderInner { processors, config })
Expand Down
36 changes: 25 additions & 11 deletions opentelemetry-sdk/src/trace/span_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,9 @@ pub trait SpanProcessor: Send + Sync + std::fmt::Debug {
/// Implementation should make sure shutdown can be called multiple times.
fn shutdown(&self) -> TraceResult<()>;
/// Set the resource for the log processor.
fn set_resource(&mut self, _resource: &Resource) {}
fn set_resource(&mut self, _resource: &Resource) -> TraceResult<()> {
Ok(())
}

Check warning on line 101 in opentelemetry-sdk/src/trace/span_processor.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/trace/span_processor.rs#L99-L101

Added lines #L99 - L101 were not covered by tests
}

/// A [SpanProcessor] that passes finished spans to the configured
Expand Down Expand Up @@ -153,9 +155,10 @@ impl SpanProcessor for SimpleSpanProcessor {
}
}

fn set_resource(&mut self, resource: &Resource) {
if let Ok(mut exporter) = self.exporter.lock() {
exporter.set_resource(resource);
fn set_resource(&mut self, resource: &Resource) -> ExportResult {
match self.exporter.lock() {
Ok(mut exporter) => exporter.set_resource(resource),
Err(_) => Err(TraceError::Other("SimpleSpanProcessor mutex poison".into())),

Check warning on line 161 in opentelemetry-sdk/src/trace/span_processor.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/trace/span_processor.rs#L161

Added line #L161 was not covered by tests
}
}
}
Expand Down Expand Up @@ -271,11 +274,16 @@ impl<R: RuntimeChannel> SpanProcessor for BatchSpanProcessor<R> {
.and_then(|identity| identity)
}

fn set_resource(&mut self, resource: &Resource) {
fn set_resource(&mut self, resource: &Resource) -> ExportResult {
let (res_sender, res_receiver) = oneshot::channel();
let resource = Arc::new(resource.clone());
let _ = self
.message_sender
.try_send(BatchMessage::SetResource(resource));
self.message_sender
.try_send(BatchMessage::SetResource(resource, res_sender))
.map_err(|err| TraceError::Other(err.into()))?;

futures_executor::block_on(res_receiver)
.map_err(|err| TraceError::Other(err.into()))
.and_then(|identity| identity)
}
}

Expand All @@ -294,7 +302,7 @@ enum BatchMessage {
/// Shut down the worker thread, push all spans in buffer to the backend.
Shutdown(oneshot::Sender<ExportResult>),
/// Set the resource for the exporter.
SetResource(Arc<Resource>),
SetResource(Arc<Resource>, oneshot::Sender<ExportResult>),
}

struct BatchSpanProcessorInternal<R> {
Expand Down Expand Up @@ -396,8 +404,14 @@ impl<R: RuntimeChannel> BatchSpanProcessorInternal<R> {
return false;
}
// propagate the resource
BatchMessage::SetResource(resource) => {
self.exporter.set_resource(&resource);
BatchMessage::SetResource(resource, res_sender) => {
let result = self.exporter.set_resource(&resource);
if let Err(result) = res_sender.send(result) {
global::handle_error(TraceError::from(format!(
"failed to send set resource result: {:?}",
result
)));

Check warning on line 413 in opentelemetry-sdk/src/trace/span_processor.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/trace/span_processor.rs#L410-L413

Added lines #L410 - L413 were not covered by tests
}
}
}
true
Expand Down
3 changes: 2 additions & 1 deletion opentelemetry-stdout/src/logs/exporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,9 @@ impl opentelemetry_sdk::export::logs::LogExporter for LogExporter {
self.writer.take();
}

fn set_resource(&mut self, res: &opentelemetry_sdk::Resource) {
fn set_resource(&mut self, res: &opentelemetry_sdk::Resource) -> LogResult<()> {

Check warning on line 66 in opentelemetry-stdout/src/logs/exporter.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-stdout/src/logs/exporter.rs#L66

Added line #L66 was not covered by tests
self.resource = res.clone();
Ok(())

Check warning on line 68 in opentelemetry-stdout/src/logs/exporter.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-stdout/src/logs/exporter.rs#L68

Added line #L68 was not covered by tests
}
}

Expand Down
3 changes: 2 additions & 1 deletion opentelemetry-stdout/src/trace/exporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,9 @@ impl opentelemetry_sdk::export::trace::SpanExporter for SpanExporter {
self.writer.take();
}

fn set_resource(&mut self, res: &opentelemetry_sdk::Resource) {
fn set_resource(&mut self, res: &opentelemetry_sdk::Resource) -> ExportResult {

Check warning on line 59 in opentelemetry-stdout/src/trace/exporter.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-stdout/src/trace/exporter.rs#L59

Added line #L59 was not covered by tests
self.resource = res.clone();
Ok(())

Check warning on line 61 in opentelemetry-stdout/src/trace/exporter.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-stdout/src/trace/exporter.rs#L61

Added line #L61 was not covered by tests
}
}

Expand Down

0 comments on commit 161f087

Please sign in to comment.