Skip to content

Commit 7618c56

Browse files
committed
dogstatsd: send length_prefix only for Unix Stream
Length prefix is only needed in case of Stream and not for Datagram whether it Unix or UDP sockets. Reference: https://github.com/DataDog/datadog-go/blob/bae3560e5c664d64b71c0e8ca89326afa362e12a/statsd/uds.go#L62 Earlier, it was assumed that RemoteAddr::Unixgram has to also prefix length which made the code complex to pad sent buffer with length, to avoid copy before sending. Since length prefix is no longer needed, make the code simple by pushing the length padding role until the end of send.
1 parent b33e7a0 commit 7618c56

File tree

3 files changed

+21
-94
lines changed

3 files changed

+21
-94
lines changed

metrics-exporter-dogstatsd/src/forwarder/mod.rs

Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -110,19 +110,6 @@ pub(crate) struct ForwarderConfiguration {
110110
pub write_timeout: Duration,
111111
}
112112

113-
impl ForwarderConfiguration {
114-
/// Returns `true` if the remote address requires a length prefix to be sent before each payload.
115-
pub fn is_length_prefixed(&self) -> bool {
116-
match self.remote_addr {
117-
RemoteAddr::Udp(_) => false,
118-
#[cfg(target_os = "linux")]
119-
RemoteAddr::Unix(_) => true,
120-
#[cfg(target_os = "linux")]
121-
RemoteAddr::Unixgram(_) => true,
122-
}
123-
}
124-
}
125-
126113
#[cfg(test)]
127114
mod tests {
128115
use std::net::SocketAddrV4;

metrics-exporter-dogstatsd/src/forwarder/sync.rs

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -60,10 +60,18 @@ impl Client {
6060
Client::Unixgram(socket) => socket.send(buf),
6161

6262
#[cfg(target_os = "linux")]
63-
Client::Unix(socket) => match socket.write_all(buf) {
64-
Ok(()) => Ok(buf.len()),
65-
Err(e) => Err(e),
63+
Client::Unix(socket) => {
64+
match u32::try_from(buf.len()) {
65+
Ok(len) => socket.write_all(&len.to_be_bytes())?,
66+
Err(e) => {
67+
use std::io::{Error, ErrorKind};
68+
return Err(Error::new(ErrorKind::InvalidData, e));
69+
}
70+
}
71+
72+
socket.write_all(buf).map(|()| buf.len())
6673
},
74+
6775
}
6876
}
6977
}
@@ -143,7 +151,7 @@ impl Forwarder {
143151
pub fn run(mut self) {
144152
let mut flush_state = FlushState::default();
145153
let mut writer =
146-
PayloadWriter::new(self.config.max_payload_len, self.config.is_length_prefixed());
154+
PayloadWriter::new(self.config.max_payload_len);
147155
let mut telemetry_update = TelemetryUpdate::default();
148156

149157
let mut next_flush = Instant::now() + self.config.flush_interval;

metrics-exporter-dogstatsd/src/writer.rs

Lines changed: 9 additions & 77 deletions
Original file line numberDiff line numberDiff line change
@@ -46,29 +46,24 @@ pub(super) struct PayloadWriter {
4646
buf: Vec<u8>,
4747
trailer_buf: Vec<u8>,
4848
offsets: Vec<usize>,
49-
with_length_prefix: bool,
5049
}
5150

5251
impl PayloadWriter {
5352
/// Creates a new `PayloadWriter` with the given maximum payload length.
54-
pub fn new(max_payload_len: usize, with_length_prefix: bool) -> Self {
53+
pub fn new(max_payload_len: usize) -> Self {
5554
// NOTE: This should also be handled in the builder, but we want to just double check here that we're getting a
5655
// properly sanitized value.
5756
assert!(
5857
u32::try_from(max_payload_len).is_ok(),
5958
"maximum payload length must be less than 2^32 bytes"
6059
);
6160

62-
let mut writer = Self {
61+
Self {
6362
max_payload_len,
6463
buf: Vec::new(),
6564
trailer_buf: Vec::new(),
6665
offsets: Vec::new(),
67-
with_length_prefix,
68-
};
69-
70-
writer.prepare_for_write();
71-
writer
66+
}
7267
}
7368

7469
fn last_offset(&self) -> usize {
@@ -80,21 +75,10 @@ impl PayloadWriter {
8075
//
8176
// If there aren't any committed metrics, then the last offset is simply zero.
8277
let last_offset = self.last_offset();
83-
let maybe_length_prefix_len = if self.with_length_prefix { 4 } else { 0 };
84-
self.buf.len() - last_offset - maybe_length_prefix_len
85-
}
86-
87-
fn prepare_for_write(&mut self) {
88-
if self.with_length_prefix {
89-
// If we're adding length prefixes, we need to write the length of the payload first.
90-
//
91-
// We write a dummy length of zero for now, and then we'll go back and fill it in later.
92-
self.buf.extend_from_slice(&[0, 0, 0, 0]);
93-
}
78+
self.buf.len() - last_offset
9479
}
9580

9681
fn commit(&mut self) -> bool {
97-
let current_last_offset = self.last_offset();
9882
let current_len = self.current_len();
9983
if current_len > self.max_payload_len {
10084
// If the current metric is too long, we need to truncate everything we just wrote to get us back to the end
@@ -107,19 +91,6 @@ impl PayloadWriter {
10791
// Track the new offset.
10892
self.offsets.push(self.buf.len());
10993

110-
// If we're dealing with length-delimited payloads, go back to the beginning of this payload and fill in the
111-
// length of it.
112-
if self.with_length_prefix {
113-
// NOTE: We unwrap the conversion here because we know that `self.max_payload_len` is less than 2^32, and we
114-
// check above that `current_len` is less than or equal to `self.max_payload_len`.
115-
let current_len_buf = u32::try_from(current_len).unwrap().to_le_bytes();
116-
self.buf[current_last_offset..current_last_offset + 4]
117-
.copy_from_slice(&current_len_buf[..]);
118-
}
119-
120-
// Initialize the buffer for the next payload.
121-
self.prepare_for_write();
122-
12394
true
12495
}
12596

@@ -542,7 +513,7 @@ mod tests {
542513
];
543514

544515
for (key, value, ts, prefix, global_labels, expected) in cases {
545-
let mut writer = PayloadWriter::new(8192, false);
516+
let mut writer = PayloadWriter::new(8192);
546517
let result = writer.write_counter(&key, value, ts, prefix, global_labels);
547518
assert_eq!(result.payloads_written(), 1);
548519

@@ -607,7 +578,7 @@ mod tests {
607578
];
608579

609580
for (key, value, ts, prefix, global_labels, expected) in cases {
610-
let mut writer = PayloadWriter::new(8192, false);
581+
let mut writer = PayloadWriter::new(8192);
611582
let result = writer.write_gauge(&key, value, ts, prefix, global_labels);
612583
assert_eq!(result.payloads_written(), 1);
613584

@@ -666,7 +637,7 @@ mod tests {
666637
];
667638

668639
for (key, values, prefix, global_labels, expected) in cases {
669-
let mut writer = PayloadWriter::new(8192, false);
640+
let mut writer = PayloadWriter::new(8192);
670641
let result =
671642
writer.write_histogram(&key, values.iter().copied(), None, prefix, global_labels);
672643
assert_eq!(result.payloads_written(), 1);
@@ -726,7 +697,7 @@ mod tests {
726697
];
727698

728699
for (key, values, prefix, global_labels, expected) in cases {
729-
let mut writer = PayloadWriter::new(8192, false);
700+
let mut writer = PayloadWriter::new(8192);
730701
let result = writer.write_distribution(
731702
&key,
732703
values.iter().copied(),
@@ -741,51 +712,12 @@ mod tests {
741712
}
742713
}
743714

744-
#[test]
745-
fn length_prefix() {
746-
let prefixed = |buf: &str| {
747-
let mut prefixed_buf = Vec::with_capacity(buf.len() + 4);
748-
prefixed_buf.extend_from_slice(&(buf.len() as u32).to_le_bytes());
749-
prefixed_buf.extend_from_slice(buf.as_bytes());
750-
prefixed_buf
751-
};
752-
753-
// Cases are defined as: metric key, metric values, metric timestamp, expected output.
754-
let cases = [
755-
(Key::from("test_distribution"), &[22.22][..], prefixed("test_distribution:22.22|d\n")),
756-
(
757-
Key::from_parts("test_distribution", &[("foo", "bar"), ("baz", "quux")]),
758-
&[88.0][..],
759-
prefixed("test_distribution:88.0|d|#foo:bar,baz:quux\n"),
760-
),
761-
(
762-
Key::from("test_distribution"),
763-
&[22.22, 33.33, 44.44][..],
764-
prefixed("test_distribution:22.22:33.33:44.44|d\n"),
765-
),
766-
(
767-
Key::from_parts("test_distribution", &[("foo", "bar"), ("baz", "quux")]),
768-
&[88.0, 66.6, 123.4][..],
769-
prefixed("test_distribution:88.0:66.6:123.4|d|#foo:bar,baz:quux\n"),
770-
),
771-
];
772-
773-
for (key, values, expected) in cases {
774-
let mut writer = PayloadWriter::new(8192, true);
775-
let result = writer.write_distribution(&key, values.iter().copied(), None, None, &[]);
776-
assert_eq!(result.payloads_written(), 1);
777-
778-
let actual = buf_from_writer(&mut writer);
779-
assert_eq!(actual, expected);
780-
}
781-
}
782-
783715
proptest! {
784716
#[test]
785717
fn property_test_gauntlet(payload_limit in 0..16384usize, inputs in arb_vec(arb_metric(), 1..128)) {
786718
// TODO: Parameterize reservoir size so we can exercise the sample rate stuff.[]
787719

788-
let mut writer = PayloadWriter::new(payload_limit, false);
720+
let mut writer = PayloadWriter::new(payload_limit);
789721
let mut total_input_points: u64 = 0;
790722
let mut payloads_written = 0;
791723
let mut points_dropped = 0;

0 commit comments

Comments
 (0)