Skip to content

[SLS-1594] Inferred spans for SQS & SNS #190

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 68 additions & 8 deletions datadog_lambda/tracing.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,13 @@
from ddtrace import __version__ as ddtrace_version
from ddtrace.propagation.http import HTTPPropagator
from datadog_lambda import __version__ as datadog_lambda_version
from datadog_lambda.trigger import parse_event_source, EventTypes, EventSubtypes
from datadog_lambda.trigger import (
parse_event_source,
get_first_record,
EventTypes,
EventSubtypes,
)
from datetime import datetime

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -382,7 +388,7 @@ def set_dd_trace_py_root(trace_context_source, merge_xray_traces):
)


def create_inferred_span(event, context, function_name):
def create_inferred_span(event, context):
event_source = parse_event_source(event)
try:
if event_source.equals(
Expand All @@ -400,6 +406,13 @@ def create_inferred_span(event, context, function_name):
):
logger.debug("API Gateway Websocket event detected. Inferring a span")
return create_inferred_span_from_api_gateway_websocket_event(event, context)
elif event_source.equals(EventTypes.SQS):
logger.debug("SQS event detected. Inferring a span")
return create_inferred_span_from_sqs_event(event, context)
elif event_source.equals(EventTypes.SNS):
logger.debug("SNS event detected. Inferring a span")
return create_inferred_span_from_sns_event(event, context)

except Exception as e:
logger.debug(
"Unable to infer span. Detected type: {}. Reason: {}",
Expand All @@ -419,7 +432,7 @@ def create_inferred_span_from_api_gateway_websocket_event(event, context):
"service.name": domain,
"http.url": domain + endpoint,
"endpoint": endpoint,
"resource_name": domain + endpoint,
"resource_names": domain + endpoint,
"request_id": context.aws_request_id,
"connection_id": event["requestContext"]["connectionId"],
SPAN_TYPE_TAG: SPAN_TYPE_INFERRED,
Expand All @@ -446,7 +459,7 @@ def create_inferred_span_from_api_gateway_event(event, context):
"http.url": domain + path,
"endpoint": path,
"http.method": event["httpMethod"],
"resource_name": domain + path,
"resource_names": domain + path,
"request_id": context.aws_request_id,
SPAN_TYPE_TAG: SPAN_TYPE_INFERRED,
}
Expand All @@ -472,7 +485,7 @@ def create_inferred_span_from_http_api_event(event, context):
"http.url": domain + path,
"endpoint": path,
"http.method": event["requestContext"]["http"]["method"],
"resource_name": domain + path,
"resource_names": domain + path,
"request_id": context.aws_request_id,
SPAN_TYPE_TAG: SPAN_TYPE_INFERRED,
}
Expand All @@ -489,14 +502,61 @@ def create_inferred_span_from_http_api_event(event, context):
return span


def create_inferred_span_from_sqs_event(event, context):
event_record = get_first_record(event)
queue_name = event_record["eventSourceARN"].split(":")[-1]
tags = {
"operation_name": "aws.sqs",
"service.name": "sqs",
"resource_names": queue_name,
SPAN_TYPE_TAG: SPAN_TYPE_INFERRED,
}
request_time_epoch = event_record["attributes"]["SentTimestamp"]
args = {
"resource": queue_name,
"span_type": "web",
}
tracer.set_tags({"_dd.origin": "lambda"})
span = tracer.trace("aws.sqs", **args)
if span:
span.set_tags(tags)
span.start = int(request_time_epoch) / 1000
return span


def create_inferred_span_from_sns_event(event, context):
event_record = get_first_record(event)
topic_name = event_record["Sns"]["TopicArn"].split(":")[-1]
tags = {
"operation_name": "aws.sns",
"service.name": "sns",
"resource_names": topic_name,
SPAN_TYPE_TAG: SPAN_TYPE_INFERRED,
}
sns_dt_format = "%Y-%m-%dT%H:%M:%S.%fZ"
timestamp = event_record["Sns"]["Timestamp"]
request_time_epoch = datetime.strptime(timestamp, sns_dt_format)

args = {
"resource": topic_name,
"span_type": "web",
}
tracer.set_tags({"_dd.origin": "lambda"})
span = tracer.trace("aws.sns", **args)
if span:
span.set_tags(tags)
span.start = int(request_time_epoch.strftime("%s"))
return span


def create_function_execution_span(
context,
function_name,
is_cold_start,
trace_context_source,
merge_xray_traces,
trigger_tags,
upstream=None,
parent_span=None,
):
tags = {}
if context:
Expand Down Expand Up @@ -529,6 +589,6 @@ def create_function_execution_span(
span = tracer.trace("aws.lambda", **args)
if span:
span.set_tags(tags)
if upstream:
span.parent_id = upstream.span_id
if parent_span:
span.parent_id = parent_span.span_id
return span
59 changes: 26 additions & 33 deletions datadog_lambda/trigger.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,6 @@
from enum import Enum
from typing import Any

EVENT_SOURCES_WITH_EXTRA_AWS = [
"aws:dynamodb",
"aws:kinesis",
"aws:s3",
"aws:sns",
"aws:sqs",
]


class _stringTypedEnum(Enum):
"""
Expand All @@ -41,15 +33,18 @@ class EventTypes(_stringTypedEnum):
CLOUDWATCH_LOGS = "cloudwatch-logs"
CLOUDWATCH_EVENTS = "cloudwatch-events"
CLOUDFRONT = "cloudfront"
DYNAMODB = "dynamodb"
KINESIS = "kinesis"
S3 = "s3"
SNS = "sns"
SQS = "sqs"


class EventSubtypes(_stringTypedEnum):
"""
EventSubtypes is an enum of Lambda event subtypes.
Currently, only API Gateway events have subtypes, but I imagine we might see more in the
future.
This was added to support the difference in handling of e.g. HTTP-API and Websocket events vs
vanilla API-Gateway events.
Currently, API Gateway events subtypes are supported,
e.g. HTTP-API and Websocket events vs vanilla API-Gateway events.
"""

NONE = "none"
Expand All @@ -61,32 +56,22 @@ class EventSubtypes(_stringTypedEnum):
class _EventSource:
"""
_EventSource holds an event's type and subtype.
If the event is of type UNKNOWN, an unknown_event_name may be provided.
Unknown_event_name will be discarded otherwise.
"""

def __init__(
self,
event_type: EventTypes,
subtype: EventSubtypes = EventSubtypes.NONE,
unknown_event_name: str = None,
):
if event_type == EventTypes.UNKNOWN:
if unknown_event_name in EVENT_SOURCES_WITH_EXTRA_AWS:
unknown_event_name = unknown_event_name.replace("aws:", "")
self.unknown_event_type = unknown_event_name
self.event_type = event_type
self.subtype = subtype

def to_string(self) -> str:
"""
to_string returns the string representation of an _EventSource.
If the event type is unknown, the unknown_event_type will be returned.
Since to_string was added to support trigger tagging, the event's subtype will never be
included in the string.
Since to_string was added to support trigger tagging,
the event's subtype will never be included in the string.
"""
if self.event_type == EventTypes.UNKNOWN:
return self.unknown_event_type
return self.event_type.get_string()

def equals(
Expand Down Expand Up @@ -125,10 +110,8 @@ def parse_event_source(event: dict) -> _EventSource:
if type(event) is not dict:
return _EventSource(EventTypes.UNKNOWN)

event_source = _EventSource(
EventTypes.UNKNOWN,
unknown_event_name=event.get("eventSource") or event.get("EventSource"),
)
event_source = _EventSource(EventTypes.UNKNOWN)

request_context = event.get("requestContext")
if request_context and request_context.get("stage"):
event_source = _EventSource(EventTypes.API_GATEWAY)
Expand All @@ -152,11 +135,21 @@ def parse_event_source(event: dict) -> _EventSource:

event_record = get_first_record(event)
if event_record:
event_source = _EventSource(
EventTypes.UNKNOWN,
unknown_event_name=event_record.get("eventSource")
or event_record.get("EventSource"),
aws_event_source = event_record.get(
"eventSource", event_record.get("EventSource")
)

if aws_event_source == "aws:dynamodb":
event_source = _EventSource(EventTypes.DYNAMODB)
if aws_event_source == "aws:kinesis":
event_source = _EventSource(EventTypes.KINESIS)
if aws_event_source == "aws:s3":
event_source = _EventSource(EventTypes.S3)
if aws_event_source == "aws:sns":
event_source = _EventSource(EventTypes.SNS)
if aws_event_source == "aws:sqs":
event_source = _EventSource(EventTypes.SQS)

if event_record.get("cf"):
event_source = _EventSource(EventTypes.CLOUDFRONT)

Expand Down Expand Up @@ -271,7 +264,7 @@ def extract_trigger_tags(event: dict, context: Any) -> dict:
"""
trigger_tags = {}
event_source = parse_event_source(event)
if event_source.to_string() is not None:
if event_source.to_string() is not None and event_source.to_string() != "unknown":
trigger_tags["function_trigger.event_source"] = event_source.to_string()

event_source_arn = get_event_source_arn(event_source, event, context)
Expand Down
7 changes: 2 additions & 5 deletions datadog_lambda/wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,6 @@ def __init__(self, func):
self.inferred_span = None
self.make_inferred_span = (
os.environ.get("DD_INFERRED_SPANS", "false").lower() == "true"
and should_use_extension
)
self.response = None

Expand Down Expand Up @@ -165,17 +164,15 @@ def _before(self, event, context):
if dd_tracing_enabled:
set_dd_trace_py_root(trace_context_source, self.merge_xray_traces)
if self.make_inferred_span:
self.inferred_span = create_inferred_span(
event, context, self.function_name
)
self.inferred_span = create_inferred_span(event, context)
self.span = create_function_execution_span(
context,
self.function_name,
is_cold_start(),
trace_context_source,
self.merge_xray_traces,
self.trigger_tags,
upstream=self.inferred_span,
parent_span=self.inferred_span,
)
else:
set_correlation_ids()
Expand Down
10 changes: 4 additions & 6 deletions scripts/run_tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,10 @@ do
docker build -t datadog-lambda-python-test:$python_version \
-f tests/Dockerfile . \
--build-arg python_version=$python_version
docker run -v `pwd`:/datadog-lambda-python \
-w /datadog-lambda-python \
docker run -w /test \
datadog-lambda-python-test:$python_version \
nose2 -v
docker run -v `pwd`:/datadog-lambda-python \
-w /datadog-lambda-python \
poetry run nose2 -v
docker run -w /test \
datadog-lambda-python-test:$python_version \
flake8 datadog_lambda/
poetry run flake8 datadog_lambda/
done
7 changes: 5 additions & 2 deletions tests/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,14 @@ RUN mkdir -p /test/datadog_lambda
WORKDIR /test

# Copy minimal subset of files to make pip install succeed and be cached (next docker builds will be way faster)
COPY setup.py .
# COPY setup.py .
COPY pyproject.toml .
COPY README.md .
COPY datadog_lambda/__init__.py datadog_lambda/__init__.py

RUN pip install .[dev]
RUN pip install poetry
RUN poetry install
RUN poetry install -E dev

# Install datadog-lambda with dev dependencies from local
COPY . .
Loading