diff --git a/datadog_lambda/metric.py b/datadog_lambda/metric.py
index 0eb3d604..8b9029ba 100644
--- a/datadog_lambda/metric.py
+++ b/datadog_lambda/metric.py
@@ -91,6 +91,42 @@ def write_metric_point_to_stdout(metric_name, value, timestamp=None, tags=[]):
     )
 
 
+def flush_thread_stats():
+    """"Flush distributions from ThreadStats to Datadog.
+
+    Modified based on `datadog.threadstats.base.ThreadStats.flush()`,
+    to gain better control over exception handling.
+    """
+    _, dists = lambda_stats._get_aggregate_metrics_and_dists(float("inf"))
+    count_dists = len(dists)
+    if not count_dists:
+        logger.debug("No distributions to flush. Continuing.")
+
+    lambda_stats.flush_count += 1
+    logger.debug(
+        "Flush #%s sending %s distributions", lambda_stats.flush_count, count_dists
+    )
+    try:
+        lambda_stats.reporter.flush_distributions(dists)
+    except Exception as e:
+        # The nature of the root issue https://bugs.python.org/issue41345 is complex,
+        # but comprehensive tests suggest that it is safe to retry on this specific error.
+        if isinstance(e, api.exceptions.ClientError) and "RemoteDisconnected" in str(e):
+            logger.debug(
+                "Retry flush #%s due to RemoteDisconnected", lambda_stats.flush_count
+            )
+            try:
+                lambda_stats.reporter.flush_distributions(dists)
+            except Exception:
+                logger.debug(
+                    "Flush #%s failed after retry",
+                    lambda_stats.flush_count,
+                    exc_info=True,
+                )
+        else:
+            logger.debug("Flush #%s failed", lambda_stats.flush_count, exc_info=True)
+
+
 def are_enhanced_metrics_enabled():
     """Check env var to find if enhanced metrics should be submitted
 
@@ -165,3 +201,6 @@ def submit_errors_metric(lambda_context):
     "DATADOG_HOST", "https://api." + os.environ.get("DD_SITE", "datadoghq.com")
 )
 logger.debug("Setting DATADOG_HOST to %s", api._api_host)
+
+# Unmute exceptions from datadog api client, so we can catch and handle them
+api._mute = False
diff --git a/datadog_lambda/wrapper.py b/datadog_lambda/wrapper.py
index 52390dd0..0a16fe0d 100644
--- a/datadog_lambda/wrapper.py
+++ b/datadog_lambda/wrapper.py
@@ -12,7 +12,7 @@
 from datadog_lambda.cold_start import set_cold_start, is_cold_start
 from datadog_lambda.constants import XraySubsegment, TraceContextSource
 from datadog_lambda.metric import (
-    lambda_stats,
+    flush_thread_stats,
     submit_invocations_metric,
     submit_errors_metric,
 )
@@ -177,7 +177,7 @@ def _after(self, event, context):
             )
 
             if not self.flush_to_log or should_use_extension:
-                lambda_stats.flush(float("inf"))
+                flush_thread_stats()
 
             if should_use_extension:
                 flush_extension()
diff --git a/tests/test_metric.py b/tests/test_metric.py
index e7993551..1fbd33ce 100644
--- a/tests/test_metric.py
+++ b/tests/test_metric.py
@@ -6,7 +6,8 @@
 except ImportError:
     from mock import patch, call
 
-from datadog_lambda.metric import lambda_metric
+from datadog.api.exceptions import ClientError
+from datadog_lambda.metric import lambda_metric, flush_thread_stats
 from datadog_lambda.tags import _format_dd_lambda_layer_tag
 
 
@@ -36,3 +37,22 @@ def test_lambda_metric_flush_to_log(self):
         self.mock_metric_lambda_stats.distribution.assert_not_called()
 
         del os.environ["DD_FLUSH_TO_LOG"]
+
+
+class TestFlushThreadStats(unittest.TestCase):
+    def setUp(self):
+        patcher = patch(
+            "datadog.threadstats.reporters.HttpReporter.flush_distributions"
+        )
+        self.mock_threadstats_flush_distributions = patcher.start()
+        self.addCleanup(patcher.stop)
+
+    def test_retry_on_remote_disconnected(self):
+        # Raise the RemoteDisconnected error
+        self.mock_threadstats_flush_distributions.side_effect = ClientError(
+            "POST",
+            "https://api.datadoghq.com/api/v1/distribution_points",
+            "RemoteDisconnected('Remote end closed connection without response')",
+        )
+        flush_thread_stats()
+        self.assertEqual(self.mock_threadstats_flush_distributions.call_count, 2)
diff --git a/tests/test_wrapper.py b/tests/test_wrapper.py
index 9d9e2a03..00ed0e75 100644
--- a/tests/test_wrapper.py
+++ b/tests/test_wrapper.py
@@ -38,14 +38,6 @@ def setUp(self):
         self.mock_threadstats_flush_distributions = patcher.start()
         self.addCleanup(patcher.stop)
 
-        self.metric_lambda_stats_patcher = patch("datadog_lambda.metric.lambda_stats")
-        self.mock_metric_lambda_stats = self.metric_lambda_stats_patcher.start()
-        self.addCleanup(self.metric_lambda_stats_patcher.stop)
-
-        self.wrapper_lambda_stats_patcher = patch("datadog_lambda.wrapper.lambda_stats")
-        self.mock_wrapper_lambda_stats = self.wrapper_lambda_stats_patcher.start()
-        self.addCleanup(self.wrapper_lambda_stats_patcher.stop)
-
         patcher = patch("datadog_lambda.wrapper.extract_dd_trace_context")
         self.mock_extract_dd_trace_context = patcher.start()
         self.mock_extract_dd_trace_context.return_value = ({}, None)
@@ -101,10 +93,23 @@ def lambda_handler(event, context):
 
         lambda_handler(lambda_event, lambda_context)
 
-        self.mock_metric_lambda_stats.distribution.assert_has_calls(
-            [call("test.metric", 100, timestamp=None, tags=ANY)]
+        self.mock_threadstats_flush_distributions.assert_has_calls(
+            [
+                call(
+                    [
+                        {
+                            "metric": "test.metric",
+                            "points": [[ANY, [100]]],
+                            "type": "distribution",
+                            "host": None,
+                            "device": None,
+                            "tags": ANY,
+                            "interval": 10,
+                        }
+                    ]
+                )
+            ]
         )
-        self.mock_wrapper_lambda_stats.flush.assert_called()
         self.mock_extract_dd_trace_context.assert_called_with(
             lambda_event, lambda_context, extractor=None
         )
@@ -122,16 +127,11 @@ def lambda_handler(event, context):
         lambda_event = {}
         lambda_handler(lambda_event, get_mock_context())
 
-        self.mock_metric_lambda_stats.distribution.assert_not_called()
-        self.mock_wrapper_lambda_stats.flush.assert_not_called()
+        self.mock_threadstats_flush_distributions.assert_not_called()
 
         del os.environ["DD_FLUSH_TO_LOG"]
 
     def test_datadog_lambda_wrapper_flush_in_thread(self):
-        # stop patchers so mock_threadstats_flush_distributions gets called
-        self.metric_lambda_stats_patcher.stop()
-        self.wrapper_lambda_stats_patcher.stop()
-
         # force ThreadStats to flush in thread
         import datadog_lambda.metric as metric_module
 
@@ -158,15 +158,7 @@ def lambda_handler(event, context):
         metric_module.lambda_stats.stop()
         metric_module.lambda_stats.start(flush_in_thread=False)
 
-        # resume patchers
-        self.metric_lambda_stats_patcher.start()
-        self.wrapper_lambda_stats_patcher.start()
-
     def test_datadog_lambda_wrapper_not_flush_in_thread(self):
-        # stop patchers so mock_threadstats_flush_distributions gets called
-        self.metric_lambda_stats_patcher.stop()
-        self.wrapper_lambda_stats_patcher.stop()
-
         # force ThreadStats to not flush in thread
         import datadog_lambda.metric as metric_module
 
@@ -193,10 +185,6 @@ def lambda_handler(event, context):
         metric_module.lambda_stats.stop()
         metric_module.lambda_stats.start(flush_in_thread=False)
 
-        # resume patchers
-        self.metric_lambda_stats_patcher.start()
-        self.wrapper_lambda_stats_patcher.start()
-
     def test_datadog_lambda_wrapper_inject_correlation_ids(self):
         os.environ["DD_LOGS_INJECTION"] = "True"
 
@@ -445,5 +433,4 @@ def lambda_handler(event, context):
         lambda_handler_double_wrapped(lambda_event, get_mock_context())
 
         self.mock_patch_all.assert_called_once()
-        self.mock_wrapper_lambda_stats.flush.assert_called_once()
         self.mock_submit_invocations_metric.assert_called_once()