Skip to content

Commit 205333c

Browse files
zARODz11zmergify-bot
authored and
mergify-bot
committed
fix(tracing/botocore): fix incorrect context propagation type for SNS (#3404)
Making sure msg attributes are encoded by the tracer (by changing the Type) allows our lambda library to decode the message attribute and create inferred spans for sns and sqs. Checkout this dd-lambda-python [PR](DataDog/datadog-lambda-python#211) for more context. ## Checklist - [ ] Added to the correct milestone. - [ ] Tests provided or description of manual testing performed is included in the code or PR. - [ ] Library documentation is updated. - [ ] [Corp site](https://github.com/DataDog/documentation/) documentation is updated (link to the PR). (cherry picked from commit bd8e176)
1 parent 5df7b7c commit 205333c

File tree

4 files changed

+39
-17
lines changed

4 files changed

+39
-17
lines changed

ddtrace/contrib/botocore/patch.py

Lines changed: 28 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -61,28 +61,41 @@ class TraceInjectionDecodingError(Exception):
6161
pass
6262

6363

64-
def inject_trace_data_to_message_attributes(trace_data, entry):
65-
# type: (Dict[str, str], Dict[str, Any]) -> None
64+
def inject_trace_data_to_message_attributes(trace_data, entry, endpoint=None):
65+
# type: (Dict[str, str], Dict[str, Any], Optional[str]) -> None
6666
"""
6767
:trace_data: trace headers to be stored in the entry's MessageAttributes
6868
:entry: an SQS or SNS record
69+
:endpoint: endpoint of message, "sqs" or "sns"
6970
7071
Inject trace headers into the an SQS or SNS record's MessageAttributes
7172
"""
7273
if "MessageAttributes" not in entry:
7374
entry["MessageAttributes"] = {}
74-
# An Amazon SQS message can contain up to 10 metadata attributes.
75+
# Max of 10 message attributes.
7576
if len(entry["MessageAttributes"]) < 10:
76-
entry["MessageAttributes"]["_datadog"] = {"DataType": "String", "StringValue": json.dumps(trace_data)}
77+
if endpoint == "sqs":
78+
# Use String since changing this to Binary would be a breaking
79+
# change as other tracers expect this to be a String.
80+
entry["MessageAttributes"]["_datadog"] = {"DataType": "String", "StringValue": json.dumps(trace_data)}
81+
elif endpoint == "sns":
82+
# Use Binary since SNS subscription filter policies fail silently
83+
# with JSON strings https://github.com/DataDog/datadog-lambda-js/pull/269
84+
# AWS will encode our value if it sees "Binary"
85+
entry["MessageAttributes"]["_datadog"] = {"DataType": "Binary", "BinaryValue": json.dumps(trace_data)}
86+
else:
87+
log.warning("skipping trace injection, endpoint is not SNS or SQS")
7788
else:
89+
# In the event a record has 10 or more msg attributes we cannot add our _datadog msg attribute
7890
log.warning("skipping trace injection, max number (10) of MessageAttributes exceeded")
7991

8092

81-
def inject_trace_to_sqs_or_sns_batch_message(params, span):
82-
# type: (Any, Span) -> None
93+
def inject_trace_to_sqs_or_sns_batch_message(params, span, endpoint=None):
94+
# type: (Any, Span, Optional[str]) -> None
8395
"""
8496
:params: contains the params for the current botocore action
8597
:span: the span which provides the trace context to be propagated
98+
:endpoint: endpoint of message, "sqs" or "sns"
8699
87100
Inject trace headers into MessageAttributes for all SQS or SNS records inside a batch
88101
"""
@@ -94,21 +107,22 @@ def inject_trace_to_sqs_or_sns_batch_message(params, span):
94107
# or PublishBatchRequestEntries (in case of PublishBatch).
95108
entries = params.get("Entries", params.get("PublishBatchRequestEntries", []))
96109
for entry in entries:
97-
inject_trace_data_to_message_attributes(trace_data, entry)
110+
inject_trace_data_to_message_attributes(trace_data, entry, endpoint)
98111

99112

100-
def inject_trace_to_sqs_or_sns_message(params, span):
101-
# type: (Any, Span) -> None
113+
def inject_trace_to_sqs_or_sns_message(params, span, endpoint=None):
114+
# type: (Any, Span, Optional[str]) -> None
102115
"""
103116
:params: contains the params for the current botocore action
104117
:span: the span which provides the trace context to be propagated
118+
:endpoint: endpoint of message, "sqs" or "sns"
105119
106120
Inject trace headers into MessageAttributes for the SQS or SNS record
107121
"""
108122
trace_data = {}
109123
HTTPPropagator.inject(span.context, trace_data)
110124

111-
inject_trace_data_to_message_attributes(trace_data, params)
125+
inject_trace_data_to_message_attributes(trace_data, params, endpoint)
112126

113127

114128
def inject_trace_to_eventbridge_detail(params, span):
@@ -293,17 +307,17 @@ def patched_api_call(original_func, instance, args, kwargs):
293307
if endpoint_name == "lambda" and operation == "Invoke":
294308
inject_trace_to_client_context(params, span)
295309
if endpoint_name == "sqs" and operation == "SendMessage":
296-
inject_trace_to_sqs_or_sns_message(params, span)
310+
inject_trace_to_sqs_or_sns_message(params, span, endpoint_name)
297311
if endpoint_name == "sqs" and operation == "SendMessageBatch":
298-
inject_trace_to_sqs_or_sns_batch_message(params, span)
312+
inject_trace_to_sqs_or_sns_batch_message(params, span, endpoint_name)
299313
if endpoint_name == "events" and operation == "PutEvents":
300314
inject_trace_to_eventbridge_detail(params, span)
301315
if endpoint_name == "kinesis" and (operation == "PutRecord" or operation == "PutRecords"):
302316
inject_trace_to_kinesis_stream(params, span)
303317
if endpoint_name == "sns" and operation == "Publish":
304-
inject_trace_to_sqs_or_sns_message(params, span)
318+
inject_trace_to_sqs_or_sns_message(params, span, endpoint_name)
305319
if endpoint_name == "sns" and operation == "PublishBatch":
306-
inject_trace_to_sqs_or_sns_batch_message(params, span)
320+
inject_trace_to_sqs_or_sns_batch_message(params, span, endpoint_name)
307321
except Exception:
308322
log.warning("Unable to inject trace context", exc_info=True)
309323

docker-compose.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,7 @@ services:
132132
- ./.ddriot:/root/project/.riot
133133

134134
localstack:
135-
image: localstack/localstack:0.13.1
135+
image: localstack/localstack:0.14.1
136136
network_mode: bridge
137137
ports:
138138
- "127.0.0.1:4566:4566"
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
---
2+
fixes:
3+
- |
4+
botocore: fix incorrect context propagation message attribute types for SNS. This addresses `Datadog/serverless-plugin-datadog#232 <https://github.com/DataDog/serverless-plugin-datadog/issues/232>`_

tests/contrib/botocore/test.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1003,7 +1003,9 @@ def test_sns_send_message_trace_injection_with_no_message_attributes(self):
10031003
assert msg_str == "test"
10041004
msg_attr = msg_body["MessageAttributes"]
10051005
assert msg_attr.get("_datadog") is not None
1006-
headers = json.loads(msg_attr["_datadog"]["Value"])
1006+
assert msg_attr["_datadog"]["Type"] == "Binary"
1007+
datadog_value_decoded = base64.b64decode(msg_attr["_datadog"]["Value"])
1008+
headers = json.loads(datadog_value_decoded.decode())
10071009
assert headers is not None
10081010
assert headers[HTTP_HEADER_TRACE_ID] == str(span.trace_id)
10091011
assert headers[HTTP_HEADER_PARENT_ID] == str(span.span_id)
@@ -1068,7 +1070,9 @@ def test_sns_send_message_trace_injection_with_message_attributes(self):
10681070
assert msg_str == "test"
10691071
msg_attr = msg_body["MessageAttributes"]
10701072
assert msg_attr.get("_datadog") is not None
1071-
headers = json.loads(msg_attr["_datadog"]["Value"])
1073+
assert msg_attr["_datadog"]["Type"] == "Binary"
1074+
datadog_value_decoded = base64.b64decode(msg_attr["_datadog"]["Value"])
1075+
headers = json.loads(datadog_value_decoded.decode())
10721076
assert headers is not None
10731077
assert headers[HTTP_HEADER_TRACE_ID] == str(span.trace_id)
10741078
assert headers[HTTP_HEADER_PARENT_ID] == str(span.span_id)

0 commit comments

Comments
 (0)