Skip to content

Commit cd6be68

Browse files
authored
Rewrite UDP client when the extension is present to avoid loading datadogpy (#170)
1 parent 21fc337 commit cd6be68

File tree

5 files changed

+201
-5
lines changed

5 files changed

+201
-5
lines changed

datadog_lambda/api.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,9 @@ def init_api():
5252
not should_use_extension
5353
and not os.environ.get("DD_FLUSH_TO_LOG", "").lower() == "true"
5454
):
55+
# Make sure that this package would always be lazy-loaded/outside from the critical path
56+
# since underlying packages are quite heavy to load
57+
# and useless when the extension is present
5558
from datadog import api
5659

5760
if not api._api_key:

datadog_lambda/dogstatsd.py

Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,143 @@
1+
import logging
2+
import os
3+
import socket
4+
import errno
5+
import re
6+
from threading import Lock
7+
8+
9+
MIN_SEND_BUFFER_SIZE = 32 * 1024
log = logging.getLogger("datadog_lambda.dogstatsd")


class DogStatsd(object):
    """Small DogStatsD client that emits distribution metrics over UDP.

    A single non-blocking UDP socket, connected to ``host``/``port``, is
    created lazily on the first send and reused afterwards. Send failures are
    logged and the packet is dropped; they are never raised to the caller.
    """

    # Every character matched by this pattern is replaced in a tag. Compiled
    # once here instead of on every ``normalize_tags`` call.
    _TAG_INVALID_CHARS_RE = re.compile(r"[^\w\d_\-:/\.]", re.UNICODE)
    _TAG_INVALID_CHARS_SUBS = "_"

    def __init__(self):
        self._socket_lock = Lock()  # guards creation/teardown of self.socket
        self.socket_path = None
        self.host = "localhost"
        self.port = 8125
        self.socket = None  # created on demand by get_socket()
        self.encoding = "utf-8"

    def get_socket(self, telemetry=False):
        """
        Return a connected socket, creating it only if none exists yet.

        The socket is connected before being assigned to the instance, and
        the existence check happens under the lock, so concurrent callers
        share one socket instead of each opening (and leaking) their own.

        :param telemetry: currently unused.
        :return: the socket stored on the instance.
        """
        with self._socket_lock:
            if self.socket is None:
                self.socket = self._get_udp_socket(
                    self.host,
                    self.port,
                )
            return self.socket

    @classmethod
    def _ensure_min_send_buffer_size(cls, sock, min_size=MIN_SEND_BUFFER_SIZE):
        """
        Grow the socket's send buffer (SO_SNDBUF) to at least ``min_size``.

        Only attempted on POSIX systems. This is a best-effort optimisation:
        if the option cannot be read or set, the failure is logged at debug
        level and the socket is left as it was.
        """
        if os.name == "posix":
            try:
                send_buff_size = sock.getsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF)
                if send_buff_size <= min_size:
                    sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, min_size)
                    log.debug("Socket send buffer increased to %dkb", min_size // 1024)
            except socket.error as socket_err:
                log.debug("Unable to adjust socket send buffer size: %s", socket_err)

    @classmethod
    def _get_udp_socket(cls, host, port):
        """
        Create a non-blocking UDP socket connected to ``(host, port)``.

        The socket is closed before re-raising if any setup step fails, so a
        failed attempt does not leak a file descriptor.
        """
        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        try:
            sock.setblocking(0)
            cls._ensure_min_send_buffer_size(sock)
            sock.connect((host, port))
        except Exception:
            sock.close()
            raise

        return sock

    def distribution(self, metric, value, tags=None):
        """
        Send a global distribution value, optionally setting tags.

        >>> statsd.distribution("uploaded.file.size", 1445)
        >>> statsd.distribution("album.photo.count", 26, tags=["gender:female"])
        """
        self._report(metric, "d", value, tags)

    def close_socket(self):
        """
        Closes connected socket if connected.
        """
        with self._socket_lock:
            if self.socket:
                try:
                    self.socket.close()
                except OSError as e:
                    log.error("Unexpected error: %s", str(e))
                # Drop the reference even when close() failed so the next
                # send creates a fresh socket.
                self.socket = None

    def normalize_tags(self, tag_list):
        """
        Return a new list of tags with every disallowed character replaced
        by an underscore.
        """
        return [
            self._TAG_INVALID_CHARS_RE.sub(self._TAG_INVALID_CHARS_SUBS, tag)
            for tag in tag_list
        ]

    def _serialize_metric(self, metric, metric_type, value, tags):
        """
        Build the packet text: ``<metric>:<value>|<type>`` followed by
        ``|#<tag>,<tag>...`` when tags are given.
        """
        return "%s:%s|%s%s" % (
            metric,
            value,
            metric_type,
            ("|#" + ",".join(self.normalize_tags(tags))) if tags else "",
        )

    def _report(self, metric, metric_type, value, tags):
        """Serialize and send one metric; a ``None`` value is ignored."""
        if value is None:
            return

        payload = self._serialize_metric(metric, metric_type, value, tags)

        # Send it
        self._send_to_server(payload)

    def _send_to_server(self, packet):
        """
        Encode ``packet`` and send it on the (lazily created) socket.

        :return: True when the packet was handed to the socket, False when it
            was dropped because of an error. Errors are logged, not raised.
        """
        try:
            mysocket = self.socket or self.get_socket()
            mysocket.send(packet.encode(self.encoding))
            return True
        except socket.timeout:
            # dogstatsd is overflowing, drop the packets (mimics the UDP behaviour)
            pass
        except (socket.herror, socket.gaierror) as socket_err:
            log.warning(
                "Error submitting packet: %s, dropping the packet and closing the socket",
                socket_err,
            )
            self.close_socket()
        except socket.error as socket_err:
            # Transient conditions only drop the packet; anything else also
            # closes the socket so the next send reconnects.
            if socket_err.errno == errno.EAGAIN:
                log.debug(
                    "Socket send would block: %s, dropping the packet", socket_err
                )
            elif socket_err.errno == errno.ENOBUFS:
                log.debug("Socket buffer full: %s, dropping the packet", socket_err)
            elif socket_err.errno == errno.EMSGSIZE:
                log.debug(
                    "Packet size too big (size: %d): %s, dropping the packet",
                    len(packet.encode(self.encoding)),
                    socket_err,
                )
            else:
                log.warning(
                    "Error submitting packet: %s, dropping the packet and closing the socket",
                    socket_err,
                )
                self.close_socket()
        except Exception as e:
            log.error("Unexpected error: %s", str(e))
        return False


# Shared module-level client instance.
statsd = DogStatsd()

datadog_lambda/statsd_writer.py

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,12 @@
11
from datadog_lambda.stats_writer import StatsWriter
2-
from datadog import initialize, statsd
2+
from datadog_lambda.dogstatsd import statsd
33

44

55
class StatsDWriter(StatsWriter):
66
"""
77
Writes distribution metrics using StatsD protocol
88
"""
99

10-
def __init__(self):
11-
options = {"statsd_host": "127.0.0.1", "statsd_port": 8125}
12-
initialize(**options)
13-
1410
def distribution(self, metric_name, value, tags=[], timestamp=None):
1511
statsd.distribution(metric_name, value, tags=tags)
1612

datadog_lambda/thread_stats_writer.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,7 @@
11
import logging
2+
3+
# Make sure this package is always lazy-loaded and kept off the critical path,
4+
# since the underlying packages are heavy to load and useless when the extension is present
25
from datadog.threadstats import ThreadStats
36
from datadog_lambda.stats_writer import StatsWriter
47

tests/test_dogstatsd.py

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
from collections import deque
2+
import unittest
3+
4+
from datadog_lambda.dogstatsd import statsd
5+
6+
7+
class FakeSocket(object):
8+
def __init__(self):
9+
self.payloads = deque()
10+
11+
def send(self, payload):
12+
self.payloads.append(payload)
13+
14+
def recv(self, count=1, reset_wait=False, no_wait=False):
15+
out = []
16+
for _ in range(count):
17+
out.append(self.payloads.popleft().decode("utf-8"))
18+
return "\n".join(out)
19+
20+
def close(self):
21+
pass
22+
23+
24+
class TestDogStatsd(unittest.TestCase):
    """Checks the packets the shared ``statsd`` client writes to a FakeSocket."""

    def setUp(self):
        # Give the shared client an in-memory socket to write to.
        statsd.socket = FakeSocket()

    def tearDown(self):
        statsd.close_socket()

    def recv(self, *args, **kwargs):
        # Read back what the client wrote to the fake socket.
        fake_socket = statsd.socket
        return fake_socket.recv(*args, **kwargs)

    def test_init(self):
        self.assertEqual(statsd.host, "localhost")
        self.assertEqual(statsd.port, 8125)
        self.assertEqual(statsd.encoding, "utf-8")

    def test_distribution_no_tags(self):
        statsd.distribution("my.test.metric", 3)
        metrics = self.recv().split("\n")
        self.assertEqual(len(metrics), 1)
        self.assertEqual("my.test.metric:3|d", metrics[0])

    def test_distribution_with_tags(self):
        # The comma inside the single tag is expected to come out as "_".
        single_tag = "taga:valuea,tagb:valueb"
        statsd.distribution("my.test.tags.metric", 3, tags=[single_tag])
        metrics = self.recv().split("\n")
        self.assertEqual(len(metrics), 1)
        self.assertEqual("my.test.tags.metric:3|d|#taga:valuea_tagb:valueb", metrics[0])

0 commit comments

Comments
 (0)