Skip to content

Commit 361164c

Browse files
authored
[SLS-1594] Inferred spans for SQS & SNS (#190)
* Simplify event source parsing logic * Add sns & sqs inferred spans * Fix default evt source bug & update comments * Use poetry for running unit tests * Add additional api gateway trigger tests * Fix bugs & refactor * Add inferred spans unit tests * Update integration tests * Run all tests for all python versions * Add missing change * Update _dd.span_type to span_type * Remove dependence on extension * Additional api gateway test * Remove log line & add todo * Fix api gateway integration test * Move call to get_first_record
1 parent 6b146ea commit 361164c

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

42 files changed

+956
-140
lines changed

datadog_lambda/tracing.py

Lines changed: 68 additions & 8 deletions
Original file line number · Diff line number · Diff line change
@@ -21,7 +21,13 @@
2121
from ddtrace import __version__ as ddtrace_version
2222
from ddtrace.propagation.http import HTTPPropagator
2323
from datadog_lambda import __version__ as datadog_lambda_version
24-
from datadog_lambda.trigger import parse_event_source, EventTypes, EventSubtypes
24+
from datadog_lambda.trigger import (
25+
parse_event_source,
26+
get_first_record,
27+
EventTypes,
28+
EventSubtypes,
29+
)
30+
from datetime import datetime
2531

2632
logger = logging.getLogger(__name__)
2733

@@ -382,7 +388,7 @@ def set_dd_trace_py_root(trace_context_source, merge_xray_traces):
382388
)
383389

384390

385-
def create_inferred_span(event, context, function_name):
391+
def create_inferred_span(event, context):
386392
event_source = parse_event_source(event)
387393
try:
388394
if event_source.equals(
@@ -400,6 +406,13 @@ def create_inferred_span(event, context, function_name):
400406
):
401407
logger.debug("API Gateway Websocket event detected. Inferring a span")
402408
return create_inferred_span_from_api_gateway_websocket_event(event, context)
409+
elif event_source.equals(EventTypes.SQS):
410+
logger.debug("SQS event detected. Inferring a span")
411+
return create_inferred_span_from_sqs_event(event, context)
412+
elif event_source.equals(EventTypes.SNS):
413+
logger.debug("SNS event detected. Inferring a span")
414+
return create_inferred_span_from_sns_event(event, context)
415+
403416
except Exception as e:
404417
logger.debug(
405418
"Unable to infer span. Detected type: {}. Reason: {}",
@@ -419,7 +432,7 @@ def create_inferred_span_from_api_gateway_websocket_event(event, context):
419432
"service.name": domain,
420433
"http.url": domain + endpoint,
421434
"endpoint": endpoint,
422-
"resource_name": domain + endpoint,
435+
"resource_names": domain + endpoint,
423436
"request_id": context.aws_request_id,
424437
"connection_id": event["requestContext"]["connectionId"],
425438
SPAN_TYPE_TAG: SPAN_TYPE_INFERRED,
@@ -446,7 +459,7 @@ def create_inferred_span_from_api_gateway_event(event, context):
446459
"http.url": domain + path,
447460
"endpoint": path,
448461
"http.method": event["httpMethod"],
449-
"resource_name": domain + path,
462+
"resource_names": domain + path,
450463
"request_id": context.aws_request_id,
451464
SPAN_TYPE_TAG: SPAN_TYPE_INFERRED,
452465
}
@@ -472,7 +485,7 @@ def create_inferred_span_from_http_api_event(event, context):
472485
"http.url": domain + path,
473486
"endpoint": path,
474487
"http.method": event["requestContext"]["http"]["method"],
475-
"resource_name": domain + path,
488+
"resource_names": domain + path,
476489
"request_id": context.aws_request_id,
477490
SPAN_TYPE_TAG: SPAN_TYPE_INFERRED,
478491
}
@@ -489,14 +502,61 @@ def create_inferred_span_from_http_api_event(event, context):
489502
return span
490503

491504

505+
def create_inferred_span_from_sqs_event(event, context):
506+
event_record = get_first_record(event)
507+
queue_name = event_record["eventSourceARN"].split(":")[-1]
508+
tags = {
509+
"operation_name": "aws.sqs",
510+
"service.name": "sqs",
511+
"resource_names": queue_name,
512+
SPAN_TYPE_TAG: SPAN_TYPE_INFERRED,
513+
}
514+
request_time_epoch = event_record["attributes"]["SentTimestamp"]
515+
args = {
516+
"resource": queue_name,
517+
"span_type": "web",
518+
}
519+
tracer.set_tags({"_dd.origin": "lambda"})
520+
span = tracer.trace("aws.sqs", **args)
521+
if span:
522+
span.set_tags(tags)
523+
span.start = int(request_time_epoch) / 1000
524+
return span
525+
526+
527+
def create_inferred_span_from_sns_event(event, context):
528+
event_record = get_first_record(event)
529+
topic_name = event_record["Sns"]["TopicArn"].split(":")[-1]
530+
tags = {
531+
"operation_name": "aws.sns",
532+
"service.name": "sns",
533+
"resource_names": topic_name,
534+
SPAN_TYPE_TAG: SPAN_TYPE_INFERRED,
535+
}
536+
sns_dt_format = "%Y-%m-%dT%H:%M:%S.%fZ"
537+
timestamp = event_record["Sns"]["Timestamp"]
538+
request_time_epoch = datetime.strptime(timestamp, sns_dt_format)
539+
540+
args = {
541+
"resource": topic_name,
542+
"span_type": "web",
543+
}
544+
tracer.set_tags({"_dd.origin": "lambda"})
545+
span = tracer.trace("aws.sns", **args)
546+
if span:
547+
span.set_tags(tags)
548+
span.start = int(request_time_epoch.strftime("%s"))
549+
return span
550+
551+
492552
def create_function_execution_span(
493553
context,
494554
function_name,
495555
is_cold_start,
496556
trace_context_source,
497557
merge_xray_traces,
498558
trigger_tags,
499-
upstream=None,
559+
parent_span=None,
500560
):
501561
tags = {}
502562
if context:
@@ -529,6 +589,6 @@ def create_function_execution_span(
529589
span = tracer.trace("aws.lambda", **args)
530590
if span:
531591
span.set_tags(tags)
532-
if upstream:
533-
span.parent_id = upstream.span_id
592+
if parent_span:
593+
span.parent_id = parent_span.span_id
534594
return span

datadog_lambda/trigger.py

Lines changed: 26 additions & 33 deletions
Original file line number · Diff line number · Diff line change
@@ -10,14 +10,6 @@
1010
from enum import Enum
1111
from typing import Any
1212

13-
EVENT_SOURCES_WITH_EXTRA_AWS = [
14-
"aws:dynamodb",
15-
"aws:kinesis",
16-
"aws:s3",
17-
"aws:sns",
18-
"aws:sqs",
19-
]
20-
2113

2214
class _stringTypedEnum(Enum):
2315
"""
@@ -41,15 +33,18 @@ class EventTypes(_stringTypedEnum):
4133
CLOUDWATCH_LOGS = "cloudwatch-logs"
4234
CLOUDWATCH_EVENTS = "cloudwatch-events"
4335
CLOUDFRONT = "cloudfront"
36+
DYNAMODB = "dynamodb"
37+
KINESIS = "kinesis"
38+
S3 = "s3"
39+
SNS = "sns"
40+
SQS = "sqs"
4441

4542

4643
class EventSubtypes(_stringTypedEnum):
4744
"""
4845
EventSubtypes is an enum of Lambda event subtypes.
49-
Currently, only API Gateway events have subtypes, but I imagine we might see more in the
50-
future.
51-
This was added to support the difference in handling of e.g. HTTP-API and Websocket events vs
52-
vanilla API-Gateway events.
46+
Currently, API Gateway events subtypes are supported,
47+
e.g. HTTP-API and Websocket events vs vanilla API-Gateway events.
5348
"""
5449

5550
NONE = "none"
@@ -61,32 +56,22 @@ class EventSubtypes(_stringTypedEnum):
6156
class _EventSource:
6257
"""
6358
_EventSource holds an event's type and subtype.
64-
If the event is of type UNKNOWN, an unknown_event_name may be provided.
65-
Unknown_event_name will be discarded otherwise.
6659
"""
6760

6861
def __init__(
6962
self,
7063
event_type: EventTypes,
7164
subtype: EventSubtypes = EventSubtypes.NONE,
72-
unknown_event_name: str = None,
7365
):
74-
if event_type == EventTypes.UNKNOWN:
75-
if unknown_event_name in EVENT_SOURCES_WITH_EXTRA_AWS:
76-
unknown_event_name = unknown_event_name.replace("aws:", "")
77-
self.unknown_event_type = unknown_event_name
7866
self.event_type = event_type
7967
self.subtype = subtype
8068

8169
def to_string(self) -> str:
8270
"""
8371
to_string returns the string representation of an _EventSource.
84-
If the event type is unknown, the unknown_event_type will be returned.
85-
Since to_string was added to support trigger tagging, the event's subtype will never be
86-
included in the string.
72+
Since to_string was added to support trigger tagging,
73+
the event's subtype will never be included in the string.
8774
"""
88-
if self.event_type == EventTypes.UNKNOWN:
89-
return self.unknown_event_type
9075
return self.event_type.get_string()
9176

9277
def equals(
@@ -125,10 +110,8 @@ def parse_event_source(event: dict) -> _EventSource:
125110
if type(event) is not dict:
126111
return _EventSource(EventTypes.UNKNOWN)
127112

128-
event_source = _EventSource(
129-
EventTypes.UNKNOWN,
130-
unknown_event_name=event.get("eventSource") or event.get("EventSource"),
131-
)
113+
event_source = _EventSource(EventTypes.UNKNOWN)
114+
132115
request_context = event.get("requestContext")
133116
if request_context and request_context.get("stage"):
134117
event_source = _EventSource(EventTypes.API_GATEWAY)
@@ -152,11 +135,21 @@ def parse_event_source(event: dict) -> _EventSource:
152135

153136
event_record = get_first_record(event)
154137
if event_record:
155-
event_source = _EventSource(
156-
EventTypes.UNKNOWN,
157-
unknown_event_name=event_record.get("eventSource")
158-
or event_record.get("EventSource"),
138+
aws_event_source = event_record.get(
139+
"eventSource", event_record.get("EventSource")
159140
)
141+
142+
if aws_event_source == "aws:dynamodb":
143+
event_source = _EventSource(EventTypes.DYNAMODB)
144+
if aws_event_source == "aws:kinesis":
145+
event_source = _EventSource(EventTypes.KINESIS)
146+
if aws_event_source == "aws:s3":
147+
event_source = _EventSource(EventTypes.S3)
148+
if aws_event_source == "aws:sns":
149+
event_source = _EventSource(EventTypes.SNS)
150+
if aws_event_source == "aws:sqs":
151+
event_source = _EventSource(EventTypes.SQS)
152+
160153
if event_record.get("cf"):
161154
event_source = _EventSource(EventTypes.CLOUDFRONT)
162155

@@ -271,7 +264,7 @@ def extract_trigger_tags(event: dict, context: Any) -> dict:
271264
"""
272265
trigger_tags = {}
273266
event_source = parse_event_source(event)
274-
if event_source.to_string() is not None:
267+
if event_source.to_string() is not None and event_source.to_string() != "unknown":
275268
trigger_tags["function_trigger.event_source"] = event_source.to_string()
276269

277270
event_source_arn = get_event_source_arn(event_source, event, context)

datadog_lambda/wrapper.py

Lines changed: 2 additions & 5 deletions
Original file line number · Diff line number · Diff line change
@@ -104,7 +104,6 @@ def __init__(self, func):
104104
self.inferred_span = None
105105
self.make_inferred_span = (
106106
os.environ.get("DD_INFERRED_SPANS", "false").lower() == "true"
107-
and should_use_extension
108107
)
109108
self.response = None
110109

@@ -165,17 +164,15 @@ def _before(self, event, context):
165164
if dd_tracing_enabled:
166165
set_dd_trace_py_root(trace_context_source, self.merge_xray_traces)
167166
if self.make_inferred_span:
168-
self.inferred_span = create_inferred_span(
169-
event, context, self.function_name
170-
)
167+
self.inferred_span = create_inferred_span(event, context)
171168
self.span = create_function_execution_span(
172169
context,
173170
self.function_name,
174171
is_cold_start(),
175172
trace_context_source,
176173
self.merge_xray_traces,
177174
self.trigger_tags,
178-
upstream=self.inferred_span,
175+
parent_span=self.inferred_span,
179176
)
180177
else:
181178
set_correlation_ids()

scripts/run_tests.sh

Lines changed: 4 additions & 6 deletions
Original file line number · Diff line number · Diff line change
@@ -16,12 +16,10 @@ do
1616
docker build -t datadog-lambda-python-test:$python_version \
1717
-f tests/Dockerfile . \
1818
--build-arg python_version=$python_version
19-
docker run -v `pwd`:/datadog-lambda-python \
20-
-w /datadog-lambda-python \
19+
docker run -w /test \
2120
datadog-lambda-python-test:$python_version \
22-
nose2 -v
23-
docker run -v `pwd`:/datadog-lambda-python \
24-
-w /datadog-lambda-python \
21+
poetry run nose2 -v
22+
docker run -w /test \
2523
datadog-lambda-python-test:$python_version \
26-
flake8 datadog_lambda/
24+
poetry run flake8 datadog_lambda/
2725
done

tests/Dockerfile

Lines changed: 5 additions & 2 deletions
Original file line number · Diff line number · Diff line change
@@ -7,11 +7,14 @@ RUN mkdir -p /test/datadog_lambda
77
WORKDIR /test
88

99
# Copy minimal subset of files to make pip install succeed and be cached (next docker builds will be way faster)
10-
COPY setup.py .
10+
# COPY setup.py .
11+
COPY pyproject.toml .
1112
COPY README.md .
1213
COPY datadog_lambda/__init__.py datadog_lambda/__init__.py
1314

14-
RUN pip install .[dev]
15+
RUN pip install poetry
16+
RUN poetry install
17+
RUN poetry install -E dev
1518

1619
# Install datadog-lambda with dev dependencies from local
1720
COPY . .

0 commit comments

Comments (0)