Skip to content

Commit 41f1854

Browse files
committed
AMLII-2166 - Add UDS Streams support to the DogStatsD client
Includes full support for the unix://, unixstream://, and unixgram:// prefixes utilized by DD_DOGSTATSD_URL in prepration to support that feature. Autodetects SOCK_DGRAM vs SOCK_STREAM for users currently providing a raw path.
1 parent ab20e29 commit 41f1854

File tree

3 files changed

+87
-10
lines changed

3 files changed

+87
-10
lines changed

.gitignore

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,9 @@ pip-log.txt
3838
.project
3939
.pydevproject
4040

41+
# Integration test constructs
42+
temp_sockets
43+
4144
#Misc
4245
.DS_Store
4346
.eggs/

datadog/dogstatsd/base.py

Lines changed: 34 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,11 @@
4949
DEFAULT_HOST = "localhost"
5050
DEFAULT_PORT = 8125
5151

52+
# Socket prefixes
53+
UNIX_ADDRESS_SCHEME = "unix://"
54+
UNIX_ADDRESS_DATAGRAM_SCHEME = "unixgram://"
55+
UNIX_ADDRESS_STREAM_SCHEME = "unixstream://"
56+
5257
# Buffering-related values (in seconds)
5358
DEFAULT_FLUSH_INTERVAL = 0.3
5459
MIN_FLUSH_INTERVAL = 0.0001
@@ -731,11 +736,34 @@ def _ensure_min_send_buffer_size(cls, sock, min_size=MIN_SEND_BUFFER_SIZE):
731736

732737
@classmethod
733738
def _get_uds_socket(cls, socket_path, timeout):
734-
sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
735-
sock.settimeout(timeout)
736-
cls._ensure_min_send_buffer_size(sock)
737-
sock.connect(socket_path)
738-
return sock
739+
valid_socket_kinds = [socket.SOCK_DGRAM, socket.SOCK_STREAM]
740+
if socket_path.startswith(UNIX_ADDRESS_DATAGRAM_SCHEME):
741+
valid_socket_kinds = [socket.SOCK_DGRAM]
742+
socket_path = socket_path[len(UNIX_ADDRESS_DATAGRAM_SCHEME):]
743+
elif socket_path.startswith(UNIX_ADDRESS_STREAM_SCHEME):
744+
valid_socket_kinds = [socket.SOCK_STREAM]
745+
socket_path = socket_path[len(UNIX_ADDRESS_STREAM_SCHEME):]
746+
elif socket_path.startswith(UNIX_ADDRESS_SCHEME):
747+
socket_path = socket_path[len(UNIX_ADDRESS_SCHEME):]
748+
749+
last_error = ValueError("Invalid socket path")
750+
for socket_kind in valid_socket_kinds:
751+
try:
752+
sock = socket.socket(socket.AF_UNIX, socket_kind)
753+
sock.settimeout(timeout)
754+
cls._ensure_min_send_buffer_size(sock)
755+
sock.connect(socket_path)
756+
log.debug("Connected to socket %s with kind %s", socket_path, socket_kind.name)
757+
return sock
758+
except Exception as e:
759+
if sock is not None:
760+
sock.close()
761+
log.debug("Failed to connect to %s with kind %s: %s", socket_path, socket_kind.name, e)
762+
if e.errno == errno.EPROTOTYPE:
763+
last_error = e
764+
continue
765+
raise e
766+
raise last_error
739767

740768
@classmethod
741769
def _get_udp_socket(cls, host, port, timeout):
@@ -1253,7 +1281,7 @@ def _xmit_packet(self, packet, is_telemetry):
12531281
)
12541282
self.close_socket()
12551283
except Exception as exc:
1256-
print("Unexpected error: %s", exc)
1284+
print("Unexpected error: ", exc)
12571285
log.error("Unexpected error: %s", str(exc))
12581286

12591287
if not is_telemetry and self._telemetry:

tests/integration/dogstatsd/test_statsd_sender.py

Lines changed: 50 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,20 @@
11
import itertools
2+
import os
23
import socket
34
from threading import Thread
5+
import uuid
46

57
import pytest
68

79
from datadog.dogstatsd.base import DogStatsd
810

911
@pytest.mark.parametrize(
10-
"disable_background_sender, disable_buffering, wait_for_pending, socket_timeout, stop",
11-
list(itertools.product([True, False], [True, False], [True, False], [0, 1], [True, False])),
12+
"disable_background_sender, disable_buffering, wait_for_pending, socket_timeout, stop, socket_kind",
13+
list(itertools.product([True, False], [True, False], [True, False], [0, 1], [True, False], [socket.SOCK_DGRAM, socket.SOCK_STREAM])),
1214
)
13-
def test_sender_mode(disable_background_sender, disable_buffering, wait_for_pending, socket_timeout, stop):
15+
def test_sender_mode(disable_background_sender, disable_buffering, wait_for_pending, socket_timeout, stop, socket_kind):
1416
# Test basic sender operation with an assortment of options
15-
foo, bar = socket.socketpair(socket.AF_UNIX, socket.SOCK_DGRAM, 0)
17+
foo, bar = socket.socketpair(socket.AF_UNIX, socket_kind, 0)
1618
statsd = DogStatsd(
1719
telemetry_min_flush_interval=0,
1820
disable_background_sender=disable_background_sender,
@@ -101,3 +103,47 @@ def test_buffering_with_context():
101103
bar.settimeout(5)
102104
msg = bar.recv(8192)
103105
assert msg == b"first:1|c\n"
106+
107+
@pytest.fixture()
108+
def socket_dir():
109+
# Use relative path to avoid "AF_UNIX path too long" errors
110+
sock_dir = os.path.join(os.path.dirname(os.path.relpath(__file__)), "temp_sockets")
111+
if not os.path.exists(sock_dir):
112+
os.mkdir(sock_dir)
113+
yield sock_dir
114+
for file in os.scandir(sock_dir):
115+
os.remove(file)
116+
os.rmdir(sock_dir)
117+
118+
@pytest.mark.parametrize(
119+
"socket_prefix, socket_kind, success",
120+
[
121+
("", socket.SOCK_DGRAM, True),
122+
("", socket.SOCK_STREAM, True),
123+
("unix://", socket.SOCK_DGRAM, True),
124+
("unix://", socket.SOCK_STREAM, True),
125+
("unixstream://", socket.SOCK_DGRAM, False),
126+
("unixstream://", socket.SOCK_STREAM, True),
127+
("unixgram://", socket.SOCK_DGRAM, True),
128+
("unixgram://", socket.SOCK_STREAM, False)
129+
]
130+
)
131+
def test_socket_connection(socket_dir, socket_prefix, socket_kind, success):
132+
socket_path = os.path.join(socket_dir, str(uuid.uuid1()) + ".sock")
133+
listener_socket = socket.socket(socket.AF_UNIX, socket_kind)
134+
listener_socket.bind(socket_path)
135+
136+
if socket_kind == socket.SOCK_STREAM:
137+
listener_socket.listen(1)
138+
139+
statsd = DogStatsd(
140+
socket_path = socket_prefix + socket_path
141+
)
142+
143+
if success:
144+
assert statsd.get_socket() is not None
145+
else:
146+
with pytest.raises(OSError):
147+
statsd.get_socket()
148+
149+
listener_socket.close()

0 commit comments

Comments
 (0)