Skip to content

Commit e62632e

Browse files
committed
AMLII-2166 - Add UDS Streams support to the DogStatsD client
Includes full support for the unix://, unixstream://, and unixgram:// socket_path prefixes utilized by DD_DOGSTATSD_URL in preparation to support that feature. Autodetects SOCK_DGRAM vs SOCK_STREAM for users currently providing a raw socket path.
1 parent ab20e29 commit e62632e

File tree

2 files changed

+81
-10
lines changed

2 files changed

+81
-10
lines changed

datadog/dogstatsd/base.py

Lines changed: 34 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,11 @@
4949
DEFAULT_HOST = "localhost"
5050
DEFAULT_PORT = 8125
5151

52+
# Socket prefixes
53+
UNIX_ADDRESS_SCHEME = "unix://"
54+
UNIX_ADDRESS_DATAGRAM_SCHEME = "unixgram://"
55+
UNIX_ADDRESS_STREAM_SCHEME = "unixstream://"
56+
5257
# Buffering-related values (in seconds)
5358
DEFAULT_FLUSH_INTERVAL = 0.3
5459
MIN_FLUSH_INTERVAL = 0.0001
@@ -731,11 +736,34 @@ def _ensure_min_send_buffer_size(cls, sock, min_size=MIN_SEND_BUFFER_SIZE):
731736

732737
@classmethod
733738
def _get_uds_socket(cls, socket_path, timeout):
734-
sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
735-
sock.settimeout(timeout)
736-
cls._ensure_min_send_buffer_size(sock)
737-
sock.connect(socket_path)
738-
return sock
739+
valid_socket_kinds = [socket.SOCK_DGRAM, socket.SOCK_STREAM]
740+
if socket_path.startswith(UNIX_ADDRESS_DATAGRAM_SCHEME):
741+
valid_socket_kinds = [socket.SOCK_DGRAM]
742+
socket_path = socket_path[len(UNIX_ADDRESS_DATAGRAM_SCHEME):]
743+
elif socket_path.startswith(UNIX_ADDRESS_STREAM_SCHEME):
744+
valid_socket_kinds = [socket.SOCK_STREAM]
745+
socket_path = socket_path[len(UNIX_ADDRESS_STREAM_SCHEME):]
746+
elif socket_path.startswith(UNIX_ADDRESS_SCHEME):
747+
socket_path = socket_path[len(UNIX_ADDRESS_SCHEME):]
748+
749+
last_error = ValueError("Invalid socket path")
750+
for socket_kind in valid_socket_kinds:
751+
try:
752+
sock = socket.socket(socket.AF_UNIX, socket_kind)
753+
sock.settimeout(timeout)
754+
cls._ensure_min_send_buffer_size(sock)
755+
sock.connect(socket_path)
756+
log.debug("Connected to socket %s with kind %s", socket_path, socket_kind.name)
757+
return sock
758+
except Exception as e:
759+
if sock is not None:
760+
sock.close()
761+
log.debug("Failed to connect to %s with kind %s: %s", socket_path, socket_kind.name, e)
762+
if e.errno == errno.EPROTOTYPE:
763+
last_error = e
764+
continue
765+
raise e
766+
raise last_error
739767

740768
@classmethod
741769
def _get_udp_socket(cls, host, port, timeout):
@@ -1253,7 +1281,7 @@ def _xmit_packet(self, packet, is_telemetry):
12531281
)
12541282
self.close_socket()
12551283
except Exception as exc:
1256-
print("Unexpected error: %s", exc)
1284+
print("Unexpected error: ", exc)
12571285
log.error("Unexpected error: %s", str(exc))
12581286

12591287
if not is_telemetry and self._telemetry:

tests/integration/dogstatsd/test_statsd_sender.py

Lines changed: 47 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,22 @@
11
import itertools
2+
import os
3+
import shutil
24
import socket
5+
import tempfile
36
from threading import Thread
7+
import uuid
48

59
import pytest
610

711
from datadog.dogstatsd.base import DogStatsd
812

913
@pytest.mark.parametrize(
10-
"disable_background_sender, disable_buffering, wait_for_pending, socket_timeout, stop",
11-
list(itertools.product([True, False], [True, False], [True, False], [0, 1], [True, False])),
14+
"disable_background_sender, disable_buffering, wait_for_pending, socket_timeout, stop, socket_kind",
15+
list(itertools.product([True, False], [True, False], [True, False], [0, 1], [True, False], [socket.SOCK_DGRAM, socket.SOCK_STREAM])),
1216
)
13-
def test_sender_mode(disable_background_sender, disable_buffering, wait_for_pending, socket_timeout, stop):
17+
def test_sender_mode(disable_background_sender, disable_buffering, wait_for_pending, socket_timeout, stop, socket_kind):
1418
# Test basic sender operation with an assortment of options
15-
foo, bar = socket.socketpair(socket.AF_UNIX, socket.SOCK_DGRAM, 0)
19+
foo, bar = socket.socketpair(socket.AF_UNIX, socket_kind, 0)
1620
statsd = DogStatsd(
1721
telemetry_min_flush_interval=0,
1822
disable_background_sender=disable_background_sender,
@@ -101,3 +105,42 @@ def test_buffering_with_context():
101105
bar.settimeout(5)
102106
msg = bar.recv(8192)
103107
assert msg == b"first:1|c\n"
108+
109+
@pytest.fixture()
110+
def socket_dir():
111+
tempdir = tempfile.mkdtemp()
112+
yield tempdir
113+
shutil.rmtree(tempdir)
114+
115+
@pytest.mark.parametrize(
116+
"socket_prefix, socket_kind, success",
117+
[
118+
("", socket.SOCK_DGRAM, True),
119+
("", socket.SOCK_STREAM, True),
120+
("unix://", socket.SOCK_DGRAM, True),
121+
("unix://", socket.SOCK_STREAM, True),
122+
("unixstream://", socket.SOCK_DGRAM, False),
123+
("unixstream://", socket.SOCK_STREAM, True),
124+
("unixgram://", socket.SOCK_DGRAM, True),
125+
("unixgram://", socket.SOCK_STREAM, False)
126+
]
127+
)
128+
def test_socket_connection(socket_dir, socket_prefix, socket_kind, success):
129+
socket_path = os.path.join(socket_dir, str(uuid.uuid1()) + ".sock")
130+
listener_socket = socket.socket(socket.AF_UNIX, socket_kind)
131+
listener_socket.bind(socket_path)
132+
133+
if socket_kind == socket.SOCK_STREAM:
134+
listener_socket.listen(1)
135+
136+
statsd = DogStatsd(
137+
socket_path = socket_prefix + socket_path
138+
)
139+
140+
if success:
141+
assert statsd.get_socket() is not None
142+
else:
143+
with pytest.raises(OSError):
144+
statsd.get_socket()
145+
146+
listener_socket.close()

0 commit comments

Comments
 (0)