Skip to content

Commit 1ef9945

Browse files
Fix issue with stats d method being called (#140)
* Fix issue with stats d method being called * Update integration tests * Feedback from PR review * Fix PR issues * Reformat metrics file
1 parent f66f367 commit 1ef9945

9 files changed

+105
-63
lines changed

datadog_lambda/metric.py

Lines changed: 81 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,20 @@
2121
logger = logging.getLogger(__name__)
2222

2323

24-
class StatsDWrapper:
24+
class StatsWriter:
25+
def distribution(self, metric_name, value, tags=[], timestamp=None):
26+
raise NotImplementedError()
27+
28+
def flush(self):
29+
raise NotImplementedError()
30+
31+
def stop(self):
32+
raise NotImplementedError()
33+
34+
35+
class StatsDWriter(StatsWriter):
2536
"""
26-
Wraps StatsD calls, to give an identical interface to ThreadStats
37+
Writes distribution metrics using StatsD protocol
2738
"""
2839

2940
def __init__(self):
@@ -33,21 +44,82 @@ def __init__(self):
3344
def distribution(self, metric_name, value, tags=[], timestamp=None):
3445
statsd.distribution(metric_name, value, tags=tags)
3546

36-
def flush(self, value):
47+
def flush(self):
48+
pass
49+
50+
def stop(self):
3751
pass
3852

3953

54+
class ThreadStatsWriter(StatsWriter):
55+
"""
56+
Writes distribution metrics using the ThreadStats class
57+
"""
58+
59+
def __init__(self, flush_in_thread):
60+
self.thread_stats = ThreadStats(compress_payload=True)
61+
self.thread_stats.start(flush_in_thread=flush_in_thread)
62+
63+
def distribution(self, metric_name, value, tags=[], timestamp=None):
64+
self.thread_stats.distribution(
65+
metric_name, value, tags=tags, timestamp=timestamp
66+
)
67+
68+
def flush(self):
69+
""" "Flush distributions from ThreadStats to Datadog.
70+
Modified based on `datadog.threadstats.base.ThreadStats.flush()`,
71+
to gain better control over exception handling.
72+
"""
73+
_, dists = self.thread_stats._get_aggregate_metrics_and_dists(float("inf"))
74+
count_dists = len(dists)
75+
if not count_dists:
76+
logger.debug("No distributions to flush. Continuing.")
77+
78+
self.thread_stats.flush_count += 1
79+
logger.debug(
80+
"Flush #%s sending %s distributions",
81+
self.thread_stats.flush_count,
82+
count_dists,
83+
)
84+
try:
85+
self.thread_stats.reporter.flush_distributions(dists)
86+
except Exception as e:
87+
# The nature of the root issue https://bugs.python.org/issue41345 is complex,
88+
# but comprehensive tests suggest that it is safe to retry on this specific error.
89+
if isinstance(
90+
e, api.exceptions.ClientError
91+
) and "RemoteDisconnected" in str(e):
92+
logger.debug(
93+
"Retry flush #%s due to RemoteDisconnected",
94+
self.thread_stats.flush_count,
95+
)
96+
try:
97+
self.thread_stats.reporter.flush_distributions(dists)
98+
except Exception:
99+
logger.debug(
100+
"Flush #%s failed after retry",
101+
self.thread_stats.flush_count,
102+
exc_info=True,
103+
)
104+
else:
105+
logger.debug(
106+
"Flush #%s failed", self.thread_stats.flush_count, exc_info=True
107+
)
108+
109+
def stop(self):
110+
self.thread_stats.stop()
111+
112+
40113
lambda_stats = None
41114
if should_use_extension:
42-
lambda_stats = StatsDWrapper()
115+
lambda_stats = StatsDWriter()
43116
else:
44117
# Periodical flushing in a background thread is NOT guaranteed to succeed
45118
# and leads to data loss. When disabled, metrics are only flushed at the
46119
# end of invocation. To make metrics submitted from a long-running Lambda
47120
# function available sooner, consider using the Datadog Lambda extension.
48121
flush_in_thread = os.environ.get("DD_FLUSH_IN_THREAD", "").lower() == "true"
49-
lambda_stats = ThreadStats(compress_payload=True)
50-
lambda_stats.start(flush_in_thread=flush_in_thread)
122+
lambda_stats = ThreadStatsWriter(flush_in_thread)
51123

52124

53125
def lambda_metric(metric_name, value, timestamp=None, tags=None, force_async=False):
@@ -74,8 +146,7 @@ def lambda_metric(metric_name, value, timestamp=None, tags=None, force_async=Fal
74146

75147

76148
def write_metric_point_to_stdout(metric_name, value, timestamp=None, tags=[]):
77-
"""Writes the specified metric point to standard output
78-
"""
149+
"""Writes the specified metric point to standard output"""
79150
logger.debug(
80151
"Sending metric %s value %s to Datadog via log forwarder", metric_name, value
81152
)
@@ -91,40 +162,8 @@ def write_metric_point_to_stdout(metric_name, value, timestamp=None, tags=[]):
91162
)
92163

93164

94-
def flush_thread_stats():
95-
""""Flush distributions from ThreadStats to Datadog.
96-
97-
Modified based on `datadog.threadstats.base.ThreadStats.flush()`,
98-
to gain better control over exception handling.
99-
"""
100-
_, dists = lambda_stats._get_aggregate_metrics_and_dists(float("inf"))
101-
count_dists = len(dists)
102-
if not count_dists:
103-
logger.debug("No distributions to flush. Continuing.")
104-
105-
lambda_stats.flush_count += 1
106-
logger.debug(
107-
"Flush #%s sending %s distributions", lambda_stats.flush_count, count_dists
108-
)
109-
try:
110-
lambda_stats.reporter.flush_distributions(dists)
111-
except Exception as e:
112-
# The nature of the root issue https://bugs.python.org/issue41345 is complex,
113-
# but comprehensive tests suggest that it is safe to retry on this specific error.
114-
if isinstance(e, api.exceptions.ClientError) and "RemoteDisconnected" in str(e):
115-
logger.debug(
116-
"Retry flush #%s due to RemoteDisconnected", lambda_stats.flush_count
117-
)
118-
try:
119-
lambda_stats.reporter.flush_distributions(dists)
120-
except Exception:
121-
logger.debug(
122-
"Flush #%s failed after retry",
123-
lambda_stats.flush_count,
124-
exc_info=True,
125-
)
126-
else:
127-
logger.debug("Flush #%s failed", lambda_stats.flush_count, exc_info=True)
165+
def flush_stats():
166+
lambda_stats.flush()
128167

129168

130169
def are_enhanced_metrics_enabled():

datadog_lambda/wrapper.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
from datadog_lambda.cold_start import set_cold_start, is_cold_start
1313
from datadog_lambda.constants import XraySubsegment, TraceContextSource
1414
from datadog_lambda.metric import (
15-
flush_thread_stats,
15+
flush_stats,
1616
submit_invocations_metric,
1717
submit_errors_metric,
1818
)
@@ -177,7 +177,7 @@ def _after(self, event, context):
177177
)
178178

179179
if not self.flush_to_log or should_use_extension:
180-
flush_thread_stats()
180+
flush_stats()
181181
if should_use_extension:
182182
flush_extension()
183183

scripts/run_integration_tests.sh

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -197,6 +197,7 @@ for handler_name in "${LAMBDA_HANDLERS[@]}"; do
197197
# Normalize package version so that these snapshots aren't broken on version bumps
198198
sed -E "s/(dd_lambda_layer:datadog-python[0-9]+_)[0-9]+\.[0-9]+\.[0-9]+/\1X\.X\.X/g" |
199199
sed -E "s/(datadog_lambda:v)([0-9]+\.[0-9]+\.[0-9])/\1XX/g" |
200+
sed -E "s/(python )([0-9]\.[0-9]+\.[0-9]+)/\1XX/g" |
200201
# Strip out run ID (from function name, resource, etc.)
201202
sed -E "s/${!run_id}/XXXX/g" |
202203
# Strip out trace/span/parent/timestamps

0 commit comments

Comments (0)