@@ -61,28 +61,41 @@ class TraceInjectionDecodingError(Exception):
61
61
pass
62
62
63
63
64
- def inject_trace_data_to_message_attributes (trace_data , entry ):
65
- # type: (Dict[str, str], Dict[str, Any]) -> None
64
+ def inject_trace_data_to_message_attributes (trace_data , entry , endpoint = None ):
65
+ # type: (Dict[str, str], Dict[str, Any], Optional[str] ) -> None
66
66
"""
67
67
:trace_data: trace headers to be stored in the entry's MessageAttributes
68
68
:entry: an SQS or SNS record
69
+ :endpoint: endpoint of message, "sqs" or "sns"
69
70
70
71
Inject trace headers into the an SQS or SNS record's MessageAttributes
71
72
"""
72
73
if "MessageAttributes" not in entry :
73
74
entry ["MessageAttributes" ] = {}
74
- # An Amazon SQS message can contain up to 10 metadata attributes.
75
+ # Max of 10 message attributes.
75
76
if len (entry ["MessageAttributes" ]) < 10 :
76
- entry ["MessageAttributes" ]["_datadog" ] = {"DataType" : "String" , "StringValue" : json .dumps (trace_data )}
77
+ if endpoint == "sqs" :
78
+ # Use String since changing this to Binary would be a breaking
79
+ # change as other tracers expect this to be a String.
80
+ entry ["MessageAttributes" ]["_datadog" ] = {"DataType" : "String" , "StringValue" : json .dumps (trace_data )}
81
+ elif endpoint == "sns" :
82
+ # Use Binary since SNS subscription filter policies fail silently
83
+ # with JSON strings https://github.com/DataDog/datadog-lambda-js/pull/269
84
+ # AWS will encode our value if it sees "Binary"
85
+ entry ["MessageAttributes" ]["_datadog" ] = {"DataType" : "Binary" , "BinaryValue" : json .dumps (trace_data )}
86
+ else :
87
+ log .warning ("skipping trace injection, endpoint is not SNS or SQS" )
77
88
else :
89
+ # In the event a record has 10 or more msg attributes we cannot add our _datadog msg attribute
78
90
log .warning ("skipping trace injection, max number (10) of MessageAttributes exceeded" )
79
91
80
92
81
- def inject_trace_to_sqs_or_sns_batch_message (params , span ):
82
- # type: (Any, Span) -> None
93
+ def inject_trace_to_sqs_or_sns_batch_message (params , span , endpoint = None ):
94
+ # type: (Any, Span, Optional[str] ) -> None
83
95
"""
84
96
:params: contains the params for the current botocore action
85
97
:span: the span which provides the trace context to be propagated
98
+ :endpoint: endpoint of message, "sqs" or "sns"
86
99
87
100
Inject trace headers into MessageAttributes for all SQS or SNS records inside a batch
88
101
"""
@@ -94,21 +107,22 @@ def inject_trace_to_sqs_or_sns_batch_message(params, span):
94
107
# or PublishBatchRequestEntries (in case of PublishBatch).
95
108
entries = params .get ("Entries" , params .get ("PublishBatchRequestEntries" , []))
96
109
for entry in entries :
97
- inject_trace_data_to_message_attributes (trace_data , entry )
110
+ inject_trace_data_to_message_attributes (trace_data , entry , endpoint )
98
111
99
112
100
- def inject_trace_to_sqs_or_sns_message (params , span ):
101
- # type: (Any, Span) -> None
113
+ def inject_trace_to_sqs_or_sns_message (params , span , endpoint = None ):
114
+ # type: (Any, Span, Optional[str] ) -> None
102
115
"""
103
116
:params: contains the params for the current botocore action
104
117
:span: the span which provides the trace context to be propagated
118
+ :endpoint: endpoint of message, "sqs" or "sns"
105
119
106
120
Inject trace headers into MessageAttributes for the SQS or SNS record
107
121
"""
108
122
trace_data = {}
109
123
HTTPPropagator .inject (span .context , trace_data )
110
124
111
- inject_trace_data_to_message_attributes (trace_data , params )
125
+ inject_trace_data_to_message_attributes (trace_data , params , endpoint )
112
126
113
127
114
128
def inject_trace_to_eventbridge_detail (params , span ):
@@ -293,17 +307,17 @@ def patched_api_call(original_func, instance, args, kwargs):
293
307
if endpoint_name == "lambda" and operation == "Invoke" :
294
308
inject_trace_to_client_context (params , span )
295
309
if endpoint_name == "sqs" and operation == "SendMessage" :
296
- inject_trace_to_sqs_or_sns_message (params , span )
310
+ inject_trace_to_sqs_or_sns_message (params , span , endpoint_name )
297
311
if endpoint_name == "sqs" and operation == "SendMessageBatch" :
298
- inject_trace_to_sqs_or_sns_batch_message (params , span )
312
+ inject_trace_to_sqs_or_sns_batch_message (params , span , endpoint_name )
299
313
if endpoint_name == "events" and operation == "PutEvents" :
300
314
inject_trace_to_eventbridge_detail (params , span )
301
315
if endpoint_name == "kinesis" and (operation == "PutRecord" or operation == "PutRecords" ):
302
316
inject_trace_to_kinesis_stream (params , span )
303
317
if endpoint_name == "sns" and operation == "Publish" :
304
- inject_trace_to_sqs_or_sns_message (params , span )
318
+ inject_trace_to_sqs_or_sns_message (params , span , endpoint_name )
305
319
if endpoint_name == "sns" and operation == "PublishBatch" :
306
- inject_trace_to_sqs_or_sns_batch_message (params , span )
320
+ inject_trace_to_sqs_or_sns_batch_message (params , span , endpoint_name )
307
321
except Exception :
308
322
log .warning ("Unable to inject trace context" , exc_info = True )
309
323
0 commit comments