Skip to content

Commit dae7a88

Browse files
[SVLS-4999] Add Lambda tags to metrics sent via the API (#501)
* add function tags to historical dist metrics * fix * add timestamp ceiling * snake_case * exclude alias when creating arn tag
1 parent dc964db commit dae7a88

File tree

4 files changed

+54
-7
lines changed

4 files changed

+54
-7
lines changed

datadog_lambda/metric.py

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import time
88
import logging
99
import ujson as json
10+
from datetime import datetime, timedelta
1011

1112
from datadog_lambda.extension import should_use_extension
1213
from datadog_lambda.tags import get_enhanced_metrics_tags, dd_lambda_layer_tag
@@ -61,6 +62,16 @@ def lambda_metric(metric_name, value, timestamp=None, tags=None, force_async=Fal
6162
if should_use_extension and timestamp is not None:
6263
# The extension does not support timestamps for distributions so we create a
6364
# a thread stats writer to submit metrics with timestamps to the API
65+
timestamp_ceiling = int(
66+
(datetime.now() - timedelta(hours=4)).timestamp()
67+
) # 4 hours ago
68+
if timestamp_ceiling > timestamp:
69+
logger.warning(
70+
"Timestamp %s is older than 4 hours, not submitting metric %s",
71+
timestamp,
72+
metric_name,
73+
)
74+
return
6475
global extension_thread_stats
6576
if extension_thread_stats is None:
6677
from datadog_lambda.thread_stats_writer import ThreadStatsWriter
@@ -108,11 +119,19 @@ def write_metric_point_to_stdout(metric_name, value, timestamp=None, tags=[]):
108119
)
109120

110121

111-
def flush_stats():
122+
def flush_stats(lambda_context=None):
112123
lambda_stats.flush()
113124

114125
if extension_thread_stats is not None:
115-
extension_thread_stats.flush()
126+
if lambda_context is not None:
127+
tags = get_enhanced_metrics_tags(lambda_context)
128+
split_arn = lambda_context.invoked_function_arn.split(":")
129+
if len(split_arn) > 7:
130+
# Get rid of the alias
131+
split_arn.pop()
132+
arn = ":".join(split_arn)
133+
tags.append("function_arn:" + arn)
134+
extension_thread_stats.flush(tags)
116135

117136

118137
def submit_enhanced_metric(metric_name, lambda_context):

datadog_lambda/thread_stats_writer.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,13 @@ def distribution(self, metric_name, value, tags=[], timestamp=None):
2222
metric_name, value, tags=tags, timestamp=timestamp
2323
)
2424

25-
def flush(self):
25+
def flush(self, tags=None):
2626
""" "Flush distributions from ThreadStats to Datadog.
2727
Modified based on `datadog.threadstats.base.ThreadStats.flush()`,
2828
to gain better control over exception handling.
2929
"""
30+
if tags:
31+
self.thread_stats.constant_tags = self.thread_stats.constant_tags + tags
3032
_, dists = self.thread_stats._get_aggregate_metrics_and_dists(float("inf"))
3133
count_dists = len(dists)
3234
if not count_dists:

datadog_lambda/wrapper.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -374,7 +374,7 @@ def _after(self, event, context):
374374
logger.debug("Failed to create cold start spans. %s", e)
375375

376376
if not self.flush_to_log or should_use_extension:
377-
flush_stats()
377+
flush_stats(context)
378378
if should_use_extension and self.local_testing_mode:
379379
# when testing locally, the extension does not know when an
380380
# invocation completes because it does not have access to the

tests/test_metric.py

Lines changed: 29 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55

66
from botocore.exceptions import ClientError as BotocoreClientError
77
from datadog.api.exceptions import ClientError
8-
8+
from datetime import datetime, timedelta
99

1010
from datadog_lambda.metric import lambda_metric
1111
from datadog_lambda.api import decrypt_kms_api_key, KMS_ENCRYPTION_CONTEXT_KEY
@@ -49,12 +49,28 @@ def test_lambda_metric_timestamp_with_extension(self):
4949
self.mock_metric_extension_thread_stats = patcher.start()
5050
self.addCleanup(patcher.stop)
5151

52-
lambda_metric("test_timestamp", 1, 123)
52+
delta = timedelta(minutes=1)
53+
timestamp = int((datetime.now() - delta).timestamp())
54+
55+
lambda_metric("test_timestamp", 1, timestamp)
5356
self.mock_metric_lambda_stats.distribution.assert_not_called()
5457
self.mock_metric_extension_thread_stats.distribution.assert_called_with(
55-
"test_timestamp", 1, timestamp=123, tags=[dd_lambda_layer_tag]
58+
"test_timestamp", 1, timestamp=timestamp, tags=[dd_lambda_layer_tag]
5659
)
5760

61+
@patch("datadog_lambda.metric.should_use_extension", True)
62+
def test_lambda_metric_invalid_timestamp_with_extension(self):
63+
patcher = patch("datadog_lambda.metric.extension_thread_stats")
64+
self.mock_metric_extension_thread_stats = patcher.start()
65+
self.addCleanup(patcher.stop)
66+
67+
delta = timedelta(hours=5)
68+
timestamp = int((datetime.now() - delta).timestamp())
69+
70+
lambda_metric("test_timestamp", 1, timestamp)
71+
self.mock_metric_lambda_stats.distribution.assert_not_called()
72+
self.mock_metric_extension_thread_stats.distribution.assert_not_called()
73+
5874
def test_lambda_metric_flush_to_log(self):
5975
os.environ["DD_FLUSH_TO_LOG"] = "True"
6076

@@ -84,6 +100,16 @@ def test_retry_on_remote_disconnected(self):
84100
lambda_stats.flush()
85101
self.assertEqual(self.mock_threadstats_flush_distributions.call_count, 2)
86102

103+
def test_flush_stats_with_tags(self):
104+
lambda_stats = ThreadStatsWriter(True)
105+
tags = ["tag1:value1", "tag2:value2"]
106+
lambda_stats.flush(tags)
107+
self.mock_threadstats_flush_distributions.assert_called_once_with(
108+
lambda_stats.thread_stats._get_aggregate_metrics_and_dists(float("inf"))[1]
109+
)
110+
for tag in tags:
111+
self.assertTrue(tag in lambda_stats.thread_stats.constant_tags)
112+
87113

88114
MOCK_FUNCTION_NAME = "myFunction"
89115

0 commit comments

Comments
 (0)