Skip to content

Commit 7b2adcf

Browse files
committed
Handle SNS message inside SQS event
1 parent 189ab05 commit 7b2adcf

File tree

1 file changed

+52
-1
lines changed

1 file changed

+52
-1
lines changed

datadog_lambda/tracing.py

Lines changed: 52 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -178,6 +178,18 @@ def extract_context_from_http_event_or_context(event, lambda_context):
178178
return trace_id, parent_id, sampling_priority
179179

180180

181+
def create_sns_event(message):
182+
return {
183+
"Records": [
184+
{
185+
"EventSource": "aws:sns",
186+
"EventVersion": "1.0",
187+
"Sns": message,
188+
}
189+
]
190+
}
191+
192+
181193
def extract_context_from_sqs_or_sns_event_or_context(event, lambda_context):
182194
"""
183195
Extract Datadog trace context from the first SQS message attributes.
@@ -186,6 +198,19 @@ def extract_context_from_sqs_or_sns_event_or_context(event, lambda_context):
186198
"""
187199
try:
188200
first_record = event["Records"][0]
201+
202+
# logic to deal with SNS => SQS event
203+
if "body" in first_record:
204+
body_str = first_record.get("body", {})
205+
try:
206+
body = json.loads(body_str)
207+
if body.get("Type", "") == "Notification" and "TopicArn" in body:
208+
logger.debug("Found SNS message inside SQS event")
209+
first_record = get_first_record(create_sns_event(body))
210+
except Exception:
211+
first_record = event["Records"][0]
212+
pass
213+
189214
msg_attributes = first_record.get(
190215
"messageAttributes",
191216
first_record.get("Sns", {}).get("MessageAttributes", {}),
@@ -533,6 +558,8 @@ def create_inferred_span_from_http_api_event(event, context):
533558

534559

535560
def create_inferred_span_from_sqs_event(event, context):
561+
trace_ctx = tracer.current_trace_context()
562+
536563
event_record = get_first_record(event)
537564
queue_name = event_record["eventSourceARN"].split(":")[-1]
538565
tags = {
@@ -546,11 +573,35 @@ def create_inferred_span_from_sqs_event(event, context):
546573
"resource": queue_name,
547574
"span_type": "web",
548575
}
576+
start_time = int(request_time_epoch) / 1000
577+
578+
# logic to deal with SNS => SQS event
579+
sns_span = None
580+
if "body" in event_record:
581+
body_str = event_record.get("body", {})
582+
try:
583+
body = json.loads(body_str)
584+
if body.get("Type", "") == "Notification" and "TopicArn" in body:
585+
logger.debug("Found SNS message inside SQS event")
586+
sns_span = create_inferred_span_from_sns_event(
587+
create_sns_event(body), context
588+
)
589+
sns_span.finish(finish_time=start_time)
590+
except Exception:
591+
logger.debug("Unable to create SNS span from SQS message")
592+
pass
593+
594+
# trace context needs to be set again as it is reset
595+
# when sns_span.finish executes
596+
tracer.context_provider.activate(trace_ctx)
549597
tracer.set_tags({"_dd.origin": "lambda"})
550598
span = tracer.trace("aws.sqs", **args)
551599
if span:
552600
span.set_tags(tags)
553-
span.start = int(request_time_epoch) / 1000
601+
span.start = start_time
602+
if sns_span:
603+
span.parent_id = sns_span.span_id
604+
554605
return span
555606

556607

0 commit comments

Comments
 (0)