Skip to content

Rewrite UDP client when the extension is present to avoid loading datadogpy #170

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Aug 31, 2021
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
143 changes: 143 additions & 0 deletions datadog_lambda/dogstatsd.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
import logging
import os
import socket
import errno
import re
from threading import Lock


MIN_SEND_BUFFER_SIZE = 32 * 1024
# Any character outside this set is replaced when tags are normalized.
# Compiled once at import time instead of on every normalize_tags() call.
TAG_INVALID_CHARS_RE = re.compile(r"[^\w\d_\-:/\.]", re.UNICODE)
TAG_INVALID_CHARS_SUBS = "_"
log = logging.getLogger("datadog_lambda.dogstatsd")


class DogStatsd(object):
    """
    Small DogStatsD client that sends distribution metrics over UDP.

    A single non-blocking UDP socket is created lazily on first use and
    reused until it is closed (explicitly or after a send error).
    """

    def __init__(self):
        # Guards creation and teardown of ``self.socket``.
        self._socket_lock = Lock()
        # Not read anywhere in this class; only UDP transport is implemented.
        self.socket_path = None
        self.host = "localhost"
        self.port = 8125
        self.socket = None
        self.encoding = "utf-8"

    def get_socket(self, telemetry=False):
        """
        Return a connected socket, creating it on first use.

        The socket is only assigned to the instance once it is connected, and
        an already-open socket is reused rather than replaced, so concurrent
        callers cannot leak a second socket.

        :param telemetry: currently unused.
        :return: the connected socket stored on ``self.socket``.
        :raises OSError: if the socket cannot be created or connected.
        """
        with self._socket_lock:
            # Re-check under the lock: another thread may have connected
            # between the caller's unlocked check and acquiring the lock.
            if not self.socket:
                self.socket = self._get_udp_socket(self.host, self.port)
            return self.socket

    @classmethod
    def _ensure_min_send_buffer_size(cls, sock, min_size=MIN_SEND_BUFFER_SIZE):
        """
        Raise the send buffer (SO_SNDBUF) of ``sock`` to ``min_size`` if needed.

        Only applies on POSIX systems. Errors from the socket option calls
        propagate to the caller.

        :param sock: socket whose send buffer is inspected.
        :param min_size: minimum send buffer size, in bytes.
        """
        # Increase the send buffer size where needed (e.g. macOS reportedly
        # defaults to 4k buffers, about half of the max packet size that the
        # client will send).
        if os.name != "posix":
            return

        send_buff_size = sock.getsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF)
        if send_buff_size <= min_size:
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, min_size)
            log.debug("Socket send buffer increased to %dkb", min_size / 1024)

    @classmethod
    def _get_udp_socket(cls, host, port):
        """
        Create a non-blocking UDP socket connected to ``(host, port)``.

        :raises OSError: if configuring or connecting the socket fails; the
            half-initialized socket is closed before the error is re-raised.
        """
        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        try:
            sock.setblocking(0)
            cls._ensure_min_send_buffer_size(sock)
            sock.connect((host, port))
        except Exception:
            # Do not leak the file descriptor when setup fails.
            sock.close()
            raise

        return sock

    def distribution(self, metric, value, tags=None):
        """
        Send a global distribution value, optionally setting tags.

        >>> statsd.distribution("uploaded.file.size", 1445)
        >>> statsd.distribution("album.photo.count", 26, tags=["gender:female"])
        """
        self._report(metric, "d", value, tags)

    def close_socket(self):
        """
        Closes connected socket if connected.

        Errors raised while closing are logged, and the socket reference is
        cleared either way so the next send reconnects.
        """
        with self._socket_lock:
            if self.socket:
                try:
                    self.socket.close()
                except OSError as e:
                    log.error("Unexpected error: %s", str(e))
                self.socket = None

    def normalize_tags(self, tag_list):
        """
        Return a new list of tags with every invalid character replaced by "_".

        :param tag_list: iterable of tag strings.
        """
        return [
            TAG_INVALID_CHARS_RE.sub(TAG_INVALID_CHARS_SUBS, tag) for tag in tag_list
        ]

    def _serialize_metric(self, metric, metric_type, value, tags):
        """
        Format the metric packet as ``<metric>:<value>|<type>[|#tag1,tag2]``.

        The tag section is omitted when ``tags`` is empty or ``None``.
        """
        return "%s:%s|%s%s" % (
            metric,
            value,
            metric_type,
            ("|#" + ",".join(self.normalize_tags(tags))) if tags else "",
        )

    def _report(self, metric, metric_type, value, tags):
        """Serialize and send one metric; a ``None`` value is silently skipped."""
        if value is None:
            return

        payload = self._serialize_metric(metric, metric_type, value, tags)

        # Send it
        self._send_to_server(payload)

    def _send_to_server(self, packet):
        """
        Encode ``packet`` and send it on the (lazily created) socket.

        Never raises: every failure is logged and the packet is dropped.

        :param packet: serialized metric as text.
        :return: ``True`` if the packet was handed to the socket, else ``False``.
        """
        try:
            mysocket = self.socket or self.get_socket()
            mysocket.send(packet.encode(self.encoding))
            return True
        except socket.timeout:
            # dogstatsd is overflowing, drop the packets (mimics the UDP behaviour)
            pass
        except (socket.herror, socket.gaierror) as socket_err:
            log.warning(
                "Error submitting packet: %s, dropping the packet and closing the socket",
                socket_err,
            )
            self.close_socket()
        except socket.error as socket_err:
            if socket_err.errno == errno.EAGAIN:
                log.debug(
                    "Socket send would block: %s, dropping the packet", socket_err
                )
            elif socket_err.errno == errno.ENOBUFS:
                log.debug("Socket buffer full: %s, dropping the packet", socket_err)
            elif socket_err.errno == errno.EMSGSIZE:
                log.debug(
                    "Packet size too big (size: %d): %s, dropping the packet",
                    len(packet.encode(self.encoding)),
                    socket_err,
                )
            else:
                # Unknown socket failure: drop the socket so the next send
                # starts from a fresh connection.
                log.warning(
                    "Error submitting packet: %s, dropping the packet and closing the socket",
                    socket_err,
                )
                self.close_socket()
        except Exception as e:
            log.error("Unexpected error: %s", str(e))
        return False


# Module-level client instance; other modules import this `statsd` object
# rather than constructing their own DogStatsd.
statsd = DogStatsd()
6 changes: 1 addition & 5 deletions datadog_lambda/statsd_writer.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,12 @@
from datadog_lambda.stats_writer import StatsWriter
from datadog import initialize, statsd
from datadog_lambda.dogstatsd import statsd
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add a comment in the other place (threadstats_writer?) where datadog is lazy-loaded? Just so people don't accidentally import datadog into the critical path in the future.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea! I've updated both files with the comment. In threadstats_writer, there is no need to lazy-load datadog, as this file is itself lazy-loaded here: https://github.com/DataDog/datadog-lambda-python/blob/main/datadog_lambda/metric.py#L30



class StatsDWriter(StatsWriter):
    """
    Writes distribution metrics using StatsD protocol
    """

    def __init__(self):
        # Configure the StatsD destination (127.0.0.1:8125) via `initialize`.
        options = {"statsd_host": "127.0.0.1", "statsd_port": 8125}
        initialize(**options)

    def distribution(self, metric_name, value, tags=None, timestamp=None):
        """
        Forward one distribution point to the module-level `statsd` client.

        :param metric_name: name of the metric.
        :param value: value to submit.
        :param tags: optional list of tag strings; defaults to no tags.
        :param timestamp: accepted but not forwarded to `statsd`.
        """
        # Use a fresh list per call instead of a shared mutable default.
        statsd.distribution(metric_name, value, tags=[] if tags is None else tags)

Expand Down
51 changes: 51 additions & 0 deletions tests/test_dogstatsd.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
from collections import deque
import unittest

from datadog_lambda.dogstatsd import statsd


class FakeSocket(object):
    """In-memory stand-in for a socket: queues sent payloads for later reads."""

    def __init__(self):
        # FIFO of raw payloads, oldest first.
        self.payloads = deque()

    def send(self, payload):
        """Queue ``payload`` instead of sending it anywhere."""
        self.payloads.append(payload)

    def recv(self, count=1, reset_wait=False, no_wait=False):
        """
        Pop ``count`` queued payloads, decode them as UTF-8, and return them
        joined with newlines. ``reset_wait`` and ``no_wait`` are ignored.
        """
        decoded = [self.payloads.popleft().decode("utf-8") for _ in range(count)]
        return "\n".join(decoded)

    def close(self):
        """Nothing to release."""
        pass


class TestDogStatsd(unittest.TestCase):
    """Checks the payloads the module-level ``statsd`` client writes to its socket."""

    def setUp(self):
        # Replace the client's socket with the in-memory FakeSocket.
        statsd.socket = FakeSocket()

    def tearDown(self):
        statsd.close_socket()

    def recv(self, *args, **kwargs):
        """Read back from whatever socket the client currently holds."""
        current_socket = statsd.socket
        return current_socket.recv(*args, **kwargs)

    def test_init(self):
        expected_attrs = (
            ("host", "localhost"),
            ("port", 8125),
            ("encoding", "utf-8"),
        )
        for attr_name, expected in expected_attrs:
            self.assertEqual(getattr(statsd, attr_name), expected)

    def test_distribution_no_tags(self):
        statsd.distribution("my.test.metric", 3)
        lines = self.recv().split("\n")
        self.assertEqual(len(lines), 1)
        self.assertEqual("my.test.metric:3|d", lines[0])

    def test_distribution_with_tags(self):
        # The single tag contains a comma, which is replaced with "_".
        statsd.distribution("my.test.tags.metric", 3, tags=["taga:valuea,tagb:valueb"])
        lines = self.recv().split("\n")
        self.assertEqual(len(lines), 1)
        self.assertEqual("my.test.tags.metric:3|d|#taga:valuea_tagb:valueb", lines[0])