Skip to content

Commit ae4d6be

Browse files
committed
AMLII-2166 - Add UDS Streams support to the DogStatsD client
Includes full support for the unix://, unixstream://, and unixgram:// socket_path prefixes utilized by DD_DOGSTATSD_URL in preparation to support that feature. Autodetects SOCK_DGRAM vs SOCK_STREAM for users currently providing a raw socket path.
1 parent ab20e29 commit ae4d6be

File tree

2 files changed

+87
-10
lines changed

2 files changed

+87
-10
lines changed

datadog/dogstatsd/base.py

Lines changed: 40 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,11 @@
4949
DEFAULT_HOST = "localhost"
5050
DEFAULT_PORT = 8125
5151

52+
# Socket prefixes
53+
UNIX_ADDRESS_SCHEME = "unix://"
54+
UNIX_ADDRESS_DATAGRAM_SCHEME = "unixgram://"
55+
UNIX_ADDRESS_STREAM_SCHEME = "unixstream://"
56+
5257
# Buffering-related values (in seconds)
5358
DEFAULT_FLUSH_INTERVAL = 0.3
5459
MIN_FLUSH_INTERVAL = 0.0001
@@ -731,11 +736,40 @@ def _ensure_min_send_buffer_size(cls, sock, min_size=MIN_SEND_BUFFER_SIZE):
731736

732737
@classmethod
733738
def _get_uds_socket(cls, socket_path, timeout):
734-
sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
735-
sock.settimeout(timeout)
736-
cls._ensure_min_send_buffer_size(sock)
737-
sock.connect(socket_path)
738-
return sock
739+
valid_socket_kinds = [socket.SOCK_DGRAM, socket.SOCK_STREAM]
740+
if socket_path.startswith(UNIX_ADDRESS_DATAGRAM_SCHEME):
741+
valid_socket_kinds = [socket.SOCK_DGRAM]
742+
socket_path = socket_path[len(UNIX_ADDRESS_DATAGRAM_SCHEME):]
743+
elif socket_path.startswith(UNIX_ADDRESS_STREAM_SCHEME):
744+
valid_socket_kinds = [socket.SOCK_STREAM]
745+
socket_path = socket_path[len(UNIX_ADDRESS_STREAM_SCHEME):]
746+
elif socket_path.startswith(UNIX_ADDRESS_SCHEME):
747+
socket_path = socket_path[len(UNIX_ADDRESS_SCHEME):]
748+
749+
last_error = ValueError("Invalid socket path")
750+
for socket_kind in valid_socket_kinds:
751+
try:
752+
# This will fail in python2.7
753+
sk_name = socket_kind.name
754+
except AttributeError:
755+
sk_name = socket_kind
756+
757+
try:
758+
sock = socket.socket(socket.AF_UNIX, socket_kind)
759+
sock.settimeout(timeout)
760+
cls._ensure_min_send_buffer_size(sock)
761+
sock.connect(socket_path)
762+
log.debug("Connected to socket %s with kind %s", socket_path, sk_name)
763+
return sock
764+
except Exception as e:
765+
if sock is not None:
766+
sock.close()
767+
log.debug("Failed to connect to %s with kind %s: %s", socket_path, sk_name, e)
768+
if e.errno == errno.EPROTOTYPE:
769+
last_error = e
770+
continue
771+
raise e
772+
raise last_error
739773

740774
@classmethod
741775
def _get_udp_socket(cls, host, port, timeout):
@@ -1253,7 +1287,7 @@ def _xmit_packet(self, packet, is_telemetry):
12531287
)
12541288
self.close_socket()
12551289
except Exception as exc:
1256-
print("Unexpected error: %s", exc)
1290+
print("Unexpected error: ", exc)
12571291
log.error("Unexpected error: %s", str(exc))
12581292

12591293
if not is_telemetry and self._telemetry:

tests/integration/dogstatsd/test_statsd_sender.py

Lines changed: 47 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,23 @@
1+
from contextlib import closing
12
import itertools
3+
import os
4+
import shutil
25
import socket
6+
import tempfile
37
from threading import Thread
8+
import uuid
49

510
import pytest
611

712
from datadog.dogstatsd.base import DogStatsd
813

914
@pytest.mark.parametrize(
10-
"disable_background_sender, disable_buffering, wait_for_pending, socket_timeout, stop",
11-
list(itertools.product([True, False], [True, False], [True, False], [0, 1], [True, False])),
15+
"disable_background_sender, disable_buffering, wait_for_pending, socket_timeout, stop, socket_kind",
16+
list(itertools.product([True, False], [True, False], [True, False], [0, 1], [True, False], [socket.SOCK_DGRAM, socket.SOCK_STREAM])),
1217
)
13-
def test_sender_mode(disable_background_sender, disable_buffering, wait_for_pending, socket_timeout, stop):
18+
def test_sender_mode(disable_background_sender, disable_buffering, wait_for_pending, socket_timeout, stop, socket_kind):
1419
# Test basic sender operation with an assortment of options
15-
foo, bar = socket.socketpair(socket.AF_UNIX, socket.SOCK_DGRAM, 0)
20+
foo, bar = socket.socketpair(socket.AF_UNIX, socket_kind, 0)
1621
statsd = DogStatsd(
1722
telemetry_min_flush_interval=0,
1823
disable_background_sender=disable_background_sender,
@@ -101,3 +106,41 @@ def test_buffering_with_context():
101106
bar.settimeout(5)
102107
msg = bar.recv(8192)
103108
assert msg == b"first:1|c\n"
109+
110+
@pytest.fixture()
111+
def socket_dir():
112+
tempdir = tempfile.mkdtemp()
113+
yield tempdir
114+
shutil.rmtree(tempdir)
115+
116+
@pytest.mark.parametrize(
117+
"socket_prefix, socket_kind, success",
118+
[
119+
("", socket.SOCK_DGRAM, True),
120+
("", socket.SOCK_STREAM, True),
121+
("unix://", socket.SOCK_DGRAM, True),
122+
("unix://", socket.SOCK_STREAM, True),
123+
("unixstream://", socket.SOCK_DGRAM, False),
124+
("unixstream://", socket.SOCK_STREAM, True),
125+
("unixgram://", socket.SOCK_DGRAM, True),
126+
("unixgram://", socket.SOCK_STREAM, False)
127+
]
128+
)
129+
def test_socket_connection(socket_dir, socket_prefix, socket_kind, success):
130+
socket_path = os.path.join(socket_dir, str(uuid.uuid1()) + ".sock")
131+
listener_socket = socket.socket(socket.AF_UNIX, socket_kind)
132+
listener_socket.bind(socket_path)
133+
134+
if socket_kind == socket.SOCK_STREAM:
135+
listener_socket.listen(1)
136+
137+
with closing(listener_socket):
138+
statsd = DogStatsd(
139+
socket_path = socket_prefix + socket_path
140+
)
141+
142+
if success:
143+
assert statsd.get_socket() is not None
144+
else:
145+
with pytest.raises(OSError):
146+
statsd.get_socket()

0 commit comments

Comments
 (0)