Skip to content

Commit 52a23f8

Browse files
authored
Add setters to configure the global instance (#790)
* Add method to enable background sender mode on the global instance * Add method to set socket timeout on the global instance
1 parent 11a38d0 commit 52a23f8

File tree

3 files changed

+65
-0
lines changed

3 files changed

+65
-0
lines changed

datadog/dogstatsd/base.py

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -415,6 +415,37 @@ def __init__(
415415

416416
self._queue = None
417417
if not disable_background_sender:
418+
self.enable_background_sender(sender_queue_size, sender_queue_timeout)
419+
420+
def enable_background_sender(self, sender_queue_size=0, sender_queue_timeout=0):
421+
"""
422+
Use a background thread to communicate with the dogstatsd server.
423+
When enabled, a background thread will be used to send metric payloads to the Agent.
424+
425+
Applications should call wait_for_pending() before exiting to make sure all pending payloads are sent.
426+
427+
This method is not thread safe and should not be called concurrently with other methods on the current object.
428+
Normally, this should be called shortly after process initialization (for example from a post-fork hook in a
429+
forking server).
430+
431+
:param sender_queue_size: Set the maximum number of packets to queue for the sender. Optional
432+
How may packets to queue before blocking or dropping the packet if the packet queue is already full.
433+
Default: 0 (unlimited).
434+
:type sender_queue_size: integer
435+
436+
:param sender_queue_timeout: Set timeout for packet queue operations, in seconds. Optional.
437+
How long the application thread is willing to wait for the queue clear up before dropping the metric packet.
438+
If set to None, wait forever.
439+
If set to zero drop the packet immediately if the queue is full.
440+
Default: 0 (no wait)
441+
:type sender_queue_timeout: float
442+
"""
443+
444+
# Avoid a race on _queue with the background buffer flush thread that reads _queue.
445+
with self._buffer_lock:
446+
if self._queue is not None:
447+
return
448+
418449
self._queue = queue.Queue(sender_queue_size)
419450
self._start_sender_thread()
420451
if sender_queue_timeout is None:
@@ -559,6 +590,18 @@ def get_socket(self, telemetry=False):
559590

560591
return self.socket
561592

593+
def set_socket_timeout(self, timeout):
594+
"""
595+
Set timeout for socket operations, in seconds.
596+
597+
If set to zero, never wait if operation can not be completed immediately. If set to None, wait forever.
598+
This option does not affect hostname resolution when using UDP.
599+
"""
600+
with self._socket_lock:
601+
self.socket_timeout = timeout
602+
if self.socket:
603+
self.socket.settimeout(timeout)
604+
562605
@classmethod
563606
def _ensure_min_send_buffer_size(cls, sock, min_size=MIN_SEND_BUFFER_SIZE):
564607
# Increase the receiving buffer size where needed (e.g. MacOS has 4k RX

tests/integration/dogstatsd/test_statsd_sender.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,3 +37,12 @@ def reader_thread():
3737

3838
t.join(timeout=10)
3939
assert not t.is_alive()
40+
41+
def test_set_socket_timeout():
42+
statsd = DogStatsd(socket_timeout=0)
43+
assert statsd.get_socket().gettimeout() == 0
44+
statsd.set_socket_timeout(1)
45+
assert statsd.get_socket().gettimeout() == 1
46+
statsd.close_socket()
47+
assert statsd.get_socket().gettimeout() == 1
48+

tests/unit/dogstatsd/test_statsd.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ def __init__(self, flush_interval=DEFAULT_FLUSH_INTERVAL):
4646

4747
self._flush_interval = flush_interval
4848
self._flush_wait = False
49+
self.timeout = () # unit tuple = settimeout was not called
4950

5051
def send(self, payload):
5152
if is_p3k():
@@ -77,6 +78,8 @@ def close(self):
7778
def __repr__(self):
7879
return str(self.payloads)
7980

81+
def settimeout(self, timeout):
82+
self.timeout = timeout
8083

8184
class BrokenSocket(FakeSocket):
8285
def __init__(self, error_number=None):
@@ -1875,6 +1878,9 @@ def test_sender_mode(self):
18751878
statsd = DogStatsd(disable_background_sender=True)
18761879
self.assertIsNone(statsd._queue)
18771880

1881+
statsd.enable_background_sender()
1882+
self.assertIsNotNone(statsd._queue)
1883+
18781884
statsd = DogStatsd(disable_background_sender=False)
18791885
self.assertIsNotNone(statsd._queue)
18801886

@@ -1886,3 +1892,10 @@ def test_sender_calls_task_done(self):
18861892

18871893
def test_sender_queue_no_timeout(self):
18881894
statsd = DogStatsd(disable_background_sender=False, sender_queue_timeout=None)
1895+
1896+
def test_set_socket_timeout(self):
1897+
statsd = DogStatsd(disable_background_sender=False)
1898+
statsd.socket = FakeSocket()
1899+
statsd.set_socket_timeout(1)
1900+
self.assertEqual(statsd.socket.timeout, 1)
1901+
self.assertEqual(statsd.socket_timeout, 1)

0 commit comments

Comments
 (0)