-
Notifications
You must be signed in to change notification settings - Fork 1.7k
feat(opentelemetry source)!: Improve HTTP error signalling #22957
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
fbs
wants to merge
3
commits into
vectordotdev:master
Choose a base branch
from
fbs:22940_part1
base: master
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from all commits
Commits
Show all changes
3 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -6,7 +6,6 @@ | |
use http::StatusCode; | ||
use hyper::{service::make_service_fn, Server}; | ||
use prost::Message; | ||
use snafu::Snafu; | ||
use tokio::net::TcpStream; | ||
use tower::ServiceBuilder; | ||
use tracing::Span; | ||
|
@@ -45,25 +44,40 @@ | |
use super::OpentelemetryConfig; | ||
use super::{reply::protobuf, status::Status}; | ||
|
||
#[derive(Clone, Copy, Debug, Snafu)] | ||
pub(crate) enum ApiError { | ||
ServerShutdown, | ||
#[derive(Clone, Copy, Debug)] | ||
pub(crate) enum ContentType { | ||
Protobuf, | ||
} | ||
|
||
impl warp::reject::Reject for ApiError {} | ||
#[derive(Debug)] | ||
struct InvalidContentType; | ||
|
||
impl warp::reject::Reject for InvalidContentType {} | ||
|
||
fn extract_content_type() -> impl warp::Filter<Extract = (ContentType,), Error = Rejection> + Copy { | ||
warp::header::<String>(http::header::CONTENT_TYPE.as_str()).and_then( | ||
|content_type: String| async move { | ||
match content_type.as_str() { | ||
"application/x-protobuf" => Ok(ContentType::Protobuf), | ||
_ => Err(warp::reject::custom(InvalidContentType)), | ||
} | ||
}, | ||
) | ||
} | ||
|
||
pub(crate) async fn run_http_server( | ||
address: SocketAddr, | ||
tls_settings: MaybeTlsSettings, | ||
filters: BoxedFilter<(Response,)>, | ||
routes: BoxedFilter<(Response,)>, | ||
shutdown: ShutdownSignal, | ||
keepalive_settings: KeepaliveConfig, | ||
) -> crate::Result<()> { | ||
let listener = tls_settings.bind(&address).await?; | ||
let routes = filters.recover(handle_rejection); | ||
|
||
info!(message = "Building HTTP server.", address = %address); | ||
|
||
let routes = routes.recover(handle_rejection); | ||
|
||
let span = Span::current(); | ||
let make_svc = make_service_fn(move |conn: &MaybeTlsIncomingStream<TcpStream>| { | ||
let svc = ServiceBuilder::new() | ||
|
@@ -115,11 +129,41 @@ | |
bytes_received, | ||
events_received, | ||
); | ||
|
||
// The OTLP spec says that HTTP 4xx errors must include a grpc Status message | ||
// by doing it here we get access to the content-type header, which is required | ||
// to differentiate between protobuf and json encoding | ||
let handle_errors: BoxedFilter<(Response,)> = warp::any() | ||
.and(warp::method()) | ||
.and(warp::path::peek()) | ||
.and(extract_content_type()) | ||
.then(|method, path: warp::filters::path::Peek, _ct| async move { | ||
if method != hyper::Method::POST { | ||
let reply = protobuf(Status { | ||
code: 2, | ||
message: "method not allowed, supported: [POST]".into(), | ||
..Default::default() | ||
}); | ||
warp::reply::with_status(reply, hyper::StatusCode::METHOD_NOT_ALLOWED) | ||
.into_response() | ||
} else { | ||
let reply = protobuf(Status { | ||
code: 2, | ||
message: format!("unknown route: {}", path.as_str()), | ||
..Default::default() | ||
}); | ||
warp::reply::with_status(reply, hyper::StatusCode::NOT_FOUND).into_response() | ||
} | ||
}) | ||
.boxed(); | ||
|
||
log_filters | ||
.or(trace_filters) | ||
.unify() | ||
.or(metrics_filters) | ||
.unify() | ||
.or(handle_errors) | ||
.unify() | ||
.boxed() | ||
} | ||
|
||
|
@@ -148,15 +192,12 @@ | |
) -> BoxedFilter<(Response,)> { | ||
warp::post() | ||
.and(warp::path!("v1" / "logs")) | ||
.and(warp::header::exact_ignore_case( | ||
"content-type", | ||
"application/x-protobuf", | ||
)) | ||
.and(extract_content_type()) | ||
.and(warp::header::optional::<String>("content-encoding")) | ||
.and(warp::header::headers_cloned()) | ||
.and(warp::body::bytes()) | ||
.and_then( | ||
move |encoding_header: Option<String>, headers_config: HeaderMap, body: Bytes| { | ||
move |_, encoding_header: Option<String>, headers_config: HeaderMap, body: Bytes| { | ||
let events = decode(encoding_header.as_deref(), body) | ||
.and_then(|body| { | ||
bytes_received.emit(ByteSize(body.len())); | ||
|
@@ -187,13 +228,10 @@ | |
) -> BoxedFilter<(Response,)> { | ||
warp::post() | ||
.and(warp::path!("v1" / "metrics")) | ||
.and(warp::header::exact_ignore_case( | ||
"content-type", | ||
"application/x-protobuf", | ||
)) | ||
.and(extract_content_type()) | ||
.and(warp::header::optional::<String>("content-encoding")) | ||
.and(warp::body::bytes()) | ||
.and_then(move |encoding_header: Option<String>, body: Bytes| { | ||
.and_then(move |_, encoding_header: Option<String>, body: Bytes| { | ||
let events = decode(encoding_header.as_deref(), body).and_then(|body| { | ||
bytes_received.emit(ByteSize(body.len())); | ||
decode_metrics_body(body, &events_received) | ||
|
@@ -218,13 +256,10 @@ | |
) -> BoxedFilter<(Response,)> { | ||
warp::post() | ||
.and(warp::path!("v1" / "traces")) | ||
.and(warp::header::exact_ignore_case( | ||
"content-type", | ||
"application/x-protobuf", | ||
)) | ||
.and(extract_content_type()) | ||
.and(warp::header::optional::<String>("content-encoding")) | ||
.and(warp::body::bytes()) | ||
.and_then(move |encoding_header: Option<String>, body: Bytes| { | ||
.and_then(move |_, encoding_header: Option<String>, body: Bytes| { | ||
let events = decode(encoding_header.as_deref(), body).and_then(|body| { | ||
bytes_received.emit(ByteSize(body.len())); | ||
decode_trace_body(body, &events_received) | ||
|
@@ -324,56 +359,74 @@ | |
output: &str, | ||
resp: impl Message, | ||
) -> Result<Response, Rejection> { | ||
match events { | ||
Ok(mut events) => { | ||
let receiver = BatchNotifier::maybe_apply_to(acknowledgements, &mut events); | ||
let count = events.len(); | ||
|
||
out.send_batch_named(output, events).await.map_err(|_| { | ||
emit!(StreamClosedError { count }); | ||
warp::reject::custom(ApiError::ServerShutdown) | ||
})?; | ||
|
||
match receiver { | ||
None => Ok(protobuf(resp).into_response()), | ||
Some(receiver) => match receiver.await { | ||
BatchStatus::Delivered => Ok(protobuf(resp).into_response()), | ||
BatchStatus::Errored => Err(warp::reject::custom(Status { | ||
code: 2, // UNKNOWN - OTLP doesn't require use of status.code, but we can't encode a None here | ||
message: "Error delivering contents to sink".into(), | ||
..Default::default() | ||
})), | ||
BatchStatus::Rejected => Err(warp::reject::custom(Status { | ||
code: 2, // UNKNOWN - OTLP doesn't require use of status.code, but we can't encode a None here | ||
message: "Contents failed to deliver to sink".into(), | ||
..Default::default() | ||
})), | ||
}, | ||
} | ||
} | ||
Err(err) => Err(warp::reject::custom(err)), | ||
let reply_with_status = | ||
|http_code: hyper::StatusCode, code: i32, message: String| -> Result<Response, Rejection> { | ||
let mut resp = protobuf(Status { | ||
code, | ||
message, | ||
..Default::default() | ||
}) | ||
.into_response(); | ||
*resp.status_mut() = http_code; | ||
Ok(resp) | ||
}; | ||
|
||
if let Err(err) = events { | ||
return reply_with_status( | ||
err.status_code(), | ||
2, // UNKNOWN - OTLP doesn't require use of status.code, but we can't encode a None here | ||
err.message().into(), | ||
); | ||
} | ||
} | ||
|
||
async fn handle_rejection(err: Rejection) -> Result<impl Reply, std::convert::Infallible> { | ||
if let Some(err_msg) = err.find::<ErrorMessage>() { | ||
let reply = protobuf(Status { | ||
code: 2, // UNKNOWN - OTLP doesn't require use of status.code, but we can't encode a None here | ||
message: err_msg.message().into(), | ||
..Default::default() | ||
}); | ||
let mut events = events.unwrap(); | ||
let receiver = BatchNotifier::maybe_apply_to(acknowledgements, &mut events); | ||
let count = events.len(); | ||
|
||
if let Err(_) = out.send_batch_named(output, events).await { | ||
emit!(StreamClosedError { count }); | ||
// the client can try again later | ||
return reply_with_status( | ||
hyper::StatusCode::SERVICE_UNAVAILABLE, | ||
2, | ||
"Server is shutting down".into(), | ||
); | ||
}; | ||
|
||
match receiver { | ||
None => Ok(protobuf(resp).into_response()), | ||
Some(receiver) => match receiver.await { | ||
BatchStatus::Delivered => Ok(protobuf(resp).into_response()), | ||
BatchStatus::Errored => reply_with_status( | ||
hyper::StatusCode::INTERNAL_SERVER_ERROR, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is there a way to tell why it errored and whether it makes sense to return a 'retriable' status code (e.g. 502) |
||
2, // UNKNOWN - OTLP doesn't require use of status.code, but we can't encode a None here | ||
"Error delivering contents to sink".into(), | ||
), | ||
BatchStatus::Rejected => reply_with_status( | ||
hyper::StatusCode::INTERNAL_SERVER_ERROR, | ||
2, // UNKNOWN - OTLP doesn't require use of status.code, but we can't encode a None here | ||
"Contents failed to deliver to sink".into(), | ||
), | ||
}, | ||
} | ||
} | ||
|
||
Ok(warp::reply::with_status(reply, err_msg.status_code())) | ||
async fn handle_rejection(err: Rejection) -> Result<Response, Infallible> { | ||
let reply = if let Some(_) = err.find::<InvalidContentType>() { | ||
warp::reply::with_status( | ||
hyper::StatusCode::UNSUPPORTED_MEDIA_TYPE.as_str(), | ||
hyper::StatusCode::UNSUPPORTED_MEDIA_TYPE, | ||
) | ||
} else { | ||
let reply = protobuf(Status { | ||
code: 2, // UNKNOWN - OTLP doesn't require use of status.code, but we can't encode a None here | ||
message: format!("{:?}", err), | ||
..Default::default() | ||
}); | ||
|
||
Ok(warp::reply::with_status( | ||
reply, | ||
StatusCode::INTERNAL_SERVER_ERROR, | ||
)) | ||
} | ||
warn!("Unhandled rejection: {:?}", err); | ||
warp::reply::with_status( | ||
"Internal server error".into(), | ||
hyper::StatusCode::INTERNAL_SERVER_ERROR, | ||
) | ||
}; | ||
|
||
let mut resp = reply.into_response(); | ||
resp.headers_mut() | ||
.insert(http::header::CONTENT_TYPE, "text/plain".parse().unwrap()); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We should avoid |
||
Ok(resp) | ||
} |
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should avoid
unwrap()
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agree. Is there a nice syntax trick? The
if let Err()
above should guarantee this unwrap is safeThere was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure, you can use
is_err()
and return early. Or: