diff --git a/LICENSE-3rdparty.csv b/LICENSE-3rdparty.csv index e34808e4..e15ac6e5 100644 --- a/LICENSE-3rdparty.csv +++ b/LICENSE-3rdparty.csv @@ -1,6 +1,4 @@ Component,Origin,License,Copyright -aws-xray-sdk-python,github.com/aws/aws-xray-sdk-python,Apache-2.0, flake8,gitlab.com/pycqa/flake8,MIT,"Copyright (C) 2011-2013 Tarek Ziade . Copyright (C) 2012-2016 Ian Cordasco ." nose2,github.com/nose-devs/nose2,BSD-2-Clause,"Copyright (c) 2012, Jason Pellerin. All rights reserved." -requests,github.com/kennethreitz/requests,Apache-2.0,"Copyright 2018 Kenneth Reitz" wrapt,github.com/GrahamDumpleton/wrapt,BSD-2-Clause,"Copyright (c) 2013-2019, Graham Dumpleton" \ No newline at end of file diff --git a/datadog_lambda/constants.py b/datadog_lambda/constants.py index dcfd6088..d2c85378 100644 --- a/datadog_lambda/constants.py +++ b/datadog_lambda/constants.py @@ -34,3 +34,10 @@ class TraceContextSource(object): XRAY = "xray" EVENT = "event" DDTRACE = "ddtrace" + + +# X-Ray deamon +class XrayDaemon(object): + XRAY_TRACE_ID_HEADER_NAME = "_X_AMZN_TRACE_ID" + XRAY_DAEMON_ADDRESS = "AWS_XRAY_DAEMON_ADDRESS" + FUNCTION_NAME_HEADER_NAME = "AWS_LAMBDA_FUNCTION_NAME" diff --git a/datadog_lambda/tracing.py b/datadog_lambda/tracing.py index e14bdf6d..9870f756 100644 --- a/datadog_lambda/tracing.py +++ b/datadog_lambda/tracing.py @@ -7,13 +7,15 @@ import os import json -from aws_xray_sdk.core import xray_recorder -from aws_xray_sdk.core.lambda_launcher import LambdaContext from datadog_lambda.constants import ( SamplingPriority, TraceHeader, - XraySubsegment, TraceContextSource, + XrayDaemon, +) +from datadog_lambda.xray import ( + send_segment, + parse_xray_header, ) from ddtrace import tracer, patch from ddtrace import __version__ as ddtrace_version @@ -57,16 +59,24 @@ def _get_xray_trace_context(): if not is_lambda_context(): return None - xray_trace_entity = xray_recorder.get_trace_entity() # xray (sub)segment + xray_trace_entity = parse_xray_header( + os.environ.get(XrayDaemon.XRAY_TRACE_ID_HEADER_NAME, "") + ) + if xray_trace_entity is None: + return None trace_context = { - "trace-id": _convert_xray_trace_id(xray_trace_entity.trace_id), - "parent-id": _convert_xray_entity_id(xray_trace_entity.id), - "sampling-priority": _convert_xray_sampling(xray_trace_entity.sampled), + "trace-id": _convert_xray_trace_id(xray_trace_entity["trace_id"]), + "parent-id": _convert_xray_entity_id(xray_trace_entity["parent_id"]), + "sampling-priority": _convert_xray_sampling(xray_trace_entity["sampled"]), } logger.debug( "Converted trace context %s from X-Ray segment %s", trace_context, - (xray_trace_entity.trace_id, xray_trace_entity.id, xray_trace_entity.sampled), + ( + xray_trace_entity["trace_id"], + xray_trace_entity["parent_id"], + xray_trace_entity["sampled"], + ), ) return trace_context @@ -106,19 +116,7 @@ def create_dd_dummy_metadata_subsegment( tags into its metadata field, so the X-Ray trace can be converted to a Datadog trace in the Datadog backend with the correct context. """ - try: - xray_recorder.begin_subsegment(XraySubsegment.NAME) - subsegment = xray_recorder.current_subsegment() - subsegment.put_metadata( - subsegment_metadata_key, subsegment_metadata_value, XraySubsegment.NAMESPACE - ) - xray_recorder.end_subsegment() - except Exception as e: - logger.debug( - "failed to create dd dummy metadata subsegment with error %s", - e, - exc_info=True, - ) + send_segment(subsegment_metadata_key, subsegment_metadata_value) def extract_context_from_lambda_context(lambda_context): @@ -359,7 +357,7 @@ def is_lambda_context(): Return True if the X-Ray context is `LambdaContext`, rather than the regular `Context` (e.g., when testing lambda functions locally). """ - return type(xray_recorder.context) == LambdaContext + return os.environ.get(XrayDaemon.FUNCTION_NAME_HEADER_NAME, "") != "" def set_dd_trace_py_root(trace_context_source, merge_xray_traces): diff --git a/datadog_lambda/xray.py b/datadog_lambda/xray.py new file mode 100644 index 00000000..d4836fae --- /dev/null +++ b/datadog_lambda/xray.py @@ -0,0 +1,113 @@ +import os +import logging +import json +import binascii +import time +import socket + +from datadog_lambda.constants import XrayDaemon, XraySubsegment, TraceContextSource + +logger = logging.getLogger(__name__) + + +def get_xray_host_port(adress): + if adress == "": + logger.debug("X-Ray daemon env var not set, not sending sub-segment") + return None + parts = adress.split(":") + if len(parts) <= 1: + logger.debug("X-Ray daemon env var not set, not sending sub-segment") + return None + port = int(parts[1]) + host = parts[0] + return (host, port) + + +def send(host_port_tuple, payload): + sock = None + try: + sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + sock.setblocking(0) + sock.connect(host_port_tuple) + sock.send(payload.encode("utf-8")) + except Exception as e_send: + logger.error("Error occurred submitting to xray daemon: %s", str(e_send)) + try: + sock.close() + except Exception as e_close: + logger.error("Error while closing the socket: %s", str(e_close)) + + +def build_segment_payload(payload): + if payload is None: + return None + return '{"format": "json", "version": 1}' + "\n" + payload + + +def parse_xray_header(raw_trace_id): + # Example: Root=1-5e272390-8c398be037738dc042009320;Parent=94ae789b969f1cc5;Sampled=1 + logger.debug("Reading trace context from env var %s", raw_trace_id) + if len(raw_trace_id) == 0: + return None + parts = raw_trace_id.split(";") + if len(parts) != 3: + return None + root = parts[0].replace("Root=", "") + parent = parts[1].replace("Parent=", "") + sampled = parts[2].replace("Sampled=", "") + if ( + len(root) == len(parts[0]) + or len(parent) == len(parts[1]) + or len(sampled) == len(parts[2]) + ): + return None + return { + "parent_id": parent, + "trace_id": root, + "sampled": sampled, + "source": TraceContextSource.XRAY, + } + + +def generate_random_id(): + return binascii.b2a_hex(os.urandom(8)).decode("utf-8") + + +def build_segment(context, key, metadata): + + segment = json.dumps( + { + "id": generate_random_id(), + "trace_id": context["trace_id"], + "parent_id": context["parent_id"], + "name": XraySubsegment.NAME, + "start_time": time.time(), + "end_time": time.time(), + "type": "subsegment", + "metadata": { + XraySubsegment.NAMESPACE: { + key: metadata, + } + }, + } + ) + return segment + + +def send_segment(key, metadata): + host_port_tuple = get_xray_host_port( + os.environ.get(XrayDaemon.XRAY_DAEMON_ADDRESS, "") + ) + if host_port_tuple is None: + return None + context = parse_xray_header( + os.environ.get(XrayDaemon.XRAY_TRACE_ID_HEADER_NAME, "") + ) + if context is None: + logger.debug( + "Failed to create segment since it was not possible to get trace context from header" + ) + return None + segment = build_segment(context, key, metadata) + segment_payload = build_segment_payload(segment) + send(host_port_tuple, segment_payload) diff --git a/setup.py b/setup.py index c8332351..31e4f394 100644 --- a/setup.py +++ b/setup.py @@ -29,7 +29,6 @@ packages=["datadog_lambda"], python_requires=">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*, !=3.5.*, <4", install_requires=[ - "aws-xray-sdk==2.8.0", "datadog==0.41.0", "ddtrace==0.48.0", "wrapt==1.11.2", diff --git a/tests/test_tracing.py b/tests/test_tracing.py index 2a7e1581..82e11f3f 100644 --- a/tests/test_tracing.py +++ b/tests/test_tracing.py @@ -1,5 +1,6 @@ import unittest import json +import os try: from unittest.mock import MagicMock, patch, call @@ -22,6 +23,12 @@ function_arn = "arn:aws:lambda:us-west-1:123457598159:function:python-layer-test" +fake_xray_header_value = ( + "Root=1-5e272390-8c398be037738dc042009320;Parent=94ae789b969f1cc5;Sampled=1" +) +fake_xray_header_value_parent_decimal = "10713633173203262661" +fake_xray_header_value_root_decimal = "3995693151288333088" + class ClientContext(object): def __init__(self, custom=None): @@ -50,17 +57,10 @@ class TestExtractAndGetDDTraceContext(unittest.TestCase): def setUp(self): global dd_tracing_enabled dd_tracing_enabled = False - patcher = patch("datadog_lambda.tracing.xray_recorder") - self.mock_xray_recorder = patcher.start() - self.mock_xray_recorder.get_trace_entity.return_value = MagicMock( - id="ffff", trace_id="1111", sampled=True - ) - self.mock_current_subsegment = MagicMock() - self.mock_xray_recorder.current_subsegment.return_value = ( - self.mock_current_subsegment - ) + os.environ["_X_AMZN_TRACE_ID"] = fake_xray_header_value + patcher = patch("datadog_lambda.tracing.send_segment") + self.mock_send_segment = patcher.start() self.addCleanup(patcher.stop) - patcher = patch("datadog_lambda.tracing.is_lambda_context") self.mock_is_lambda_context = patcher.start() self.mock_is_lambda_context.return_value = True @@ -69,6 +69,7 @@ def setUp(self): def tearDown(self): global dd_tracing_enabled dd_tracing_enabled = False + del os.environ["_X_AMZN_TRACE_ID"] def test_without_datadog_trace_headers(self): lambda_ctx = get_mock_context() @@ -76,13 +77,17 @@ def test_without_datadog_trace_headers(self): self.assertEqual(source, "xray") self.assertDictEqual( ctx, - {"trace-id": "4369", "parent-id": "65535", "sampling-priority": "2"}, + { + "trace-id": fake_xray_header_value_root_decimal, + "parent-id": fake_xray_header_value_parent_decimal, + "sampling-priority": "2", + }, ) self.assertDictEqual( get_dd_trace_context(), { - TraceHeader.TRACE_ID: "4369", - TraceHeader.PARENT_ID: "65535", + TraceHeader.TRACE_ID: fake_xray_header_value_root_decimal, + TraceHeader.PARENT_ID: fake_xray_header_value_parent_decimal, TraceHeader.SAMPLING_PRIORITY: "2", }, {}, @@ -97,13 +102,17 @@ def test_with_incomplete_datadog_trace_headers(self): self.assertEqual(source, "xray") self.assertDictEqual( ctx, - {"trace-id": "4369", "parent-id": "65535", "sampling-priority": "2"}, + { + "trace-id": fake_xray_header_value_root_decimal, + "parent-id": fake_xray_header_value_parent_decimal, + "sampling-priority": "2", + }, ) self.assertDictEqual( get_dd_trace_context(), { - TraceHeader.TRACE_ID: "4369", - TraceHeader.PARENT_ID: "65535", + TraceHeader.TRACE_ID: fake_xray_header_value_root_decimal, + TraceHeader.PARENT_ID: fake_xray_header_value_parent_decimal, TraceHeader.SAMPLING_PRIORITY: "2", }, ) @@ -129,16 +138,15 @@ def test_with_complete_datadog_trace_headers(self): get_dd_trace_context(), { TraceHeader.TRACE_ID: "123", - TraceHeader.PARENT_ID: "65535", + TraceHeader.PARENT_ID: fake_xray_header_value_parent_decimal, TraceHeader.SAMPLING_PRIORITY: "1", }, ) create_dd_dummy_metadata_subsegment(ctx, XraySubsegment.TRACE_KEY) - self.mock_xray_recorder.begin_subsegment.assert_called() - self.mock_current_subsegment.put_metadata.assert_called_with( + self.mock_send_segment.assert_called() + self.mock_send_segment.assert_called_with( XraySubsegment.TRACE_KEY, {"trace-id": "123", "parent-id": "321", "sampling-priority": "1"}, - XraySubsegment.NAMESPACE, ) def test_with_extractor_function(self): @@ -176,7 +184,7 @@ def extractor_foo(event, context): get_dd_trace_context(), { TraceHeader.TRACE_ID: "123", - TraceHeader.PARENT_ID: "65535", + TraceHeader.PARENT_ID: fake_xray_header_value_parent_decimal, TraceHeader.SAMPLING_PRIORITY: "1", }, ) @@ -201,16 +209,16 @@ def extractor_raiser(event, context): self.assertDictEqual( ctx, { - "trace-id": "4369", - "parent-id": "65535", + "trace-id": fake_xray_header_value_root_decimal, + "parent-id": fake_xray_header_value_parent_decimal, "sampling-priority": "2", }, ) self.assertDictEqual( get_dd_trace_context(), { - TraceHeader.TRACE_ID: "4369", - TraceHeader.PARENT_ID: "65535", + TraceHeader.TRACE_ID: fake_xray_header_value_root_decimal, + TraceHeader.PARENT_ID: fake_xray_header_value_parent_decimal, TraceHeader.SAMPLING_PRIORITY: "2", }, ) @@ -261,17 +269,14 @@ def test_with_sqs_distributed_datadog_trace_data(self): get_dd_trace_context(), { TraceHeader.TRACE_ID: "123", - TraceHeader.PARENT_ID: "65535", + TraceHeader.PARENT_ID: fake_xray_header_value_parent_decimal, TraceHeader.SAMPLING_PRIORITY: "1", }, ) create_dd_dummy_metadata_subsegment(ctx, XraySubsegment.TRACE_KEY) - self.mock_xray_recorder.begin_subsegment.assert_called() - self.mock_xray_recorder.end_subsegment.assert_called() - self.mock_current_subsegment.put_metadata.assert_called_with( + self.mock_send_segment.assert_called_with( XraySubsegment.TRACE_KEY, {"trace-id": "123", "parent-id": "321", "sampling-priority": "1"}, - XraySubsegment.NAMESPACE, ) def test_with_legacy_client_context_datadog_trace_data(self): @@ -298,17 +303,15 @@ def test_with_legacy_client_context_datadog_trace_data(self): get_dd_trace_context(), { TraceHeader.TRACE_ID: "666", - TraceHeader.PARENT_ID: "65535", + TraceHeader.PARENT_ID: fake_xray_header_value_parent_decimal, TraceHeader.SAMPLING_PRIORITY: "1", }, ) create_dd_dummy_metadata_subsegment(ctx, XraySubsegment.TRACE_KEY) - self.mock_xray_recorder.begin_subsegment.assert_called() - self.mock_xray_recorder.end_subsegment.assert_called() - self.mock_current_subsegment.put_metadata.assert_called_with( + self.mock_send_segment.assert_called() + self.mock_send_segment.assert_called_with( XraySubsegment.TRACE_KEY, {"trace-id": "666", "parent-id": "777", "sampling-priority": "1"}, - XraySubsegment.NAMESPACE, ) def test_with_new_client_context_datadog_trace_data(self): @@ -333,17 +336,15 @@ def test_with_new_client_context_datadog_trace_data(self): get_dd_trace_context(), { TraceHeader.TRACE_ID: "666", - TraceHeader.PARENT_ID: "65535", + TraceHeader.PARENT_ID: fake_xray_header_value_parent_decimal, TraceHeader.SAMPLING_PRIORITY: "1", }, ) create_dd_dummy_metadata_subsegment(ctx, XraySubsegment.TRACE_KEY) - self.mock_xray_recorder.begin_subsegment.assert_called() - self.mock_xray_recorder.end_subsegment.assert_called() - self.mock_current_subsegment.put_metadata.assert_called_with( + self.mock_send_segment.assert_called() + self.mock_send_segment.assert_called_with( XraySubsegment.TRACE_KEY, {"trace-id": "666", "parent-id": "777", "sampling-priority": "1"}, - XraySubsegment.NAMESPACE, ) def test_with_complete_datadog_trace_headers_with_mixed_casing(self): @@ -362,7 +363,7 @@ def test_with_complete_datadog_trace_headers_with_mixed_casing(self): get_dd_trace_context(), { TraceHeader.TRACE_ID: "123", - TraceHeader.PARENT_ID: "65535", + TraceHeader.PARENT_ID: fake_xray_header_value_parent_decimal, TraceHeader.SAMPLING_PRIORITY: "1", }, ) @@ -375,9 +376,8 @@ def test_with_complete_datadog_trace_headers_with_trigger_tags(self): create_dd_dummy_metadata_subsegment( trigger_tags, XraySubsegment.LAMBDA_FUNCTION_TAGS_KEY ) - self.mock_xray_recorder.begin_subsegment.assert_called() - self.mock_xray_recorder.end_subsegment.assert_called() - self.mock_current_subsegment.put_metadata.assert_has_calls( + self.mock_send_segment.assert_called() + self.mock_send_segment.assert_has_calls( [ call( XraySubsegment.LAMBDA_FUNCTION_TAGS_KEY, @@ -385,7 +385,6 @@ def test_with_complete_datadog_trace_headers_with_trigger_tags(self): "function_trigger.event_source": "sqs", "function_trigger.event_source_arn": "arn:aws:sqs:us-east-1:123456789012:MyQueue", }, - XraySubsegment.NAMESPACE, ), ] ) diff --git a/tests/test_xray.py b/tests/test_xray.py new file mode 100644 index 00000000..ba22928d --- /dev/null +++ b/tests/test_xray.py @@ -0,0 +1,57 @@ +import os +import unittest +import json + +try: + from unittest.mock import MagicMock, patch, call +except ImportError: + from mock import MagicMock, patch, call + +from datadog_lambda.xray import get_xray_host_port, build_segment_payload, build_segment + + +class TestXRay(unittest.TestCase): + def test_get_xray_host_port_empty_(self): + result = get_xray_host_port("") + self.assertIsNone(result) + + def test_get_xray_host_port_invalid_value(self): + result = get_xray_host_port("myVar") + self.assertIsNone(result) + + def test_get_xray_host_port_success(self): + result = get_xray_host_port("mySuperHost:1000") + self.assertEqual("mySuperHost", result[0]) + self.assertEqual(1000, result[1]) + + def test_build_segment_payload_ok(self): + exected_text = '{"format": "json", "version": 1}\nmyPayload' + self.assertEqual(exected_text, build_segment_payload("myPayload")) + + def test_build_segment_payload_no_payload(self): + self.assertIsNone(build_segment_payload(None)) + + @patch("time.time", MagicMock(return_value=1111)) + @patch( + "datadog_lambda.xray.generate_random_id", + MagicMock(return_value="1234abcd"), + ) + def test_build_segment(self): + context = { + "trace_id": 111000111, + "parent_id": 222000222, + } + + value = json.dumps({"a": "aaa", "b": "bbb"}) + result = build_segment(context, "myKey", "myValue") + jsonResult = json.loads(result) + metadataJson = jsonResult["metadata"] + + self.assertEqual("1234abcd", jsonResult["id"]) + self.assertEqual(1111, jsonResult["start_time"]) + self.assertEqual(1111, jsonResult["end_time"]) + self.assertEqual(111000111, jsonResult["trace_id"]) + self.assertEqual(222000222, jsonResult["parent_id"]) + self.assertEqual("datadog-metadata", jsonResult["name"]) + self.assertEqual("subsegment", jsonResult["type"]) + self.assertEqual("myValue", metadataJson["datadog"]["myKey"])