diff --git a/datadog_lambda/api.py b/datadog_lambda/api.py index 079f69da..b5414fd9 100644 --- a/datadog_lambda/api.py +++ b/datadog_lambda/api.py @@ -1,7 +1,6 @@ import os import logging import base64 -from datadog_lambda.extension import should_use_extension logger = logging.getLogger(__name__) KMS_ENCRYPTION_CONTEXT_KEY = "LambdaFunctionName" @@ -48,13 +47,10 @@ def decrypt_kms_api_key(kms_client, ciphertext): def init_api(): - if ( - not should_use_extension - and not os.environ.get("DD_FLUSH_TO_LOG", "").lower() == "true" - ): + if not os.environ.get("DD_FLUSH_TO_LOG", "").lower() == "true": # Make sure that this package would always be lazy-loaded/outside from the critical path # since underlying packages are quite heavy to load - # and useless when the extension is present + # and useless with the extension unless sending metrics with timestamps from datadog import api if not api._api_key: diff --git a/datadog_lambda/metric.py b/datadog_lambda/metric.py index e3b01a90..1cf9d4c2 100644 --- a/datadog_lambda/metric.py +++ b/datadog_lambda/metric.py @@ -10,13 +10,13 @@ from datadog_lambda.extension import should_use_extension from datadog_lambda.tags import get_enhanced_metrics_tags, dd_lambda_layer_tag -from datadog_lambda.api import init_api logger = logging.getLogger(__name__) lambda_stats = None +extension_thread_stats = None -init_api() +flush_in_thread = os.environ.get("DD_FLUSH_IN_THREAD", "").lower() == "true" if should_use_extension: from datadog_lambda.statsd_writer import StatsDWriter @@ -28,8 +28,9 @@ # end of invocation. To make metrics submitted from a long-running Lambda # function available sooner, consider using the Datadog Lambda extension. from datadog_lambda.thread_stats_writer import ThreadStatsWriter + from datadog_lambda.api import init_api - flush_in_thread = os.environ.get("DD_FLUSH_IN_THREAD", "").lower() == "true" + init_api() lambda_stats = ThreadStatsWriter(flush_in_thread) enhanced_metrics_enabled = ( @@ -57,6 +58,22 @@ def lambda_metric(metric_name, value, timestamp=None, tags=None, force_async=Fal tags = [] if tags is None else list(tags) tags.append(dd_lambda_layer_tag) + if should_use_extension and timestamp is not None: + # The extension does not support timestamps for distributions so we create a + # a thread stats writer to submit metrics with timestamps to the API + global extension_thread_stats + if extension_thread_stats is None: + from datadog_lambda.thread_stats_writer import ThreadStatsWriter + from datadog_lambda.api import init_api + + init_api() + extension_thread_stats = ThreadStatsWriter(flush_in_thread) + + extension_thread_stats.distribution( + metric_name, value, tags=tags, timestamp=timestamp + ) + return + if should_use_extension: logger.debug( "Sending metric %s value %s to Datadog via extension", metric_name, value @@ -94,6 +111,9 @@ def write_metric_point_to_stdout(metric_name, value, timestamp=None, tags=[]): def flush_stats(): lambda_stats.flush() + if extension_thread_stats is not None: + extension_thread_stats.flush() + def submit_enhanced_metric(metric_name, lambda_context): """Submits the enhanced metric with the given name diff --git a/tests/test_metric.py b/tests/test_metric.py index 992084b9..f07a4c6a 100644 --- a/tests/test_metric.py +++ b/tests/test_metric.py @@ -43,6 +43,18 @@ def test_lambda_metric_flush_to_log_with_extension(self): ) del os.environ["DD_FLUSH_TO_LOG"] + @patch("datadog_lambda.metric.should_use_extension", True) + def test_lambda_metric_timestamp_with_extension(self): + patcher = patch("datadog_lambda.metric.extension_thread_stats") + self.mock_metric_extension_thread_stats = patcher.start() + self.addCleanup(patcher.stop) + + lambda_metric("test_timestamp", 1, 123) + self.mock_metric_lambda_stats.distribution.assert_not_called() + self.mock_metric_extension_thread_stats.distribution.assert_called_with( + "test_timestamp", 1, timestamp=123, tags=[dd_lambda_layer_tag] + ) + def test_lambda_metric_flush_to_log(self): os.environ["DD_FLUSH_TO_LOG"] = "True"