Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 28 additions & 14 deletions ddtrace/contrib/botocore/patch.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,28 +61,41 @@ class TraceInjectionDecodingError(Exception):
pass


def inject_trace_data_to_message_attributes(trace_data, entry):
# type: (Dict[str, str], Dict[str, Any]) -> None
def inject_trace_data_to_message_attributes(trace_data, entry, endpoint=None):
# type: (Dict[str, str], Dict[str, Any], Optional[str]) -> None
"""
:trace_data: trace headers to be stored in the entry's MessageAttributes
:entry: an SQS or SNS record
:endpoint: endpoint of message, "sqs" or "sns"

Inject trace headers into the an SQS or SNS record's MessageAttributes
"""
if "MessageAttributes" not in entry:
entry["MessageAttributes"] = {}
# An Amazon SQS message can contain up to 10 metadata attributes.
# Max of 10 message attributes.
if len(entry["MessageAttributes"]) < 10:
entry["MessageAttributes"]["_datadog"] = {"DataType": "String", "StringValue": json.dumps(trace_data)}
if endpoint == "sqs":
# Use String since changing this to Binary would be a breaking
# change as other tracers expect this to be a String.
entry["MessageAttributes"]["_datadog"] = {"DataType": "String", "StringValue": json.dumps(trace_data)}
elif endpoint == "sns":
# Use Binary since SNS subscription filter policies fail silently
# with JSON strings https://github.com/DataDog/datadog-lambda-js/pull/269
# AWS will encode our value if it sees "Binary"
entry["MessageAttributes"]["_datadog"] = {"DataType": "Binary", "BinaryValue": json.dumps(trace_data)}
else:
log.warning("skipping trace injection, endpoint is not SNS or SQS")
else:
# In the event a record has 10 or more msg attributes we cannot add our _datadog msg attribute
log.warning("skipping trace injection, max number (10) of MessageAttributes exceeded")


def inject_trace_to_sqs_or_sns_batch_message(params, span):
# type: (Any, Span) -> None
def inject_trace_to_sqs_or_sns_batch_message(params, span, endpoint=None):
# type: (Any, Span, Optional[str]) -> None
"""
:params: contains the params for the current botocore action
:span: the span which provides the trace context to be propagated
:endpoint: endpoint of message, "sqs" or "sns"

Inject trace headers into MessageAttributes for all SQS or SNS records inside a batch
"""
Expand All @@ -94,21 +107,22 @@ def inject_trace_to_sqs_or_sns_batch_message(params, span):
# or PublishBatchRequestEntries (in case of PublishBatch).
entries = params.get("Entries", params.get("PublishBatchRequestEntries", []))
for entry in entries:
inject_trace_data_to_message_attributes(trace_data, entry)
inject_trace_data_to_message_attributes(trace_data, entry, endpoint)


def inject_trace_to_sqs_or_sns_message(params, span):
# type: (Any, Span) -> None
def inject_trace_to_sqs_or_sns_message(params, span, endpoint=None):
# type: (Any, Span, Optional[str]) -> None
"""
:params: contains the params for the current botocore action
:span: the span which provides the trace context to be propagated
:endpoint: endpoint of message, "sqs" or "sns"

Inject trace headers into MessageAttributes for the SQS or SNS record
"""
trace_data = {}
HTTPPropagator.inject(span.context, trace_data)

inject_trace_data_to_message_attributes(trace_data, params)
inject_trace_data_to_message_attributes(trace_data, params, endpoint)


def inject_trace_to_eventbridge_detail(params, span):
Expand Down Expand Up @@ -293,17 +307,17 @@ def patched_api_call(original_func, instance, args, kwargs):
if endpoint_name == "lambda" and operation == "Invoke":
inject_trace_to_client_context(params, span)
if endpoint_name == "sqs" and operation == "SendMessage":
inject_trace_to_sqs_or_sns_message(params, span)
inject_trace_to_sqs_or_sns_message(params, span, endpoint_name)
if endpoint_name == "sqs" and operation == "SendMessageBatch":
inject_trace_to_sqs_or_sns_batch_message(params, span)
inject_trace_to_sqs_or_sns_batch_message(params, span, endpoint_name)
if endpoint_name == "events" and operation == "PutEvents":
inject_trace_to_eventbridge_detail(params, span)
if endpoint_name == "kinesis" and (operation == "PutRecord" or operation == "PutRecords"):
inject_trace_to_kinesis_stream(params, span)
if endpoint_name == "sns" and operation == "Publish":
inject_trace_to_sqs_or_sns_message(params, span)
inject_trace_to_sqs_or_sns_message(params, span, endpoint_name)
if endpoint_name == "sns" and operation == "PublishBatch":
inject_trace_to_sqs_or_sns_batch_message(params, span)
inject_trace_to_sqs_or_sns_batch_message(params, span, endpoint_name)
except Exception:
log.warning("Unable to inject trace context", exc_info=True)

Expand Down
2 changes: 1 addition & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ services:
- ./.ddriot:/root/project/.riot

localstack:
image: localstack/localstack:0.13.1
image: localstack/localstack:0.14.1
network_mode: bridge
ports:
- "127.0.0.1:4566:4566"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
---
fixes:
- |
botocore: fix incorrect context propagation message attribute types for SNS. This addresses `Datadog/serverless-plugin-datadog#232 <https://github.com/DataDog/serverless-plugin-datadog/issues/232>`_
8 changes: 6 additions & 2 deletions tests/contrib/botocore/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -1003,7 +1003,9 @@ def test_sns_send_message_trace_injection_with_no_message_attributes(self):
assert msg_str == "test"
msg_attr = msg_body["MessageAttributes"]
assert msg_attr.get("_datadog") is not None
headers = json.loads(msg_attr["_datadog"]["Value"])
assert msg_attr["_datadog"]["Type"] == "Binary"
datadog_value_decoded = base64.b64decode(msg_attr["_datadog"]["Value"])
headers = json.loads(datadog_value_decoded.decode())
assert headers is not None
assert headers[HTTP_HEADER_TRACE_ID] == str(span.trace_id)
assert headers[HTTP_HEADER_PARENT_ID] == str(span.span_id)
Expand Down Expand Up @@ -1068,7 +1070,9 @@ def test_sns_send_message_trace_injection_with_message_attributes(self):
assert msg_str == "test"
msg_attr = msg_body["MessageAttributes"]
assert msg_attr.get("_datadog") is not None
headers = json.loads(msg_attr["_datadog"]["Value"])
assert msg_attr["_datadog"]["Type"] == "Binary"
datadog_value_decoded = base64.b64decode(msg_attr["_datadog"]["Value"])
headers = json.loads(datadog_value_decoded.decode())
assert headers is not None
assert headers[HTTP_HEADER_TRACE_ID] == str(span.trace_id)
assert headers[HTTP_HEADER_PARENT_ID] == str(span.span_id)
Expand Down