Skip to content

Commit bfb0f7a

Browse files
Inject trace context into AWS Step Functions input (#7585)
1 parent fc55ca1 commit bfb0f7a

File tree

6 files changed

+376
-0
lines changed

6 files changed

+376
-0
lines changed
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
muzzle {
2+
pass {
3+
group = "software.amazon.awssdk"
4+
module = "sfn"
5+
// 2.15.35 is the minimum version with step functions
6+
versions = "[2.15.35,)"
7+
assertInverse = true
8+
}
9+
}
10+
11+
apply from: "$rootDir/gradle/java.gradle"
12+
13+
addTestSuiteForDir('latestDepTest', 'test')
14+
addTestSuiteExtendingForDir('latestDepForkedTest', 'latestDepTest', 'test')
15+
16+
dependencies {
17+
compileOnly group: 'software.amazon.awssdk', name: 'sfn', version: '2.15.35'
18+
19+
// Include httpclient instrumentation for testing because it is a dependency for aws-sdk.
20+
testImplementation project(':dd-java-agent:instrumentation:apache-httpclient-4')
21+
testImplementation project(':dd-java-agent:instrumentation:aws-java-sdk-2.2')
22+
testImplementation 'software.amazon.awssdk:sfn:2.15.35'
23+
testImplementation libs.testcontainers
24+
25+
latestDepTestImplementation group: 'software.amazon.awssdk', name: 'sfn', version: '+'
26+
}
27+
28+
tasks.withType(Test).configureEach {
29+
usesService(testcontainersLimit)
30+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
package datadog.trace.instrumentation.aws.v2.sfn;
2+
3+
import datadog.json.JsonMapper;
4+
import datadog.json.JsonWriter;
5+
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
6+
7+
public class InputAttributeInjector {
8+
private static final String DATADOG_KEY = "_datadog";
9+
10+
public static String buildTraceContext(AgentSpan span) {
11+
String tagsJson = JsonMapper.toJson(span.getTags());
12+
try (JsonWriter writer = new JsonWriter()) {
13+
writer.beginObject();
14+
writer.name("x-datadog-trace-id").value(span.getTraceId().toString());
15+
writer.name("x-datadog-parent-id").value(String.valueOf(span.getSpanId()));
16+
writer.name("x-datadog-tags").jsonValue(tagsJson);
17+
writer.endObject();
18+
return writer.toString();
19+
} catch (Exception e) {
20+
return null;
21+
}
22+
}
23+
24+
public static String getModifiedInput(String request, String ddTraceContextJSON) {
25+
if (request == null || ddTraceContextJSON == null) {
26+
return request; // leave request unmodified
27+
}
28+
29+
final String traceContextProperty = "\"" + DATADOG_KEY + "\":" + ddTraceContextJSON;
30+
int startPos = request.indexOf('{');
31+
int endPos = request.lastIndexOf('}');
32+
33+
if (startPos < 0 || endPos < startPos) {
34+
return request; // leave request unmodified
35+
}
36+
37+
// If input is an empty {}
38+
if (endPos == startPos + 1) {
39+
return "{" + traceContextProperty + "}";
40+
}
41+
42+
String existingJSON = request.substring(startPos + 1, endPos).trim();
43+
if (existingJSON.isEmpty()) {
44+
return "{" + traceContextProperty + "}";
45+
} else {
46+
return "{" + existingJSON + "," + traceContextProperty + "}";
47+
}
48+
}
49+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
package datadog.trace.instrumentation.aws.v2.sfn;
2+
3+
import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named;
4+
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
5+
6+
import com.google.auto.service.AutoService;
7+
import datadog.trace.agent.tooling.Instrumenter;
8+
import datadog.trace.agent.tooling.InstrumenterModule;
9+
import java.util.List;
10+
import net.bytebuddy.asm.Advice;
11+
import software.amazon.awssdk.core.interceptor.ExecutionInterceptor;
12+
13+
/** AWS SDK v2 Step Function instrumentation */
14+
@AutoService(InstrumenterModule.class)
15+
public final class SfnClientInstrumentation extends InstrumenterModule.Tracing
16+
implements Instrumenter.ForSingleType, Instrumenter.HasMethodAdvice {
17+
18+
public SfnClientInstrumentation() {
19+
super("sfn", "aws-sdk");
20+
}
21+
22+
@Override
23+
public String instrumentedType() {
24+
return "software.amazon.awssdk.core.client.builder.SdkDefaultClientBuilder";
25+
}
26+
27+
@Override
28+
public void methodAdvice(MethodTransformer transformer) {
29+
transformer.applyAdvice(
30+
isMethod().and(named("resolveExecutionInterceptors")),
31+
SfnClientInstrumentation.class.getName() + "$AwsSfnBuilderAdvice");
32+
}
33+
34+
@Override
35+
public String[] helperClassNames() {
36+
return new String[] {packageName + ".SfnInterceptor", packageName + ".InputAttributeInjector"};
37+
}
38+
39+
public static class AwsSfnBuilderAdvice {
40+
@Advice.OnMethodExit(suppress = Throwable.class)
41+
public static void addHandler(@Advice.Return final List<ExecutionInterceptor> interceptors) {
42+
for (ExecutionInterceptor interceptor : interceptors) {
43+
if (interceptor instanceof SfnInterceptor) {
44+
return;
45+
}
46+
}
47+
interceptors.add(new SfnInterceptor());
48+
}
49+
}
50+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
package datadog.trace.instrumentation.aws.v2.sfn;
2+
3+
import datadog.trace.bootstrap.InstanceStore;
4+
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
5+
import software.amazon.awssdk.core.SdkRequest;
6+
import software.amazon.awssdk.core.interceptor.Context;
7+
import software.amazon.awssdk.core.interceptor.ExecutionAttribute;
8+
import software.amazon.awssdk.core.interceptor.ExecutionAttributes;
9+
import software.amazon.awssdk.core.interceptor.ExecutionInterceptor;
10+
import software.amazon.awssdk.services.sfn.model.StartExecutionRequest;
11+
import software.amazon.awssdk.services.sfn.model.StartSyncExecutionRequest;
12+
13+
public class SfnInterceptor implements ExecutionInterceptor {
14+
15+
public static final ExecutionAttribute<AgentSpan> SPAN_ATTRIBUTE =
16+
InstanceStore.of(ExecutionAttribute.class)
17+
.putIfAbsent("DatadogSpan", () -> new ExecutionAttribute<>("DatadogSpan"));
18+
19+
public SfnInterceptor() {}
20+
21+
@Override
22+
public SdkRequest modifyRequest(
23+
Context.ModifyRequest context, ExecutionAttributes executionAttributes) {
24+
try {
25+
return modifyRequestImpl(context, executionAttributes);
26+
} catch (Exception e) {
27+
return context.request();
28+
}
29+
}
30+
31+
public SdkRequest modifyRequestImpl(
32+
Context.ModifyRequest context, ExecutionAttributes executionAttributes) {
33+
final AgentSpan span = executionAttributes.getAttribute(SPAN_ATTRIBUTE);
34+
// StartExecutionRequest
35+
if (context.request() instanceof StartExecutionRequest) {
36+
StartExecutionRequest request = (StartExecutionRequest) context.request();
37+
if (request.input() == null) {
38+
return request;
39+
}
40+
return injectTraceContext(span, request);
41+
}
42+
43+
// StartSyncExecutionRequest
44+
if (context.request() instanceof StartSyncExecutionRequest) {
45+
StartSyncExecutionRequest request = (StartSyncExecutionRequest) context.request();
46+
if (request.input() == null) {
47+
return request;
48+
}
49+
return injectTraceContext(span, request);
50+
}
51+
52+
return context.request();
53+
}
54+
55+
private SdkRequest injectTraceContext(AgentSpan span, StartExecutionRequest request) {
56+
String ddTraceContextJSON = InputAttributeInjector.buildTraceContext(span);
57+
// Inject the trace context into the StartExecutionRequest input
58+
String modifiedInput =
59+
InputAttributeInjector.getModifiedInput(request.input(), ddTraceContextJSON);
60+
61+
return request.toBuilder().input(modifiedInput).build();
62+
}
63+
64+
private SdkRequest injectTraceContext(AgentSpan span, StartSyncExecutionRequest request) {
65+
String ddTraceContextJSON = InputAttributeInjector.buildTraceContext(span);
66+
// Inject the trace context into the StartSyncExecutionRequest input
67+
String modifiedInput =
68+
InputAttributeInjector.getModifiedInput(request.input(), ddTraceContextJSON);
69+
70+
return request.toBuilder().input(modifiedInput).build();
71+
}
72+
}
Lines changed: 174 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,174 @@
1+
import datadog.trace.agent.test.naming.VersionedNamingTestBase
2+
import datadog.trace.agent.test.utils.TraceUtils
3+
import datadog.trace.api.DDSpanTypes
4+
import datadog.trace.bootstrap.instrumentation.api.Tags
5+
import groovy.json.JsonSlurper
6+
import org.testcontainers.containers.GenericContainer
7+
import org.testcontainers.utility.DockerImageName
8+
import software.amazon.awssdk.services.sfn.SfnClient
9+
import software.amazon.awssdk.services.sfn.model.SfnException
10+
import software.amazon.awssdk.services.sfn.model.StartExecutionResponse
11+
import software.amazon.awssdk.regions.Region
12+
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider
13+
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials
14+
import spock.lang.Shared
15+
16+
import java.time.Duration
17+
18+
import static datadog.trace.agent.test.utils.TraceUtils.basicSpan
19+
20+
21+
abstract class SfnClientTest extends VersionedNamingTestBase {
22+
@Shared GenericContainer localStack
23+
@Shared SfnClient sfnClient
24+
@Shared String testStateMachineARN
25+
@Shared Object endPoint
26+
27+
def setupSpec() {
28+
localStack = new GenericContainer(DockerImageName.parse("localstack/localstack"))
29+
.withExposedPorts(4566)
30+
.withEnv("SERVICES", "stepfunctions")
31+
.withReuse(true)
32+
.withStartupTimeout(Duration.ofSeconds(120))
33+
localStack.start()
34+
endPoint = "http://" + localStack.getHost() + ":" + localStack.getMappedPort(4566)
35+
sfnClient = SfnClient.builder()
36+
.endpointOverride(URI.create(endPoint))
37+
.region(Region.US_EAST_1)
38+
.credentialsProvider(StaticCredentialsProvider.create(AwsBasicCredentials.create("test", "test")))
39+
.build()
40+
41+
def response = sfnClient.createStateMachine { builder ->
42+
builder.name("testStateMachine")
43+
.definition("{\"StartAt\": \"HelloWorld\", \"States\": {\"HelloWorld\": {\"Type\": \"Pass\", \"End\": true}}}")
44+
.build()
45+
}
46+
testStateMachineARN = response.stateMachineArn()
47+
}
48+
49+
def cleanupSpec() {
50+
sfnClient.close()
51+
localStack.stop()
52+
}
53+
54+
def "Step Functions span is created"() {
55+
when:
56+
StartExecutionResponse response
57+
TraceUtils.runUnderTrace('parent', {
58+
response = sfnClient.startExecution { builder ->
59+
builder.stateMachineArn(testStateMachineARN)
60+
.input("{\"key\": \"value\"}")
61+
.build()
62+
}
63+
})
64+
65+
then:
66+
assertTraces(1) {
67+
trace(2) {
68+
basicSpan(it, "parent")
69+
span {
70+
serviceName service()
71+
operationName operation()
72+
resourceName "Sfn.StartExecution"
73+
spanType DDSpanTypes.HTTP_CLIENT
74+
errored false
75+
measured true
76+
childOf(span(0))
77+
tags {
78+
"$Tags.COMPONENT" "java-aws-sdk"
79+
"$Tags.SPAN_KIND" Tags.SPAN_KIND_CLIENT
80+
"$Tags.HTTP_URL" endPoint+'/'
81+
"$Tags.HTTP_METHOD" "POST"
82+
"$Tags.HTTP_STATUS" 200
83+
"$Tags.PEER_PORT" localStack.getMappedPort(4566)
84+
"$Tags.PEER_HOSTNAME" localStack.getHost()
85+
"aws.service" "Sfn"
86+
"aws.operation" "StartExecution"
87+
"aws.agent" "java-aws-sdk"
88+
"aws.requestId" response.responseMetadata().requestId()
89+
"aws_service" "Sfn"
90+
defaultTags()
91+
}
92+
}
93+
}
94+
}
95+
}
96+
97+
def "Trace context is injected to Step Functions input"() {
98+
when:
99+
StartExecutionResponse response
100+
TraceUtils.runUnderTrace('parent', {
101+
response = sfnClient.startExecution { builder ->
102+
builder.stateMachineArn(testStateMachineARN)
103+
.input("{\"key\": \"value\"}")
104+
.build()
105+
}
106+
})
107+
108+
then:
109+
def execution = sfnClient.describeExecution { builder ->
110+
builder.executionArn(response.executionArn())
111+
.build()
112+
}
113+
def input = new JsonSlurper().parseText(execution.input())
114+
input["key"] == "value"
115+
input["_datadog"]["x-datadog-trace-id"] != null
116+
input["_datadog"]["x-datadog-parent-id"] != null
117+
input["_datadog"]["x-datadog-tags"] != null
118+
}
119+
120+
def "AWS rejects invalid JSON but instrumentation does not error"() {
121+
when:
122+
sfnClient.startExecution { b ->
123+
b.stateMachineArn(testStateMachineARN)
124+
.input("hello") // invalid JSON
125+
.build()
126+
}
127+
128+
then:
129+
thrown(SfnException)
130+
}
131+
132+
def "Doesn't cause error for Step Functions input edge cases"() {
133+
def inputs = [
134+
'''{}''',
135+
'''{ }''',
136+
''' { } ''',
137+
'''{"foo": "bar"}''',
138+
''' { "foo" : "bar" } ''',
139+
'''{"key1": "val1", "key2": "val2"}''',
140+
''' { "key1" : "val1" , "key2" : "val2" } '''
141+
]
142+
143+
when:
144+
inputs.forEach { input ->
145+
TraceUtils.runUnderTrace('parent', {
146+
sfnClient.startExecution { builder ->
147+
builder.stateMachineArn(testStateMachineARN)
148+
.input(input)
149+
.build()
150+
}
151+
})
152+
}
153+
154+
then:
155+
noExceptionThrown()
156+
}
157+
}
158+
159+
class SfnClientV0Test extends SfnClientTest {
160+
@Override
161+
int version() {
162+
0
163+
}
164+
165+
@Override
166+
String service() {
167+
return "java-aws-sdk"
168+
}
169+
170+
@Override
171+
String operation() {
172+
return "aws.http"
173+
}
174+
}

settings.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -196,6 +196,7 @@ include ':dd-java-agent:instrumentation:aws-common'
196196
include ':dd-java-agent:instrumentation:aws-java-eventbridge-2.0'
197197
include ':dd-java-agent:instrumentation:aws-java-sdk-1.11.0'
198198
include ':dd-java-agent:instrumentation:aws-java-sdk-2.2'
199+
include ':dd-java-agent:instrumentation:aws-java-sfn-2.0'
199200
include ':dd-java-agent:instrumentation:aws-java-sns-1.0'
200201
include ':dd-java-agent:instrumentation:aws-java-sns-2.0'
201202
include ':dd-java-agent:instrumentation:aws-java-sqs-1.0'

0 commit comments

Comments
 (0)