Skip to content

Commit edf94c6

Browse files
authored
End spring kafka span in afterRecord callback (#15367)
1 parent 180160a commit edf94c6

File tree

7 files changed

+250
-104
lines changed

7 files changed

+250
-104
lines changed

instrumentation/spring/spring-kafka-2.7/javaagent/build.gradle.kts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -57,8 +57,6 @@ testing {
5757
targets {
5858
all {
5959
testTask.configure {
60-
usesService(gradle.sharedServices.registrations["testcontainersBuildService"].service)
61-
6260
jvmArgs("-Dotel.instrumentation.kafka.experimental-span-attributes=false")
6361
jvmArgs("-Dotel.instrumentation.messaging.experimental.receive-telemetry.enabled=false")
6462

@@ -71,10 +69,12 @@ testing {
7169
}
7270

7371
tasks {
74-
test {
72+
withType<Test>().configureEach {
7573
usesService(gradle.sharedServices.registrations["testcontainersBuildService"].service)
74+
systemProperty("testLatestDeps", latestDepTest)
75+
}
7676

77-
systemProperty("testLatestDeps", findProperty("testLatestDeps") as Boolean)
77+
test {
7878
jvmArgs("-Dotel.instrumentation.kafka.experimental-span-attributes=true")
7979
jvmArgs("-Dotel.instrumentation.messaging.experimental.receive-telemetry.enabled=true")
8080

instrumentation/spring/spring-kafka-2.7/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/v2_7/SpringKafkaTest.java

Lines changed: 114 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -177,64 +177,126 @@ void shouldHandleFailureInKafkaListener() {
177177
satisfies(longKey("kafka.record.queue_time_ms"), AbstractLongAssert::isNotNegative));
178178

179179
AtomicReference<SpanData> producer = new AtomicReference<>();
180-
testing.waitAndAssertSortedTraces(
181-
orderByRootSpanKind(SpanKind.INTERNAL, SpanKind.CONSUMER),
182-
trace -> {
183-
trace.hasSpansSatisfyingExactly(
184-
span -> span.hasName("producer"),
185-
span ->
186-
span.hasName("testSingleTopic publish")
187-
.hasKind(SpanKind.PRODUCER)
188-
.hasParent(trace.getSpan(0))
189-
.hasAttributesSatisfyingExactly(
190-
equalTo(MESSAGING_SYSTEM, "kafka"),
191-
equalTo(MESSAGING_DESTINATION_NAME, "testSingleTopic"),
192-
equalTo(MESSAGING_OPERATION, "publish"),
193-
satisfies(
194-
MESSAGING_DESTINATION_PARTITION_ID, AbstractStringAssert::isNotEmpty),
195-
satisfies(
196-
MESSAGING_KAFKA_MESSAGE_OFFSET, AbstractLongAssert::isNotNegative),
197-
equalTo(MESSAGING_KAFKA_MESSAGE_KEY, "10"),
198-
satisfies(
199-
MESSAGING_CLIENT_ID,
200-
stringAssert -> stringAssert.startsWith("producer"))));
201-
202-
producer.set(trace.getSpan(1));
203-
},
204-
trace ->
205-
trace.hasSpansSatisfyingExactly(
206-
receiveSpanAssert,
207-
span ->
208-
span.hasName("testSingleTopic process")
209-
.hasKind(SpanKind.CONSUMER)
210-
.hasParent(trace.getSpan(0))
211-
.hasLinks(LinkData.create(producer.get().getSpanContext()))
212-
.hasStatus(StatusData.error())
213-
.hasException(new IllegalArgumentException("boom"))
214-
.hasAttributesSatisfyingExactly(processAttributes),
215-
span -> span.hasName("consumer").hasParent(trace.getSpan(1))),
216-
trace ->
180+
// trace structure differs in latest dep tests because CommonErrorHandler is only set for latest
181+
// dep tests
182+
if (Boolean.getBoolean("testLatestDeps")) {
183+
testing.waitAndAssertSortedTraces(
184+
orderByRootSpanKind(SpanKind.INTERNAL, SpanKind.CONSUMER),
185+
trace -> {
217186
trace.hasSpansSatisfyingExactly(
218-
receiveSpanAssert,
187+
span -> span.hasName("producer"),
219188
span ->
220-
span.hasName("testSingleTopic process")
221-
.hasKind(SpanKind.CONSUMER)
189+
span.hasName("testSingleTopic publish")
190+
.hasKind(SpanKind.PRODUCER)
222191
.hasParent(trace.getSpan(0))
223-
.hasLinks(LinkData.create(producer.get().getSpanContext()))
224-
.hasStatus(StatusData.error())
225-
.hasException(new IllegalArgumentException("boom"))
226-
.hasAttributesSatisfyingExactly(processAttributes),
227-
span -> span.hasName("consumer").hasParent(trace.getSpan(1))),
228-
trace ->
192+
.hasAttributesSatisfyingExactly(
193+
equalTo(MESSAGING_SYSTEM, "kafka"),
194+
equalTo(MESSAGING_DESTINATION_NAME, "testSingleTopic"),
195+
equalTo(MESSAGING_OPERATION, "publish"),
196+
satisfies(
197+
MESSAGING_DESTINATION_PARTITION_ID,
198+
AbstractStringAssert::isNotEmpty),
199+
satisfies(
200+
MESSAGING_KAFKA_MESSAGE_OFFSET, AbstractLongAssert::isNotNegative),
201+
equalTo(MESSAGING_KAFKA_MESSAGE_KEY, "10"),
202+
satisfies(
203+
MESSAGING_CLIENT_ID,
204+
stringAssert -> stringAssert.startsWith("producer"))));
205+
206+
producer.set(trace.getSpan(1));
207+
},
208+
trace ->
209+
trace.hasSpansSatisfyingExactly(
210+
receiveSpanAssert,
211+
span ->
212+
span.hasName("testSingleTopic process")
213+
.hasKind(SpanKind.CONSUMER)
214+
.hasParent(trace.getSpan(0))
215+
.hasLinks(LinkData.create(producer.get().getSpanContext()))
216+
.hasStatus(StatusData.error())
217+
.hasException(new IllegalArgumentException("boom"))
218+
.hasAttributesSatisfyingExactly(processAttributes),
219+
span -> span.hasName("consumer").hasParent(trace.getSpan(1)),
220+
span -> span.hasName("handle exception").hasParent(trace.getSpan(1)),
221+
span ->
222+
span.hasName("testSingleTopic process")
223+
.hasKind(SpanKind.CONSUMER)
224+
.hasParent(trace.getSpan(0))
225+
.hasLinks(LinkData.create(producer.get().getSpanContext()))
226+
.hasStatus(StatusData.error())
227+
.hasException(new IllegalArgumentException("boom"))
228+
.hasAttributesSatisfyingExactly(processAttributes),
229+
span -> span.hasName("consumer").hasParent(trace.getSpan(4)),
230+
span -> span.hasName("handle exception").hasParent(trace.getSpan(4)),
231+
span ->
232+
span.hasName("testSingleTopic process")
233+
.hasKind(SpanKind.CONSUMER)
234+
.hasParent(trace.getSpan(0))
235+
.hasLinks(LinkData.create(producer.get().getSpanContext()))
236+
.hasAttributesSatisfyingExactly(processAttributes),
237+
span -> span.hasName("consumer").hasParent(trace.getSpan(7))));
238+
239+
} else {
240+
testing.waitAndAssertSortedTraces(
241+
orderByRootSpanKind(SpanKind.INTERNAL, SpanKind.CONSUMER),
242+
trace -> {
229243
trace.hasSpansSatisfyingExactly(
230-
receiveSpanAssert,
244+
span -> span.hasName("producer"),
231245
span ->
232-
span.hasName("testSingleTopic process")
233-
.hasKind(SpanKind.CONSUMER)
246+
span.hasName("testSingleTopic publish")
247+
.hasKind(SpanKind.PRODUCER)
234248
.hasParent(trace.getSpan(0))
235-
.hasLinks(LinkData.create(producer.get().getSpanContext()))
236-
.hasAttributesSatisfyingExactly(processAttributes),
237-
span -> span.hasName("consumer").hasParent(trace.getSpan(1))));
249+
.hasAttributesSatisfyingExactly(
250+
equalTo(MESSAGING_SYSTEM, "kafka"),
251+
equalTo(MESSAGING_DESTINATION_NAME, "testSingleTopic"),
252+
equalTo(MESSAGING_OPERATION, "publish"),
253+
satisfies(
254+
MESSAGING_DESTINATION_PARTITION_ID,
255+
AbstractStringAssert::isNotEmpty),
256+
satisfies(
257+
MESSAGING_KAFKA_MESSAGE_OFFSET, AbstractLongAssert::isNotNegative),
258+
equalTo(MESSAGING_KAFKA_MESSAGE_KEY, "10"),
259+
satisfies(
260+
MESSAGING_CLIENT_ID,
261+
stringAssert -> stringAssert.startsWith("producer"))));
262+
263+
producer.set(trace.getSpan(1));
264+
},
265+
trace ->
266+
trace.hasSpansSatisfyingExactly(
267+
receiveSpanAssert,
268+
span ->
269+
span.hasName("testSingleTopic process")
270+
.hasKind(SpanKind.CONSUMER)
271+
.hasParent(trace.getSpan(0))
272+
.hasLinks(LinkData.create(producer.get().getSpanContext()))
273+
.hasStatus(StatusData.error())
274+
.hasException(new IllegalArgumentException("boom"))
275+
.hasAttributesSatisfyingExactly(processAttributes),
276+
span -> span.hasName("consumer").hasParent(trace.getSpan(1))),
277+
trace ->
278+
trace.hasSpansSatisfyingExactly(
279+
receiveSpanAssert,
280+
span ->
281+
span.hasName("testSingleTopic process")
282+
.hasKind(SpanKind.CONSUMER)
283+
.hasParent(trace.getSpan(0))
284+
.hasLinks(LinkData.create(producer.get().getSpanContext()))
285+
.hasStatus(StatusData.error())
286+
.hasException(new IllegalArgumentException("boom"))
287+
.hasAttributesSatisfyingExactly(processAttributes),
288+
span -> span.hasName("consumer").hasParent(trace.getSpan(1))),
289+
trace ->
290+
trace.hasSpansSatisfyingExactly(
291+
receiveSpanAssert,
292+
span ->
293+
span.hasName("testSingleTopic process")
294+
.hasKind(SpanKind.CONSUMER)
295+
.hasParent(trace.getSpan(0))
296+
.hasLinks(LinkData.create(producer.get().getSpanContext()))
297+
.hasAttributesSatisfyingExactly(processAttributes),
298+
span -> span.hasName("consumer").hasParent(trace.getSpan(1))));
299+
}
238300
}
239301

240302
@Test

instrumentation/spring/spring-kafka-2.7/library/src/main/java/io/opentelemetry/instrumentation/spring/kafka/v2_7/InstrumentedRecordInterceptor.java

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ final class InstrumentedRecordInterceptor<K, V> implements RecordInterceptor<K,
2525

2626
private final Instrumenter<KafkaProcessRequest, Void> processInstrumenter;
2727
@Nullable private final RecordInterceptor<K, V> decorated;
28+
private static final ThreadLocal<ThreadState> threadLocalState = new ThreadLocal<>();
2829

2930
InstrumentedRecordInterceptor(
3031
Instrumenter<KafkaProcessRequest, Void> processInstrumenter,
@@ -74,7 +75,10 @@ public void success(ConsumerRecord<K, V> record, Consumer<K, V> consumer) {
7475
decorated.success(record, consumer);
7576
}
7677
} finally {
77-
end(record, null);
78+
// if thread state is present span is ended in afterRecord
79+
if (threadLocalState.get() == null) {
80+
end(record, null);
81+
}
7882
}
7983
}
8084

@@ -85,7 +89,13 @@ public void failure(ConsumerRecord<K, V> record, Exception exception, Consumer<K
8589
decorated.failure(record, exception, consumer);
8690
}
8791
} finally {
88-
end(record, exception);
92+
// if thread state is present span is ended in afterRecord
93+
ThreadState threadState = threadLocalState.get();
94+
if (threadState == null) {
95+
end(record, exception);
96+
} else {
97+
threadState.error = exception;
98+
}
8999
}
90100
}
91101

@@ -102,6 +112,7 @@ private void end(ConsumerRecord<K, V> record, @Nullable Throwable error) {
102112
@NoMuzzle // method was added in 2.8.0
103113
@Override
104114
public void afterRecord(ConsumerRecord<K, V> record, Consumer<K, V> consumer) {
115+
end(record, threadLocalState.get().error);
105116
if (decorated != null) {
106117
decorated.afterRecord(record, consumer);
107118
}
@@ -110,6 +121,7 @@ public void afterRecord(ConsumerRecord<K, V> record, Consumer<K, V> consumer) {
110121
@NoMuzzle // method was added in 2.8.0
111122
@Override
112123
public void setupThreadState(Consumer<?, ?> consumer) {
124+
threadLocalState.set(new ThreadState());
113125
if (decorated != null) {
114126
decorated.setupThreadState(consumer);
115127
}
@@ -118,8 +130,14 @@ public void setupThreadState(Consumer<?, ?> consumer) {
118130
@NoMuzzle // method was added in 2.8.0
119131
@Override
120132
public void clearThreadState(Consumer<?, ?> consumer) {
133+
threadLocalState.remove();
121134
if (decorated != null) {
122135
decorated.clearThreadState(consumer);
123136
}
124137
}
138+
139+
private static class ThreadState {
140+
// used to record the error in failure() so it could be used in afterRecord()
141+
Throwable error;
142+
}
125143
}

instrumentation/spring/spring-kafka-2.7/testing/build.gradle.kts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ dependencies {
66
implementation("io.opentelemetry.javaagent:opentelemetry-testing-common")
77
implementation("org.testcontainers:testcontainers-kafka")
88

9-
compileOnly("org.springframework.kafka:spring-kafka:2.7.0")
9+
compileOnly("org.springframework.kafka:spring-kafka:2.9.0")
1010
compileOnly("org.springframework.boot:spring-boot-starter-test:2.5.3")
1111
compileOnly("org.springframework.boot:spring-boot-starter:2.5.3")
1212
}

0 commit comments

Comments
 (0)