diff --git a/datadog_lambda/api.py b/datadog_lambda/api.py index a30b12d1..079f69da 100644 --- a/datadog_lambda/api.py +++ b/datadog_lambda/api.py @@ -52,6 +52,9 @@ def init_api(): not should_use_extension and not os.environ.get("DD_FLUSH_TO_LOG", "").lower() == "true" ): + # Make sure that this package would always be lazy-loaded/outside from the critical path + # since underlying packages are quite heavy to load + # and useless when the extension is present from datadog import api if not api._api_key: diff --git a/datadog_lambda/dogstatsd.py b/datadog_lambda/dogstatsd.py new file mode 100644 index 00000000..a627492d --- /dev/null +++ b/datadog_lambda/dogstatsd.py @@ -0,0 +1,143 @@ +import logging +import os +import socket +import errno +import re +from threading import Lock + + +MIN_SEND_BUFFER_SIZE = 32 * 1024 +log = logging.getLogger("datadog_lambda.dogstatsd") + + +class DogStatsd(object): + def __init__(self): + self._socket_lock = Lock() + self.socket_path = None + self.host = "localhost" + self.port = 8125 + self.socket = None + self.encoding = "utf-8" + + def get_socket(self, telemetry=False): + """ + Return a connected socket. + + Note: connect the socket before assigning it to the class instance to + avoid bad thread race conditions. + """ + with self._socket_lock: + self.socket = self._get_udp_socket( + self.host, + self.port, + ) + return self.socket + + @classmethod + def _ensure_min_send_buffer_size(cls, sock, min_size=MIN_SEND_BUFFER_SIZE): + # Increase the receiving buffer size where needed (e.g. MacOS has 4k RX + # buffers which is half of the max packet size that the client will send. + if os.name == "posix": + try: + recv_buff_size = sock.getsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF) + if recv_buff_size <= min_size: + sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, min_size) + log.debug("Socket send buffer increased to %dkb", min_size / 1024) + finally: + pass + + @classmethod + def _get_udp_socket(cls, host, port): + sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + sock.setblocking(0) + cls._ensure_min_send_buffer_size(sock) + sock.connect((host, port)) + + return sock + + def distribution(self, metric, value, tags=None): + """ + Send a global distribution value, optionally setting tags. + + >>> statsd.distribution("uploaded.file.size", 1445) + >>> statsd.distribution("album.photo.count", 26, tags=["gender:female"]) + """ + self._report(metric, "d", value, tags) + + def close_socket(self): + """ + Closes connected socket if connected. + """ + with self._socket_lock: + if self.socket: + try: + self.socket.close() + except OSError as e: + log.error("Unexpected error: %s", str(e)) + self.socket = None + + def normalize_tags(self, tag_list): + TAG_INVALID_CHARS_RE = re.compile(r"[^\w\d_\-:/\.]", re.UNICODE) + TAG_INVALID_CHARS_SUBS = "_" + return [ + re.sub(TAG_INVALID_CHARS_RE, TAG_INVALID_CHARS_SUBS, tag) + for tag in tag_list + ] + + def _serialize_metric(self, metric, metric_type, value, tags): + # Create/format the metric packet + return "%s:%s|%s%s" % ( + metric, + value, + metric_type, + ("|#" + ",".join(self.normalize_tags(tags))) if tags else "", + ) + + def _report(self, metric, metric_type, value, tags): + if value is None: + return + + payload = self._serialize_metric(metric, metric_type, value, tags) + + # Send it + self._send_to_server(payload) + + def _send_to_server(self, packet): + try: + mysocket = self.socket or self.get_socket() + mysocket.send(packet.encode(self.encoding)) + return True + except socket.timeout: + # dogstatsd is overflowing, drop the packets (mimicks the UDP behaviour) + pass + except (socket.herror, socket.gaierror) as socket_err: + log.warning( + "Error submitting packet: %s, dropping the packet and closing the socket", + socket_err, + ) + self.close_socket() + except socket.error as socket_err: + if socket_err.errno == errno.EAGAIN: + log.debug( + "Socket send would block: %s, dropping the packet", socket_err + ) + elif socket_err.errno == errno.ENOBUFS: + log.debug("Socket buffer full: %s, dropping the packet", socket_err) + elif socket_err.errno == errno.EMSGSIZE: + log.debug( + "Packet size too big (size: %d): %s, dropping the packet", + len(packet.encode(self.encoding)), + socket_err, + ) + else: + log.warning( + "Error submitting packet: %s, dropping the packet and closing the socket", + socket_err, + ) + self.close_socket() + except Exception as e: + log.error("Unexpected error: %s", str(e)) + return False + + +statsd = DogStatsd() diff --git a/datadog_lambda/statsd_writer.py b/datadog_lambda/statsd_writer.py index cd849e67..33843dc6 100644 --- a/datadog_lambda/statsd_writer.py +++ b/datadog_lambda/statsd_writer.py @@ -1,5 +1,5 @@ from datadog_lambda.stats_writer import StatsWriter -from datadog import initialize, statsd +from datadog_lambda.dogstatsd import statsd class StatsDWriter(StatsWriter): @@ -7,10 +7,6 @@ class StatsDWriter(StatsWriter): Writes distribution metrics using StatsD protocol """ - def __init__(self): - options = {"statsd_host": "127.0.0.1", "statsd_port": 8125} - initialize(**options) - def distribution(self, metric_name, value, tags=[], timestamp=None): statsd.distribution(metric_name, value, tags=tags) diff --git a/datadog_lambda/thread_stats_writer.py b/datadog_lambda/thread_stats_writer.py index 91ffcd45..bfcf3c99 100644 --- a/datadog_lambda/thread_stats_writer.py +++ b/datadog_lambda/thread_stats_writer.py @@ -1,4 +1,7 @@ import logging + +# Make sure that this package would always be lazy-loaded/outside from the critical path +# since underlying packages are quite heavy to load and useless when the extension is present from datadog.threadstats import ThreadStats from datadog_lambda.stats_writer import StatsWriter diff --git a/tests/test_dogstatsd.py b/tests/test_dogstatsd.py new file mode 100644 index 00000000..149e1a70 --- /dev/null +++ b/tests/test_dogstatsd.py @@ -0,0 +1,51 @@ +from collections import deque +import unittest + +from datadog_lambda.dogstatsd import statsd + + +class FakeSocket(object): + def __init__(self): + self.payloads = deque() + + def send(self, payload): + self.payloads.append(payload) + + def recv(self, count=1, reset_wait=False, no_wait=False): + out = [] + for _ in range(count): + out.append(self.payloads.popleft().decode("utf-8")) + return "\n".join(out) + + def close(self): + pass + + +class TestDogStatsd(unittest.TestCase): + def setUp(self): + statsd.socket = FakeSocket() + + def tearDown(self): + statsd.close_socket() + + def recv(self, *args, **kwargs): + return statsd.socket.recv(*args, **kwargs) + + def test_init(self): + self.assertEqual(statsd.host, "localhost") + self.assertEqual(statsd.port, 8125) + self.assertEqual(statsd.encoding, "utf-8") + + def test_distribution_no_tags(self): + statsd.distribution("my.test.metric", 3) + payload = self.recv() + metrics = payload.split("\n") + self.assertEqual(len(metrics), 1) + self.assertEqual("my.test.metric:3|d", metrics[0]) + + def test_distribution_with_tags(self): + statsd.distribution("my.test.tags.metric", 3, tags=["taga:valuea,tagb:valueb"]) + payload = self.recv() + metrics = payload.split("\n") + self.assertEqual(len(metrics), 1) + self.assertEqual("my.test.tags.metric:3|d|#taga:valuea_tagb:valueb", metrics[0])