Skip to content

Commit 601c5f8

Browse files
authored
Retry flush metrics from ThreadStats to Datadog over RemoteDisconnected errors. (#138)
* Retry flush metrics from ThreadStats to Datadog over RemoteDisconnected errors.
* Fix comment
1 parent 922a956 commit 601c5f8

File tree

4 files changed

+79
-33
lines changed

4 files changed

+79
-33
lines changed

datadog_lambda/metric.py

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,42 @@ def write_metric_point_to_stdout(metric_name, value, timestamp=None, tags=[]):
9191
)
9292

9393

94+
def flush_thread_stats():
    """Flush distributions from ThreadStats to Datadog.

    Modified based on `datadog.threadstats.base.ThreadStats.flush()`,
    to gain better control over exception handling.

    The distributions are sent through `lambda_stats.reporter`. A failed
    send is retried exactly once when the error is a datadog `ClientError`
    whose text mentions ``RemoteDisconnected``; every other failure (and a
    failed retry) is logged at debug level and swallowed, so this function
    does not raise for errors coming from the send itself.
    """
    _, pending = lambda_stats._get_aggregate_metrics_and_dists(float("inf"))
    pending_count = len(pending)
    if not pending_count:
        # Only logged; the flush below is still attempted with an empty list.
        logger.debug("No distributions to flush. Continuing.")

    lambda_stats.flush_count += 1
    logger.debug(
        "Flush #%s sending %s distributions", lambda_stats.flush_count, pending_count
    )

    try:
        lambda_stats.reporter.flush_distributions(pending)
    except Exception as err:
        # The nature of the root issue https://bugs.python.org/issue41345 is complex,
        # but comprehensive tests suggest that it is safe to retry on this specific error.
        retryable = isinstance(
            err, api.exceptions.ClientError
        ) and "RemoteDisconnected" in str(err)
        if not retryable:
            # Logged inside the handler so exc_info picks up the active exception.
            logger.debug("Flush #%s failed", lambda_stats.flush_count, exc_info=True)
            return

        logger.debug(
            "Retry flush #%s due to RemoteDisconnected", lambda_stats.flush_count
        )
        try:
            lambda_stats.reporter.flush_distributions(pending)
        except Exception:
            logger.debug(
                "Flush #%s failed after retry",
                lambda_stats.flush_count,
                exc_info=True,
            )
128+
129+
94130
def are_enhanced_metrics_enabled():
95131
"""Check env var to find if enhanced metrics should be submitted
96132
@@ -165,3 +201,6 @@ def submit_errors_metric(lambda_context):
165201
"DATADOG_HOST", "https://api." + os.environ.get("DD_SITE", "datadoghq.com")
166202
)
167203
logger.debug("Setting DATADOG_HOST to %s", api._api_host)
204+
205+
# Unmute exceptions from datadog api client, so we can catch and handle them
206+
api._mute = False

datadog_lambda/wrapper.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
from datadog_lambda.cold_start import set_cold_start, is_cold_start
1313
from datadog_lambda.constants import XraySubsegment, TraceContextSource
1414
from datadog_lambda.metric import (
15-
lambda_stats,
15+
flush_thread_stats,
1616
submit_invocations_metric,
1717
submit_errors_metric,
1818
)
@@ -177,7 +177,7 @@ def _after(self, event, context):
177177
)
178178

179179
if not self.flush_to_log or should_use_extension:
180-
lambda_stats.flush(float("inf"))
180+
flush_thread_stats()
181181
if should_use_extension:
182182
flush_extension()
183183

tests/test_metric.py

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,8 @@
66
except ImportError:
77
from mock import patch, call
88

9-
from datadog_lambda.metric import lambda_metric
9+
from datadog.api.exceptions import ClientError
10+
from datadog_lambda.metric import lambda_metric, flush_thread_stats
1011
from datadog_lambda.tags import _format_dd_lambda_layer_tag
1112

1213

@@ -36,3 +37,22 @@ def test_lambda_metric_flush_to_log(self):
3637
self.mock_metric_lambda_stats.distribution.assert_not_called()
3738

3839
del os.environ["DD_FLUSH_TO_LOG"]
40+
41+
42+
class TestFlushThreadStats(unittest.TestCase):
    """Exercise `flush_thread_stats` with the HTTP reporter's flush mocked out."""

    def setUp(self):
        # Swap the reporter's flush_distributions for a mock; the patch is
        # undone automatically when the test finishes.
        flush_patch = patch(
            "datadog.threadstats.reporters.HttpReporter.flush_distributions"
        )
        self.mock_threadstats_flush_distributions = flush_patch.start()
        self.addCleanup(flush_patch.stop)

    def test_retry_on_remote_disconnected(self):
        # Make every flush attempt raise the RemoteDisconnected error
        remote_disconnected = ClientError(
            "POST",
            "https://api.datadoghq.com/api/v1/distribution_points",
            "RemoteDisconnected('Remote end closed connection without response')",
        )
        self.mock_threadstats_flush_distributions.side_effect = remote_disconnected

        flush_thread_stats()

        # One initial attempt plus one retry.
        self.assertEqual(self.mock_threadstats_flush_distributions.call_count, 2)

tests/test_wrapper.py

Lines changed: 17 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -38,14 +38,6 @@ def setUp(self):
3838
self.mock_threadstats_flush_distributions = patcher.start()
3939
self.addCleanup(patcher.stop)
4040

41-
self.metric_lambda_stats_patcher = patch("datadog_lambda.metric.lambda_stats")
42-
self.mock_metric_lambda_stats = self.metric_lambda_stats_patcher.start()
43-
self.addCleanup(self.metric_lambda_stats_patcher.stop)
44-
45-
self.wrapper_lambda_stats_patcher = patch("datadog_lambda.wrapper.lambda_stats")
46-
self.mock_wrapper_lambda_stats = self.wrapper_lambda_stats_patcher.start()
47-
self.addCleanup(self.wrapper_lambda_stats_patcher.stop)
48-
4941
patcher = patch("datadog_lambda.wrapper.extract_dd_trace_context")
5042
self.mock_extract_dd_trace_context = patcher.start()
5143
self.mock_extract_dd_trace_context.return_value = ({}, None)
@@ -101,10 +93,23 @@ def lambda_handler(event, context):
10193

10294
lambda_handler(lambda_event, lambda_context)
10395

104-
self.mock_metric_lambda_stats.distribution.assert_has_calls(
105-
[call("test.metric", 100, timestamp=None, tags=ANY)]
96+
self.mock_threadstats_flush_distributions.assert_has_calls(
97+
[
98+
call(
99+
[
100+
{
101+
"metric": "test.metric",
102+
"points": [[ANY, [100]]],
103+
"type": "distribution",
104+
"host": None,
105+
"device": None,
106+
"tags": ANY,
107+
"interval": 10,
108+
}
109+
]
110+
)
111+
]
106112
)
107-
self.mock_wrapper_lambda_stats.flush.assert_called()
108113
self.mock_extract_dd_trace_context.assert_called_with(
109114
lambda_event, lambda_context, extractor=None
110115
)
@@ -122,16 +127,11 @@ def lambda_handler(event, context):
122127
lambda_event = {}
123128
lambda_handler(lambda_event, get_mock_context())
124129

125-
self.mock_metric_lambda_stats.distribution.assert_not_called()
126-
self.mock_wrapper_lambda_stats.flush.assert_not_called()
130+
self.mock_threadstats_flush_distributions.assert_not_called()
127131

128132
del os.environ["DD_FLUSH_TO_LOG"]
129133

130134
def test_datadog_lambda_wrapper_flush_in_thread(self):
131-
# stop patchers so mock_threadstats_flush_distributions gets called
132-
self.metric_lambda_stats_patcher.stop()
133-
self.wrapper_lambda_stats_patcher.stop()
134-
135135
# force ThreadStats to flush in thread
136136
import datadog_lambda.metric as metric_module
137137

@@ -158,15 +158,7 @@ def lambda_handler(event, context):
158158
metric_module.lambda_stats.stop()
159159
metric_module.lambda_stats.start(flush_in_thread=False)
160160

161-
# resume patchers
162-
self.metric_lambda_stats_patcher.start()
163-
self.wrapper_lambda_stats_patcher.start()
164-
165161
def test_datadog_lambda_wrapper_not_flush_in_thread(self):
166-
# stop patchers so mock_threadstats_flush_distributions gets called
167-
self.metric_lambda_stats_patcher.stop()
168-
self.wrapper_lambda_stats_patcher.stop()
169-
170162
# force ThreadStats to not flush in thread
171163
import datadog_lambda.metric as metric_module
172164

@@ -193,10 +185,6 @@ def lambda_handler(event, context):
193185
metric_module.lambda_stats.stop()
194186
metric_module.lambda_stats.start(flush_in_thread=False)
195187

196-
# resume patchers
197-
self.metric_lambda_stats_patcher.start()
198-
self.wrapper_lambda_stats_patcher.start()
199-
200188
def test_datadog_lambda_wrapper_inject_correlation_ids(self):
201189
os.environ["DD_LOGS_INJECTION"] = "True"
202190

@@ -445,5 +433,4 @@ def lambda_handler(event, context):
445433
lambda_handler_double_wrapped(lambda_event, get_mock_context())
446434

447435
self.mock_patch_all.assert_called_once()
448-
self.mock_wrapper_lambda_stats.flush.assert_called_once()
449436
self.mock_submit_invocations_metric.assert_called_once()

0 commit comments

Comments
 (0)