Skip to content

feat(opentelemetry source)!: Improve HTTP error signalling #22957

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
193 changes: 123 additions & 70 deletions src/sources/opentelemetry/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
use http::StatusCode;
use hyper::{service::make_service_fn, Server};
use prost::Message;
use snafu::Snafu;
use tokio::net::TcpStream;
use tower::ServiceBuilder;
use tracing::Span;
Expand Down Expand Up @@ -45,25 +44,40 @@
use super::OpentelemetryConfig;
use super::{reply::protobuf, status::Status};

#[derive(Clone, Copy, Debug, Snafu)]
pub(crate) enum ApiError {
ServerShutdown,
#[derive(Clone, Copy, Debug)]
pub(crate) enum ContentType {
Protobuf,
}

impl warp::reject::Reject for ApiError {}
#[derive(Debug)]
struct InvalidContentType;

impl warp::reject::Reject for InvalidContentType {}

fn extract_content_type() -> impl warp::Filter<Extract = (ContentType,), Error = Rejection> + Copy {
warp::header::<String>(http::header::CONTENT_TYPE.as_str()).and_then(
|content_type: String| async move {
match content_type.as_str() {
"application/x-protobuf" => Ok(ContentType::Protobuf),
_ => Err(warp::reject::custom(InvalidContentType)),
}
},
)
}

pub(crate) async fn run_http_server(
address: SocketAddr,
tls_settings: MaybeTlsSettings,
filters: BoxedFilter<(Response,)>,
routes: BoxedFilter<(Response,)>,
shutdown: ShutdownSignal,
keepalive_settings: KeepaliveConfig,
) -> crate::Result<()> {
let listener = tls_settings.bind(&address).await?;
let routes = filters.recover(handle_rejection);

info!(message = "Building HTTP server.", address = %address);

let routes = routes.recover(handle_rejection);

let span = Span::current();
let make_svc = make_service_fn(move |conn: &MaybeTlsIncomingStream<TcpStream>| {
let svc = ServiceBuilder::new()
Expand Down Expand Up @@ -115,11 +129,41 @@
bytes_received,
events_received,
);

// The OTLP spec says that HTTP 4xx errors must include a grpc Status message
// by doing it here we get access to the content-type header, which is required
// to differentiate between protobuf and json encoding
let handle_errors: BoxedFilter<(Response,)> = warp::any()
.and(warp::method())
.and(warp::path::peek())
.and(extract_content_type())
.then(|method, path: warp::filters::path::Peek, _ct| async move {
if method != hyper::Method::POST {
let reply = protobuf(Status {
code: 2,
message: "method not allowed, supported: [POST]".into(),
..Default::default()
});
warp::reply::with_status(reply, hyper::StatusCode::METHOD_NOT_ALLOWED)
.into_response()
} else {
let reply = protobuf(Status {
code: 2,
message: format!("unknown route: {}", path.as_str()),
..Default::default()
});
warp::reply::with_status(reply, hyper::StatusCode::NOT_FOUND).into_response()
}
})
.boxed();

log_filters
.or(trace_filters)
.unify()
.or(metrics_filters)
.unify()
.or(handle_errors)
.unify()
.boxed()
}

Expand Down Expand Up @@ -148,15 +192,12 @@
) -> BoxedFilter<(Response,)> {
warp::post()
.and(warp::path!("v1" / "logs"))
.and(warp::header::exact_ignore_case(
"content-type",
"application/x-protobuf",
))
.and(extract_content_type())
.and(warp::header::optional::<String>("content-encoding"))
.and(warp::header::headers_cloned())
.and(warp::body::bytes())
.and_then(
move |encoding_header: Option<String>, headers_config: HeaderMap, body: Bytes| {
move |_, encoding_header: Option<String>, headers_config: HeaderMap, body: Bytes| {
let events = decode(encoding_header.as_deref(), body)
.and_then(|body| {
bytes_received.emit(ByteSize(body.len()));
Expand Down Expand Up @@ -187,13 +228,10 @@
) -> BoxedFilter<(Response,)> {
warp::post()
.and(warp::path!("v1" / "metrics"))
.and(warp::header::exact_ignore_case(
"content-type",
"application/x-protobuf",
))
.and(extract_content_type())
.and(warp::header::optional::<String>("content-encoding"))
.and(warp::body::bytes())
.and_then(move |encoding_header: Option<String>, body: Bytes| {
.and_then(move |_, encoding_header: Option<String>, body: Bytes| {
let events = decode(encoding_header.as_deref(), body).and_then(|body| {
bytes_received.emit(ByteSize(body.len()));
decode_metrics_body(body, &events_received)
Expand All @@ -218,13 +256,10 @@
) -> BoxedFilter<(Response,)> {
warp::post()
.and(warp::path!("v1" / "traces"))
.and(warp::header::exact_ignore_case(
"content-type",
"application/x-protobuf",
))
.and(extract_content_type())
.and(warp::header::optional::<String>("content-encoding"))
.and(warp::body::bytes())
.and_then(move |encoding_header: Option<String>, body: Bytes| {
.and_then(move |_, encoding_header: Option<String>, body: Bytes| {
let events = decode(encoding_header.as_deref(), body).and_then(|body| {
bytes_received.emit(ByteSize(body.len()));
decode_trace_body(body, &events_received)
Expand Down Expand Up @@ -324,56 +359,74 @@
output: &str,
resp: impl Message,
) -> Result<Response, Rejection> {
match events {
Ok(mut events) => {
let receiver = BatchNotifier::maybe_apply_to(acknowledgements, &mut events);
let count = events.len();

out.send_batch_named(output, events).await.map_err(|_| {
emit!(StreamClosedError { count });
warp::reject::custom(ApiError::ServerShutdown)
})?;

match receiver {
None => Ok(protobuf(resp).into_response()),
Some(receiver) => match receiver.await {
BatchStatus::Delivered => Ok(protobuf(resp).into_response()),
BatchStatus::Errored => Err(warp::reject::custom(Status {
code: 2, // UNKNOWN - OTLP doesn't require use of status.code, but we can't encode a None here
message: "Error delivering contents to sink".into(),
..Default::default()
})),
BatchStatus::Rejected => Err(warp::reject::custom(Status {
code: 2, // UNKNOWN - OTLP doesn't require use of status.code, but we can't encode a None here
message: "Contents failed to deliver to sink".into(),
..Default::default()
})),
},
}
}
Err(err) => Err(warp::reject::custom(err)),
let reply_with_status =
|http_code: hyper::StatusCode, code: i32, message: String| -> Result<Response, Rejection> {
let mut resp = protobuf(Status {
code,
message,
..Default::default()
})
.into_response();
*resp.status_mut() = http_code;
Ok(resp)
};

if let Err(err) = events {
return reply_with_status(
err.status_code(),
2, // UNKNOWN - OTLP doesn't require use of status.code, but we can't encode a None here
err.message().into(),
);
}
}

async fn handle_rejection(err: Rejection) -> Result<impl Reply, std::convert::Infallible> {
if let Some(err_msg) = err.find::<ErrorMessage>() {
let reply = protobuf(Status {
code: 2, // UNKNOWN - OTLP doesn't require use of status.code, but we can't encode a None here
message: err_msg.message().into(),
..Default::default()
});
let mut events = events.unwrap();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should avoid unwrap()

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree. Is there a nice syntax trick? The if let Err() above should guarantee this unwrap is safe

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, you can use is_err() and return early. Or:

let mut events = match events {
    Ok(events) => ...,
    Err(err) => { ... }
}

let receiver = BatchNotifier::maybe_apply_to(acknowledgements, &mut events);
let count = events.len();

if let Err(_) = out.send_batch_named(output, events).await {

Check failure on line 386 in src/sources/opentelemetry/http.rs

View workflow job for this annotation

GitHub Actions / Checks

redundant pattern matching, consider using `is_err()`
emit!(StreamClosedError { count });
// the client can try again later
return reply_with_status(
hyper::StatusCode::SERVICE_UNAVAILABLE,
2,
"Server is shutting down".into(),
);
};

match receiver {
None => Ok(protobuf(resp).into_response()),
Some(receiver) => match receiver.await {
BatchStatus::Delivered => Ok(protobuf(resp).into_response()),
BatchStatus::Errored => reply_with_status(
hyper::StatusCode::INTERNAL_SERVER_ERROR,
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a way to tell why it errored and whether it makes sense to return a 'retriable' status code (e.g. 502)

2, // UNKNOWN - OTLP doesn't require use of status.code, but we can't encode a None here
"Error delivering contents to sink".into(),
),
BatchStatus::Rejected => reply_with_status(
hyper::StatusCode::INTERNAL_SERVER_ERROR,
2, // UNKNOWN - OTLP doesn't require use of status.code, but we can't encode a None here
"Contents failed to deliver to sink".into(),
),
},
}
}

Ok(warp::reply::with_status(reply, err_msg.status_code()))
async fn handle_rejection(err: Rejection) -> Result<Response, Infallible> {
let reply = if let Some(_) = err.find::<InvalidContentType>() {

Check failure on line 415 in src/sources/opentelemetry/http.rs

View workflow job for this annotation

GitHub Actions / Checks

redundant pattern matching, consider using `is_some()`
warp::reply::with_status(
hyper::StatusCode::UNSUPPORTED_MEDIA_TYPE.as_str(),
hyper::StatusCode::UNSUPPORTED_MEDIA_TYPE,
)
} else {
let reply = protobuf(Status {
code: 2, // UNKNOWN - OTLP doesn't require use of status.code, but we can't encode a None here
message: format!("{:?}", err),
..Default::default()
});

Ok(warp::reply::with_status(
reply,
StatusCode::INTERNAL_SERVER_ERROR,
))
}
warn!("Unhandled rejection: {:?}", err);
warp::reply::with_status(
"Internal server error".into(),

Check failure on line 423 in src/sources/opentelemetry/http.rs

View workflow job for this annotation

GitHub Actions / Checks

useless conversion to the same type: `&str`
hyper::StatusCode::INTERNAL_SERVER_ERROR,
)
};

let mut resp = reply.into_response();
resp.headers_mut()
.insert(http::header::CONTENT_TYPE, "text/plain".parse().unwrap());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should avoid unwrap

Ok(resp)
}
Loading
Loading