Retry flushing metrics from ThreadStats to Datadog on RemoteDisconnected errors #138

Merged
39 changes: 39 additions & 0 deletions datadog_lambda/metric.py
@@ -91,6 +91,42 @@ def write_metric_point_to_stdout(metric_name, value, timestamp=None, tags=[]):
)


def flush_thread_stats():
    """Flush distributions from ThreadStats to Datadog.

    Adapted from `datadog.threadstats.base.ThreadStats.flush()`
    to gain better control over exception handling.
    """
    _, dists = lambda_stats._get_aggregate_metrics_and_dists(float("inf"))
    count_dists = len(dists)
    if not count_dists:
        logger.debug("No distributions to flush. Continuing.")

    lambda_stats.flush_count += 1
    logger.debug(
        "Flush #%s sending %s distributions", lambda_stats.flush_count, count_dists
    )
    try:
        lambda_stats.reporter.flush_distributions(dists)
    except Exception as e:
        # The root issue, https://bugs.python.org/issue41345, is complex, but
        # comprehensive tests suggest it is safe to retry on this specific error.
        if isinstance(e, api.exceptions.ClientError) and "RemoteDisconnected" in str(e):
            logger.debug(
                "Retry flush #%s due to RemoteDisconnected", lambda_stats.flush_count
            )
            try:
                lambda_stats.reporter.flush_distributions(dists)
            except Exception:
                logger.debug(
                    "Flush #%s failed after retry",
                    lambda_stats.flush_count,
                    exc_info=True,
                )
        else:
            logger.debug("Flush #%s failed", lambda_stats.flush_count, exc_info=True)


def are_enhanced_metrics_enabled():
"""Check env var to find if enhanced metrics should be submitted

@@ -165,3 +201,6 @@ def submit_errors_metric(lambda_context):
"DATADOG_HOST", "https://api." + os.environ.get("DD_SITE", "datadoghq.com")
)
logger.debug("Setting DATADOG_HOST to %s", api._api_host)

# Unmute exceptions from the Datadog API client so we can catch and handle them
api._mute = False
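
The retry condition above is deliberately narrow: the exception must be a `ClientError` raised by the Datadog API client, and its message must mention `RemoteDisconnected`, which is how the underlying `http.client.RemoteDisconnected` from bpo-41345 surfaces once `api._mute` is turned off. A minimal sketch of how the check and the single retry could be factored into a standalone helper; the helper names are illustrative and not part of this PR, and unlike `flush_thread_stats()` the sketch re-raises non-retryable errors instead of logging them:

from datadog import api


def _is_remote_disconnected(exc):
    # Mirrors the check in flush_thread_stats(): the client wraps the original
    # RemoteDisconnected error in a ClientError whose message contains its repr.
    return isinstance(exc, api.exceptions.ClientError) and "RemoteDisconnected" in str(exc)


def flush_with_single_retry(reporter, dists):
    # Hypothetical helper: flush once, and retry exactly one more time when the
    # failure matches the retryable RemoteDisconnected case; re-raise otherwise.
    try:
        reporter.flush_distributions(dists)
    except Exception as first_error:
        if not _is_remote_disconnected(first_error):
            raise
        reporter.flush_distributions(dists)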
4 changes: 2 additions & 2 deletions datadog_lambda/wrapper.py
@@ -12,7 +12,7 @@
from datadog_lambda.cold_start import set_cold_start, is_cold_start
from datadog_lambda.constants import XraySubsegment, TraceContextSource
from datadog_lambda.metric import (
lambda_stats,
flush_thread_stats,
submit_invocations_metric,
submit_errors_metric,
)
@@ -177,7 +177,7 @@ def _after(self, event, context):
)

if not self.flush_to_log or should_use_extension:
lambda_stats.flush(float("inf"))
flush_thread_stats()
if should_use_extension:
flush_extension()

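For context on when this path runs: `flush_thread_stats()` is called from the wrapper's `_after()` hook at the end of every decorated invocation, unless metrics are being flushed to logs instead. A minimal handler that would exercise it, assuming the package's standard `datadog_lambda_wrapper` decorator and the `lambda_metric` helper (both outside the scope of this diff):

from datadog_lambda.metric import lambda_metric
from datadog_lambda.wrapper import datadog_lambda_wrapper


@datadog_lambda_wrapper
def handler(event, context):
    # The distribution point is buffered in ThreadStats; flush_thread_stats()
    # sends it, retrying once on RemoteDisconnected, when the handler returns.
    lambda_metric("example.invocations", 1, tags=["handler:sketch"])
    return {"statusCode": 200}
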
22 changes: 21 additions & 1 deletion tests/test_metric.py
@@ -6,7 +6,8 @@
except ImportError:
from mock import patch, call

from datadog_lambda.metric import lambda_metric
from datadog.api.exceptions import ClientError
from datadog_lambda.metric import lambda_metric, flush_thread_stats
from datadog_lambda.tags import _format_dd_lambda_layer_tag


@@ -36,3 +37,22 @@ def test_lambda_metric_flush_to_log(self):
self.mock_metric_lambda_stats.distribution.assert_not_called()

del os.environ["DD_FLUSH_TO_LOG"]


class TestFlushThreadStats(unittest.TestCase):
    def setUp(self):
        patcher = patch(
            "datadog.threadstats.reporters.HttpReporter.flush_distributions"
        )
        self.mock_threadstats_flush_distributions = patcher.start()
        self.addCleanup(patcher.stop)

    def test_retry_on_remote_disconnected(self):
        # Make flush_distributions raise the ClientError wrapping RemoteDisconnected
        self.mock_threadstats_flush_distributions.side_effect = ClientError(
            "POST",
            "https://api.datadoghq.com/api/v1/distribution_points",
            "RemoteDisconnected('Remote end closed connection without response')",
        )
        flush_thread_stats()
        self.assertEqual(self.mock_threadstats_flush_distributions.call_count, 2)
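
A companion test, not part of this PR, could pin down the non-retryable branch: a `ClientError` whose message does not mention `RemoteDisconnected` should be logged and flushed only once. A sketch reusing the `TestFlushThreadStats` setup above:

    def test_no_retry_on_other_client_errors(self):
        # Hypothetical companion case: errors that fail the RemoteDisconnected
        # check are not retried, so flush_distributions is called only once.
        self.mock_threadstats_flush_distributions.side_effect = ClientError(
            "POST",
            "https://api.datadoghq.com/api/v1/distribution_points",
            "Forbidden",
        )
        flush_thread_stats()
        self.assertEqual(self.mock_threadstats_flush_distributions.call_count, 1)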
47 changes: 17 additions & 30 deletions tests/test_wrapper.py
@@ -38,14 +38,6 @@ def setUp(self):
self.mock_threadstats_flush_distributions = patcher.start()
self.addCleanup(patcher.stop)

self.metric_lambda_stats_patcher = patch("datadog_lambda.metric.lambda_stats")
self.mock_metric_lambda_stats = self.metric_lambda_stats_patcher.start()
self.addCleanup(self.metric_lambda_stats_patcher.stop)

self.wrapper_lambda_stats_patcher = patch("datadog_lambda.wrapper.lambda_stats")
self.mock_wrapper_lambda_stats = self.wrapper_lambda_stats_patcher.start()
self.addCleanup(self.wrapper_lambda_stats_patcher.stop)

patcher = patch("datadog_lambda.wrapper.extract_dd_trace_context")
self.mock_extract_dd_trace_context = patcher.start()
self.mock_extract_dd_trace_context.return_value = ({}, None)
@@ -101,10 +93,23 @@ def lambda_handler(event, context):

lambda_handler(lambda_event, lambda_context)

self.mock_metric_lambda_stats.distribution.assert_has_calls(
[call("test.metric", 100, timestamp=None, tags=ANY)]
self.mock_threadstats_flush_distributions.assert_has_calls(
[
call(
[
{
"metric": "test.metric",
"points": [[ANY, [100]]],
"type": "distribution",
"host": None,
"device": None,
"tags": ANY,
"interval": 10,
}
]
)
]
)
self.mock_wrapper_lambda_stats.flush.assert_called()
self.mock_extract_dd_trace_context.assert_called_with(
lambda_event, lambda_context, extractor=None
)
@@ -122,16 +127,11 @@ def lambda_handler(event, context):
lambda_event = {}
lambda_handler(lambda_event, get_mock_context())

self.mock_metric_lambda_stats.distribution.assert_not_called()
self.mock_wrapper_lambda_stats.flush.assert_not_called()
self.mock_threadstats_flush_distributions.assert_not_called()

del os.environ["DD_FLUSH_TO_LOG"]

def test_datadog_lambda_wrapper_flush_in_thread(self):
# stop patchers so mock_threadstats_flush_distributions gets called
self.metric_lambda_stats_patcher.stop()
self.wrapper_lambda_stats_patcher.stop()

# force ThreadStats to flush in thread
import datadog_lambda.metric as metric_module

@@ -158,15 +158,7 @@ def lambda_handler(event, context):
metric_module.lambda_stats.stop()
metric_module.lambda_stats.start(flush_in_thread=False)

# resume patchers
self.metric_lambda_stats_patcher.start()
self.wrapper_lambda_stats_patcher.start()

def test_datadog_lambda_wrapper_not_flush_in_thread(self):
# stop patchers so mock_threadstats_flush_distributions gets called
self.metric_lambda_stats_patcher.stop()
self.wrapper_lambda_stats_patcher.stop()

# force ThreadStats to not flush in thread
import datadog_lambda.metric as metric_module

@@ -193,10 +185,6 @@ def lambda_handler(event, context):
metric_module.lambda_stats.stop()
metric_module.lambda_stats.start(flush_in_thread=False)

# resume patchers
self.metric_lambda_stats_patcher.start()
self.wrapper_lambda_stats_patcher.start()

def test_datadog_lambda_wrapper_inject_correlation_ids(self):
os.environ["DD_LOGS_INJECTION"] = "True"

@@ -445,5 +433,4 @@ def lambda_handler(event, context):
lambda_handler_double_wrapped(lambda_event, get_mock_context())

self.mock_patch_all.assert_called_once()
self.mock_wrapper_lambda_stats.flush.assert_called_once()
self.mock_submit_invocations_metric.assert_called_once()