Skip to content

feat(dev): Enable internal_log_rate_limit by default #22899

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions lib/codecs/src/decoding/framing/chunked_gelf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,6 @@ impl ChunkedGelfDecoder {
warn!(
message_id = message_id,
timeout_secs = timeout.as_secs_f64(),
internal_log_rate_limit = true,
"Message was not fully received within the timeout window. Discarding it."
);
}
Expand All @@ -409,7 +408,6 @@ impl ChunkedGelfDecoder {
debug!(
message_id = message_id,
sequence_number = sequence_number,
internal_log_rate_limit = true,
"Received a duplicate chunk. Ignoring it."
);
return Ok(None);
Expand Down
2 changes: 1 addition & 1 deletion lib/dnstap-parser/src/internal_events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ impl<E: std::fmt::Display> InternalEvent for DnstapParseWarning<E> {
error = %self.error,
stage = error_stage::PROCESSING,
error_type = error_type::PARSER_FAILED,
internal_log_rate_limit = true,

);
}
}
1 change: 0 additions & 1 deletion lib/tracing-limit/benches/limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ fn bench(c: &mut Criterion) {
bar = "bar",
baz = 3,
quuux = ?0.99,
internal_log_rate_limit = true
)
}
})
Expand Down
6 changes: 1 addition & 5 deletions lib/tracing-limit/examples/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,7 @@ fn main() {
tracing::dispatcher::with_default(&dispatch, || {
for i in 0..40usize {
trace!("This field is not rate limited!");
info!(
message = "This message is rate limited",
count = &i,
internal_log_rate_limit = true,
);
info!(message = "This message is rate limited", count = &i,);
std::thread::sleep(std::time::Duration::from_millis(1000));
}
})
Expand Down
1 change: 0 additions & 1 deletion lib/tracing-limit/examples/by_span.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ fn main() {
message =
"This message is rate limited by its component and vrl_line_number",
count = &i,
internal_log_rate_limit = true,
);
}
}
Expand Down
45 changes: 32 additions & 13 deletions lib/tracing-limit/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ where
let mut limit_visitor = LimitVisitor::default();
event.record(&mut limit_visitor);

let limit_exists = limit_visitor.limit.unwrap_or(false);
let limit_exists = limit_visitor.limit.unwrap_or(true);
Copy link
Author

@shivanthzen shivanthzen Apr 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is where the default is changed

if !limit_exists {
return self.inner.on_event(event, ctx);
}
Expand Down Expand Up @@ -264,11 +264,8 @@ where
let valueset = fields.value_set(&values);
let event = Event::new(metadata, &valueset);
self.inner.on_event(&event, ctx.clone());
} else {
let values = [(
&fields.field(RATE_LIMIT_FIELD).unwrap(),
Some(&rate_limit as &dyn Value),
)];
} else if let Some(ratelimit_field) = fields.field(RATE_LIMIT_FIELD) {
let values = [(&ratelimit_field, Some(&rate_limit as &dyn Value))];

let valueset = fields.value_set(&values);
let event = Event::new(metadata, &valueset);
Expand Down Expand Up @@ -525,6 +522,34 @@ mod test {
);
}

#[test]
fn rate_limits_default() {
let events: Arc<Mutex<Vec<String>>> = Default::default();

let recorder = RecordingLayer::new(Arc::clone(&events));
let sub = tracing_subscriber::registry::Registry::default()
.with(RateLimitedLayer::new(recorder).with_default_limit(10));
tracing::subscriber::with_default(sub, || {
for _ in 0..21 {
info!(message = "Hello world!");
MockClock::advance(Duration::from_millis(100));
}
});

let events = events.lock().unwrap();

assert_eq!(
*events,
vec![
"Hello world!",
"Internal log [Hello world!] is being suppressed to avoid flooding.",
]
.into_iter()
.map(std::borrow::ToOwned::to_owned)
.collect::<Vec<String>>()
);
}

#[test]
fn override_rate_limit_at_callsite() {
let events: Arc<Mutex<Vec<String>>> = Default::default();
Expand All @@ -534,11 +559,7 @@ mod test {
.with(RateLimitedLayer::new(recorder).with_default_limit(100));
tracing::subscriber::with_default(sub, || {
for _ in 0..21 {
info!(
message = "Hello world!",
internal_log_rate_limit = true,
internal_log_rate_secs = 1
);
info!(message = "Hello world!", internal_log_rate_secs = 1);
MockClock::advance(Duration::from_millis(100));
}
});
Expand Down Expand Up @@ -579,7 +600,6 @@ mod test {
info!(
message =
format!("Hello {} on line_number {}!", key, line_number).as_str(),
internal_log_rate_limit = true
);
}
}
Expand Down Expand Up @@ -641,7 +661,6 @@ mod test {
info!(
message =
format!("Hello {} on line_number {}!", key, line_number).as_str(),
internal_log_rate_limit = true,
component_id = &key,
vrl_position = &line_number
);
Expand Down
2 changes: 1 addition & 1 deletion lib/vector-buffers/src/internal_events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ impl InternalEvent for BufferReadError {
error_code = self.error_code,
error_type = error_type::READER_FAILED,
stage = "processing",
internal_log_rate_limit = true,

);
counter!(
"buffer_errors_total", "error_code" => self.error_code,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,15 +58,13 @@ impl<const INTENDED: bool> InternalEventHandle for DroppedHandle<'_, INTENDED> {
intentional = INTENDED,
count = data.0,
reason = self.reason,
internal_log_rate_limit = true,
);
} else {
error!(
message,
intentional = INTENDED,
count = data.0,
reason = self.reason,
internal_log_rate_limit = true,
);
}
self.discarded_events.increment(data.0 as u64);
Expand Down
3 changes: 1 addition & 2 deletions lib/vector-common/src/internal_event/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ impl<E: std::fmt::Debug> InternalEvent for PollReadyError<E> {
error = ?self.error,
error_type = error_type::REQUEST_FAILED,
stage = error_stage::SENDING,
internal_log_rate_limit = true,
);
counter!(
"component_errors_total",
Expand Down Expand Up @@ -45,7 +44,7 @@ impl<E: std::fmt::Debug> InternalEvent for CallError<E> {
request_id = self.request_id,
error_type = error_type::REQUEST_FAILED,
stage = error_stage::SENDING,
internal_log_rate_limit = true,

);
counter!(
"component_errors_total",
Expand Down
5 changes: 1 addition & 4 deletions src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -585,10 +585,7 @@ pub fn init_logging(color: bool, format: LogFormat, log_level: &str, rate: u64)
};

trace::init(color, json, &level, rate);
debug!(
message = "Internal log rate limit configured.",
internal_log_rate_secs = rate,
);
debug!(message = "Internal log rate limit configured.",);
info!(message = "Log level is enabled.", level = ?level);
}

Expand Down
1 change: 1 addition & 0 deletions src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ pub struct RootOpts {
pub watch_config_poll_interval_seconds: NonZeroU64,

/// Set the internal log rate limit
/// Note that traces are throttled by default unless tagged with `internal_log_rate_limit = false`.
#[arg(
short,
long,
Expand Down
6 changes: 3 additions & 3 deletions src/internal_events/amqp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ pub mod source {
error = ?self.error,
error_type = error_type::REQUEST_FAILED,
stage = error_stage::RECEIVING,
internal_log_rate_limit = true,

);
counter!(
"component_errors_total",
Expand All @@ -58,7 +58,7 @@ pub mod source {
error = ?self.error,
error_type = error_type::ACKNOWLEDGMENT_FAILED,
stage = error_stage::RECEIVING,
internal_log_rate_limit = true,

);
counter!(
"component_errors_total",
Expand All @@ -80,7 +80,7 @@ pub mod source {
error = ?self.error,
error_type = error_type::COMMAND_FAILED,
stage = error_stage::RECEIVING,
internal_log_rate_limit = true,

);
counter!(
"component_errors_total",
Expand Down
2 changes: 1 addition & 1 deletion src/internal_events/apache_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ impl InternalEvent for ApacheMetricsParseError<'_> {
stage = error_stage::PROCESSING,
error_type = error_type::PARSER_FAILED,
endpoint = %self.endpoint,
internal_log_rate_limit = true,

);
counter!(
"component_errors_total",
Expand Down
1 change: 0 additions & 1 deletion src/internal_events/aws_cloudwatch_logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ impl InternalEvent for AwsCloudwatchLogsMessageSizeError {
error_code = "message_too_long",
error_type = error_type::ENCODER_FAILED,
stage = error_stage::PROCESSING,
internal_log_rate_limit = true,
);
counter!(
"component_errors_total",
Expand Down
2 changes: 1 addition & 1 deletion src/internal_events/aws_ec2_metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ impl InternalEvent for AwsEc2MetadataRefreshError {
error = %self.error,
error_type = error_type::REQUEST_FAILED,
stage = error_stage::PROCESSING,
internal_log_rate_limit = true,

);
counter!(
"component_errors_total",
Expand Down
4 changes: 2 additions & 2 deletions src/internal_events/aws_ecs_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,12 @@ impl InternalEvent for AwsEcsMetricsParseError<'_> {
error = ?self.error,
stage = error_stage::PROCESSING,
error_type = error_type::PARSER_FAILED,
internal_log_rate_limit = true,

);
debug!(
message = %format!("Failed to parse response:\\n\\n{}\\n\\n", self.body.escape_debug()),
endpoint = %self.endpoint,
internal_log_rate_limit = true,

);
counter!("parse_errors_total").increment(1);
counter!(
Expand Down
2 changes: 1 addition & 1 deletion src/internal_events/aws_kinesis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ impl InternalEvent for AwsKinesisStreamNoPartitionKeyError<'_> {
partition_key_field = %self.partition_key_field,
error_type = error_type::PARSER_FAILED,
stage = error_stage::PROCESSING,
internal_log_rate_limit = true,

);

counter!(
Expand Down
3 changes: 1 addition & 2 deletions src/internal_events/aws_kinesis_firehose.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ impl InternalEvent for AwsKinesisFirehoseRequestError<'_> {
error_type = error_type::REQUEST_FAILED,
error_code = %self.error_code,
request_id = %self.request_id.unwrap_or(""),
internal_log_rate_limit = true,
);
counter!(
"component_errors_total",
Expand All @@ -75,7 +74,7 @@ impl InternalEvent for AwsKinesisFirehoseAutomaticRecordDecodeError {
error_type = error_type::PARSER_FAILED,
error_code = %io_error_code(&self.error),
compression = %self.compression,
internal_log_rate_limit = true,

);
counter!(
"component_errors_total",
Expand Down
10 changes: 5 additions & 5 deletions src/internal_events/aws_sqs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ mod s3 {
error_code = "failed_processing_sqs_message",
error_type = error_type::PARSER_FAILED,
stage = error_stage::PROCESSING,
internal_log_rate_limit = true,

);
counter!(
"component_errors_total",
Expand Down Expand Up @@ -73,7 +73,7 @@ mod s3 {
error_code = "failed_deleting_some_sqs_messages",
error_type = error_type::ACKNOWLEDGMENT_FAILED,
stage = error_stage::PROCESSING,
internal_log_rate_limit = true,

);
counter!(
"component_errors_total",
Expand Down Expand Up @@ -103,7 +103,7 @@ mod s3 {
error_code = "failed_deleting_all_sqs_messages",
error_type = error_type::ACKNOWLEDGMENT_FAILED,
stage = error_stage::PROCESSING,
internal_log_rate_limit = true,

);
counter!(
"component_errors_total",
Expand All @@ -129,7 +129,7 @@ impl<E: std::fmt::Display> InternalEvent for SqsMessageReceiveError<'_, E> {
error_code = "failed_fetching_sqs_events",
error_type = error_type::REQUEST_FAILED,
stage = error_stage::RECEIVING,
internal_log_rate_limit = true,

);
counter!(
"component_errors_total",
Expand Down Expand Up @@ -182,7 +182,7 @@ impl<E: std::fmt::Display> InternalEvent for SqsMessageDeleteError<'_, E> {
error = %self.error,
error_type = error_type::WRITER_FAILED,
stage = error_stage::PROCESSING,
internal_log_rate_limit = true,

);
counter!(
"component_errors_total",
Expand Down
2 changes: 1 addition & 1 deletion src/internal_events/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ impl InternalEvent for LargeEventDroppedError {
length = %self.length,
error_type = error_type::CONDITION_FAILED,
stage = error_stage::SENDING,
internal_log_rate_limit = true,

);
counter!(
"component_errors_total",
Expand Down
10 changes: 5 additions & 5 deletions src/internal_events/codecs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ impl<E: std::fmt::Display> InternalEvent for DecoderFramingError<E> {
error_code = "decoder_frame",
error_type = error_type::PARSER_FAILED,
stage = error_stage::PROCESSING,
internal_log_rate_limit = true,

);
counter!(
"component_errors_total",
Expand All @@ -40,7 +40,7 @@ impl InternalEvent for DecoderDeserializeError<'_> {
error_code = "decoder_deserialize",
error_type = error_type::PARSER_FAILED,
stage = error_stage::PROCESSING,
internal_log_rate_limit = true,

);
counter!(
"component_errors_total",
Expand All @@ -66,7 +66,7 @@ impl InternalEvent for EncoderFramingError<'_> {
error_code = "encoder_frame",
error_type = error_type::ENCODER_FAILED,
stage = error_stage::SENDING,
internal_log_rate_limit = true,

);
counter!(
"component_errors_total",
Expand All @@ -93,7 +93,7 @@ impl InternalEvent for EncoderSerializeError<'_> {
error_code = "encoder_serialize",
error_type = error_type::ENCODER_FAILED,
stage = error_stage::SENDING,
internal_log_rate_limit = true,

);
counter!(
"component_errors_total",
Expand All @@ -120,7 +120,7 @@ impl<E: std::fmt::Display> InternalEvent for EncoderWriteError<'_, E> {
error = %self.error,
error_type = error_type::IO_FAILED,
stage = error_stage::SENDING,
internal_log_rate_limit = true,

);
counter!(
"component_errors_total",
Expand Down
Loading
Loading