Skip to content

Extract span tags from triggering event #101

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 26 commits into from
Jan 26, 2021
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
2f051a6
add trigger event span tags
DylanLovesCoffee Dec 8, 2020
d6fbf3e
add tests and sample events
DylanLovesCoffee Dec 8, 2020
ce92c3f
Merge branch 'master' into dylan/trigger-tags
DylanLovesCoffee Dec 8, 2020
64fb4ec
update snapshots
DylanLovesCoffee Dec 8, 2020
080f007
lint
DylanLovesCoffee Dec 8, 2020
1d611c0
implement feedback
DylanLovesCoffee Dec 10, 2020
d83ee4e
refactor adding tags
DylanLovesCoffee Dec 11, 2020
b840c3d
black
DylanLovesCoffee Dec 11, 2020
d731561
include alb for http tags
DylanLovesCoffee Dec 14, 2020
15c4610
add s3 test
DylanLovesCoffee Dec 17, 2020
ec51ccd
nits and add CN and Gov arn regions
DylanLovesCoffee Dec 22, 2020
f275936
always add trigger tags to xray subseg
DylanLovesCoffee Dec 22, 2020
9cfaedc
implement feedback
DylanLovesCoffee Dec 24, 2020
1965908
handle Lambda response status code
DylanLovesCoffee Dec 28, 2020
e680aff
renaming for specificity
DylanLovesCoffee Dec 28, 2020
c0bc254
carry response status_code to xray
DylanLovesCoffee Jan 4, 2021
eaa952d
cleanup
DylanLovesCoffee Jan 4, 2021
195a6aa
fix merge conflicts
DylanLovesCoffee Jan 7, 2021
369bdfc
add xray dummy subsegment for trigger tags at end of invocation
DylanLovesCoffee Jan 8, 2021
79140cf
rename trigger tag and update snapshots
DylanLovesCoffee Jan 20, 2021
a02c289
lint EOF extra line
DylanLovesCoffee Jan 20, 2021
a9bbf0c
update with feedback
DylanLovesCoffee Jan 21, 2021
5375952
Merge branch 'main' into dylan/trigger-tags
DylanLovesCoffee Jan 21, 2021
0e71a7e
update snapshots
DylanLovesCoffee Jan 21, 2021
20b4945
fix
DylanLovesCoffee Jan 21, 2021
3190475
fix my fix
DylanLovesCoffee Jan 21, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion datadog_lambda/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ class TraceHeader(object):
# X-Ray subsegment to save Datadog trace metadata
class XraySubsegment(object):
NAME = "datadog-metadata"
KEY = "trace"
TRACE_KEY = "trace"
ROOT_SPAN_METADATA_KEY = "root_span_metadata"
NAMESPACE = "datadog"


Expand Down
65 changes: 46 additions & 19 deletions datadog_lambda/tracing.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ def _get_xray_trace_context():
"trace-id": _convert_xray_trace_id(xray_trace_entity.trace_id),
"parent-id": _convert_xray_entity_id(xray_trace_entity.id),
"sampling-priority": _convert_xray_sampling(xray_trace_entity.sampled),
"source": TraceContextSource.XRAY,
}


Expand Down Expand Up @@ -89,18 +88,45 @@ def _context_obj_to_headers(obj):
}


def create_dd_metadata_subsegment(dd_context):
    """
    Record the Datadog trace context on a dedicated X-Ray subsegment.

    Opens a subsegment named `XraySubsegment.NAME`, stores `dd_context` in
    its metadata under `XraySubsegment.TRACE_KEY` (namespace
    `XraySubsegment.NAMESPACE`) and closes the subsegment again, so the
    X-Ray trace can be converted to a Datadog trace in the Datadog backend
    with the correct context.
    """
    xray_recorder.begin_subsegment(XraySubsegment.NAME)
    # The subsegment opened above is the recorder's current one.
    xray_recorder.current_subsegment().put_metadata(
        XraySubsegment.TRACE_KEY, dd_context, XraySubsegment.NAMESPACE
    )
    xray_recorder.end_subsegment()


def create_dd_root_span_metadata_subsegment(lambda_function_tags):
    """
    Record tags for the Lambda function execution span on an X-Ray subsegment.

    Opens a subsegment named `XraySubsegment.NAME`, stores
    `lambda_function_tags` in its metadata under
    `XraySubsegment.ROOT_SPAN_METADATA_KEY` (namespace
    `XraySubsegment.NAMESPACE`) and closes the subsegment again.
    """
    xray_recorder.begin_subsegment(XraySubsegment.NAME)
    # The subsegment opened above is the recorder's current one.
    xray_recorder.current_subsegment().put_metadata(
        XraySubsegment.ROOT_SPAN_METADATA_KEY,
        lambda_function_tags,
        XraySubsegment.NAMESPACE,
    )
    xray_recorder.end_subsegment()


def extract_dd_trace_context(event):
"""
Extract Datadog trace context from the Lambda `event` object.

Write the context to a global `dd_trace_context`, so the trace
can be continued on the outgoing requests with the context injected.

Save the context to an X-Ray subsegment's metadata field, so the X-Ray
trace can be converted to a Datadog trace in the Datadog backend with
the correct context.
"""
global dd_trace_context
trace_context_source = None
headers = event.get("headers", {})
lowercase_headers = {k.lower(): v for k, v in headers.items()}

Expand All @@ -114,19 +140,16 @@ def extract_dd_trace_context(event):
"parent-id": parent_id,
"sampling-priority": sampling_priority,
}
xray_recorder.begin_subsegment(XraySubsegment.NAME)
subsegment = xray_recorder.current_subsegment()

subsegment.put_metadata(XraySubsegment.KEY, metadata, XraySubsegment.NAMESPACE)
dd_trace_context = metadata.copy()
dd_trace_context["source"] = TraceContextSource.EVENT
xray_recorder.end_subsegment()
trace_context_source = TraceContextSource.EVENT
else:
# AWS Lambda runtime caches global variables between invocations,
# reset to avoid using the context from the last invocation.
dd_trace_context = _get_xray_trace_context()
if dd_trace_context:
trace_context_source = TraceContextSource.XRAY
logger.debug("extracted dd trace context %s", dd_trace_context)
return dd_trace_context
return dd_trace_context, trace_context_source


def get_dd_trace_context():
Expand Down Expand Up @@ -227,15 +250,20 @@ def is_lambda_context():
return type(xray_recorder.context) == LambdaContext


def set_dd_trace_py_root(trace_context, merge_xray_traces):
if trace_context["source"] == TraceContextSource.EVENT or merge_xray_traces:
def set_dd_trace_py_root(trace_context_source, merge_xray_traces):
    """
    Activate the extracted Datadog trace context as the ddtrace root context.

    Nothing happens unless the context came from the event
    (`TraceContextSource.EVENT`) or `merge_xray_traces` is truthy. In those
    cases the headers returned by `get_dd_trace_context()` are turned into a
    span context by the propagator and activated on the tracer.
    """
    context_from_event = trace_context_source == TraceContextSource.EVENT
    if not (context_from_event or merge_xray_traces):
        return

    parent_context = propagator.extract(get_dd_trace_context())
    tracer.context_provider.activate(parent_context)


def create_function_execution_span(
context, function_name, is_cold_start, trace_context, merge_xray_traces
context,
function_name,
is_cold_start,
trace_context_source,
merge_xray_traces,
trigger_tags,
):
tags = {}
if context:
Expand All @@ -252,10 +280,9 @@ def create_function_execution_span(
"datadog_lambda": datadog_lambda_version,
"dd_trace": ddtrace_version,
}
source = trace_context["source"]
if source == TraceContextSource.XRAY and merge_xray_traces:
tags["_dd.parent_source"] = source

if trace_context_source == TraceContextSource.XRAY and merge_xray_traces:
tags["_dd.parent_source"] = trace_context_source
tags.update(trigger_tags)
args = {
"service": "aws.lambda",
"resource": function_name,
Expand Down
205 changes: 205 additions & 0 deletions datadog_lambda/trigger.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,205 @@
# Unless explicitly stated otherwise all files in this repository are licensed
# under the Apache License Version 2.0.
# This product includes software developed at Datadog (https://www.datadoghq.com/).
# Copyright 2019 Datadog, Inc.

import base64
import gzip
import json
from io import BytesIO, BufferedReader


# Event source identifiers that get_event_source() reports with the "aws:"
# prefix stripped (e.g. "aws:sqs" -> "sqs"); any other value is returned
# by that function unchanged.
EVENT_SOURCES = [
    "aws:dynamodb",
    "aws:kinesis",
    "aws:s3",
    "aws:sns",
    "aws:sqs",
]

# Regions whose ARNs use a partition other than the default "aws".
GOVCLOUD_REGIONS = ["us-gov-east-1", "us-gov-west-1"]
CHINA_REGIONS = ["cn-north-1", "cn-northwest-1"]


def get_arn_region_identifier(region):
    """
    Returns the ARN partition for `region`: "aws-us-gov" for the regions in
    GOVCLOUD_REGIONS, "aws-cn" for the regions in CHINA_REGIONS and "aws"
    for anything else.
    """
    partition_regions = (
        ("aws-us-gov", GOVCLOUD_REGIONS),
        ("aws-cn", CHINA_REGIONS),
    )
    for partition, regions in partition_regions:
        if region in regions:
            return partition
    return "aws"


def get_first_record(event):
    """
    Returns the first entry of the event's "Records" list.

    Returns None when `event` is not a dict, or when "Records" is missing,
    empty or not a list/tuple, so callers only need a truthiness check on
    the result instead of guarding against malformed payloads themselves.
    """
    # A non-dict payload (list, string, None, ...) has no `.get`.
    if not isinstance(event, dict):
        return None

    records = event.get("Records")
    # Only index into real sequences: a dict or string under "Records" would
    # otherwise raise KeyError or hand back a single character.
    if isinstance(records, (list, tuple)) and records:
        return records[0]
    return None


def get_event_source(event):
    """Determines the source of the trigger event

    Possible Returns:
        api-gateway | application-load-balancer | cloudwatch-logs |
        cloudwatch-events | cloudfront | dynamodb | kinesis | s3 | sns | sqs

    An event source that is not listed in EVENT_SOURCES is returned
    unchanged, and None is returned when no source can be determined or
    when `event` is not a dict.
    """
    # A non-dict payload (list, string, None, ...) has no `.get`; report
    # "no source" instead of raising AttributeError.
    if not isinstance(event, dict):
        return None

    event_source = event.get("eventSource") or event.get("EventSource")

    # Nested values are only inspected when they really are dicts, so a
    # malformed payload cannot crash the detection.
    request_context = event.get("requestContext")
    if isinstance(request_context, dict):
        if request_context.get("stage"):
            event_source = "api-gateway"

        if request_context.get("elb"):
            event_source = "application-load-balancer"

    if event.get("awslogs"):
        event_source = "cloudwatch-logs"

    event_detail = event.get("detail")
    cw_event_categories = isinstance(event_detail, dict) and event_detail.get(
        "EventCategories"
    )
    if event.get("source") == "aws.events" or cw_event_categories:
        event_source = "cloudwatch-events"

    # When the event carries records, the first record decides the source
    # and replaces anything detected above.
    event_record = get_first_record(event)
    if event_record and isinstance(event_record, dict):
        event_source = event_record.get("eventSource") or event_record.get(
            "EventSource"
        )
        if event_record.get("cf"):
            event_source = "cloudfront"

    # Report the known sources without their "aws:" prefix.
    if event_source in EVENT_SOURCES:
        event_source = event_source.replace("aws:", "")
    return event_source


def parse_event_source_arn(source, event, context):
    """
    Derives the ARN of the trigger from the event payload.

    When the event carries an ARN for the given `source` it is returned
    as-is; otherwise one is assembled from identifiers in the event plus the
    partition, region and account id taken from
    `context.invoked_function_arn`. Returns None for a `source` that is not
    handled here.
    """
    function_arn_parts = context.invoked_function_arn.split(":")
    region = function_arn_parts[3]
    account_id = function_arn_parts[4]
    partition = get_arn_region_identifier(region)

    first_record = get_first_record(event)

    if source == "s3":
        # e.g. arn:aws:s3:::lambda-xyz123-abc890
        bucket = first_record.get("s3")["bucket"]
        return bucket["arn"]

    if source == "sns":
        # e.g. arn:aws:sns:us-east-1:123456789012:sns-lambda
        sns_message = first_record.get("Sns")
        return sns_message["TopicArn"]

    if source == "cloudfront":
        # e.g. arn:aws:cloudfront::123456789012:distribution/ABC123XYZ
        distribution_id = first_record.get("cf")["config"]["distributionId"]
        arn_template = "arn:{}:cloudfront::{}:distribution/{}"
        return arn_template.format(partition, account_id, distribution_id)

    if source == "api-gateway":
        # e.g. arn:aws:apigateway:us-east-1::/restapis/xyz123/stages/default
        request_context = event.get("requestContext")
        api_id = request_context["apiId"]
        stage = request_context["stage"]
        arn_template = "arn:{}:apigateway:{}::/restapis/{}/stages/{}"
        return arn_template.format(partition, region, api_id, stage)

    if source == "application-load-balancer":
        # e.g. arn:aws:elasticloadbalancing:us-east-1:123456789012:targetgroup/lambda-xyz/123
        load_balancer = event.get("requestContext").get("elb")
        return load_balancer["targetGroupArn"]

    if source == "cloudwatch-logs":
        # e.g. arn:aws:logs:us-west-1:123456789012:log-group:/my-log-group-xyz
        # The log payload is base64-encoded, gzip-compressed JSON.
        compressed = BytesIO(base64.b64decode(event["awslogs"]["data"]))
        with gzip.GzipFile(fileobj=compressed) as decompress_stream:
            raw_json = b"".join(BufferedReader(decompress_stream))
        log_group = json.loads(raw_json).get("logGroup", "cloudwatch")
        arn_template = "arn:{}:logs:{}:{}:log-group:{}"
        return arn_template.format(partition, region, account_id, log_group)

    if source == "cloudwatch-events":
        # e.g. arn:aws:events:us-east-1:123456789012:rule/my-schedule
        resources = event.get("resources")
        if resources:
            return resources[0]

    return None


def get_event_source_arn(source, event, context):
    """
    Returns the ARN of the resource that triggered the invocation.

    When the event has a first record, only that record's "eventSourceARN" /
    "eventSourceArn" is considered; otherwise the same keys are read from
    the top level of the event. If the chosen value is None the ARN is
    derived with `parse_event_source_arn`.
    """
    record = get_first_record(event)
    arn_holder = record if record else event
    arn = arn_holder.get("eventSourceARN") or arn_holder.get("eventSourceArn")

    if arn is not None:
        return arn
    return parse_event_source_arn(source, event, context)


def get_http_tags(event):
    """
    Extracts HTTP facet tags from the triggering event

    Returns a dict that may contain:
        http.url              - requestContext "domainName" (only when the
                                request context has a "stage")
        http.url_details.path - request path
        http.method           - HTTP method
        http.referer          - value of the Referer header, matched
                                case-insensitively

    Keys whose value is missing or empty are omitted.
    """
    http_tags = {}
    request_context = event.get("requestContext")
    path = event.get("path")
    method = event.get("httpMethod")
    if request_context and request_context.get("stage"):
        if request_context.get("domainName"):
            http_tags["http.url"] = request_context["domainName"]

        # With a staged request context, path and method come from the
        # context rather than from the top level of the event.
        path = request_context.get("path")
        method = request_context.get("httpMethod")
        # Version 2.0 HTTP API Gateway
        apigateway_v2_http = request_context.get("http")
        if event.get("version") == "2.0" and apigateway_v2_http:
            path = apigateway_v2_http.get("path")
            method = apigateway_v2_http.get("method")

    if path:
        http_tags["http.url_details.path"] = path
    if method:
        http_tags["http.method"] = method

    headers = event.get("headers")
    if headers:
        # Exact-case match first, so events that already worked keep
        # producing the same value.
        referer = headers.get("Referer")
        if not referer:
            # HTTP header names are case-insensitive and may arrive
            # lower-cased, so fall back to a case-insensitive match.
            referer = next(
                (
                    value
                    for name, value in headers.items()
                    if value and name.lower() == "referer"
                ),
                None,
            )
        if referer:
            http_tags["http.referer"] = referer

    return http_tags


def extract_trigger_tags(event, context):
    """
    Parses the trigger event object to get tags to be added to the span metadata

    Returns a dict that may contain "trigger.event_source",
    "trigger.event_source_arn" and, for api-gateway and
    application-load-balancer triggers, the HTTP tags from `get_http_tags`.
    An empty dict is returned when `event` is not a dict or when its source
    cannot be determined.
    """
    trigger_tags = {}

    # A non-dict payload (list, string, None, ...) has no `.get`, so there
    # is nothing to inspect and the event parsers must not be called.
    if not isinstance(event, dict):
        return trigger_tags

    event_source = get_event_source(event)
    if not event_source:
        return trigger_tags

    trigger_tags["trigger.event_source"] = event_source

    event_source_arn = get_event_source_arn(event_source, event, context)
    if event_source_arn:
        trigger_tags["trigger.event_source_arn"] = event_source_arn

    if event_source in ["api-gateway", "application-load-balancer"]:
        trigger_tags.update(get_http_tags(event))

    return trigger_tags


def set_http_status_code_tag(span, response):
    """
    If the Lambda was triggered by API Gateway or ALB add the returned status code
    as a tag to the function execution span

    Nothing is tagged when `span` is falsy, when `response` does not offer a
    `.get` lookup (e.g. None or a plain string) or when it has no truthy
    "statusCode".
    """
    if not span or not hasattr(response, "get"):
        return

    # Membership test: `== ("api-gateway" or "application-load-balancer")`
    # would only ever compare against "api-gateway".
    http_triggers = ("api-gateway", "application-load-balancer")
    if span.get_tag("trigger.event_source") not in http_triggers:
        return

    status_code = response.get("statusCode")
    if status_code:
        span.set_tag("http.status_code", status_code)
Loading