Skip to content

Commit 791f729

Browse files
authored
Adds tracing on every factory methods in KafkaListenerContainerFactory (#1665)
1 parent a83621e commit 791f729

File tree

2 files changed

+103
-1
lines changed

2 files changed

+103
-1
lines changed

spring-cloud-sleuth-core/src/main/java/org/springframework/cloud/sleuth/instrument/messaging/TraceMessagingAutoConfiguration.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -302,6 +302,10 @@ private void anyConsumerFactory() {
302302
private void anyCreateListenerContainer() {
303303
} // NOSONAR
304304

305+
@Pointcut("execution(public * org.springframework.kafka.config.KafkaListenerContainerFactory.createContainer(..))")
306+
private void anyCreateContainer() {
307+
} // NOSONAR
308+
305309
@Around("anyProducerFactory()")
306310
public Object wrapProducerFactory(ProceedingJoinPoint pjp) throws Throwable {
307311
Producer producer = (Producer) pjp.proceed();
@@ -314,7 +318,7 @@ public Object wrapConsumerFactory(ProceedingJoinPoint pjp) throws Throwable {
314318
return this.kafkaTracing.consumer(consumer);
315319
}
316320

317-
@Around("anyCreateListenerContainer()")
321+
@Around("anyCreateListenerContainer() || anyCreateContainer()")
318322
public Object wrapListenerContainerCreation(ProceedingJoinPoint pjp)
319323
throws Throwable {
320324
MessageListenerContainer listener = (MessageListenerContainer) pjp.proceed();
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
/*
2+
* Copyright 2013-2019 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.cloud.sleuth.instrument.messaging;
18+
19+
import brave.Tracer;
20+
import brave.handler.SpanHandler;
21+
import brave.kafka.clients.KafkaTracing;
22+
import brave.sampler.Sampler;
23+
import brave.test.TestSpanHandler;
24+
import org.aspectj.lang.ProceedingJoinPoint;
25+
import org.junit.Test;
26+
import org.junit.runner.RunWith;
27+
import org.mockito.Mockito;
28+
29+
import org.springframework.beans.factory.annotation.Autowired;
30+
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
31+
import org.springframework.boot.test.context.SpringBootTest;
32+
import org.springframework.context.annotation.Bean;
33+
import org.springframework.context.annotation.Configuration;
34+
import org.springframework.kafka.config.KafkaListenerContainerFactory;
35+
import org.springframework.kafka.listener.MessageListenerContainer;
36+
import org.springframework.test.context.junit4.SpringRunner;
37+
38+
import static org.assertj.core.api.BDDAssertions.then;
39+
40+
/**
41+
* @author Roberto Tassi
42+
*/
43+
@RunWith(SpringRunner.class)
44+
@SpringBootTest(classes = TraceMessagingAutoConfiguration1664Tests.Config.class,
45+
webEnvironment = SpringBootTest.WebEnvironment.NONE)
46+
public class TraceMessagingAutoConfiguration1664Tests {
47+
48+
@Autowired
49+
MySleuthKafka1664Aspect mySleuthKafka1664Aspect;
50+
51+
@Autowired
52+
KafkaListenerContainerFactory<?> kafkaListenerContainerFactory;
53+
54+
@Test
55+
public void should_wrap_kafka() {
56+
kafkaListenerContainerFactory.createContainer("backend");
57+
then(this.mySleuthKafka1664Aspect.adapterWrapped).isTrue();
58+
}
59+
60+
@Configuration
61+
@EnableAutoConfiguration
62+
protected static class Config {
63+
64+
@Bean
65+
Sampler sampler() {
66+
return Sampler.ALWAYS_SAMPLE;
67+
}
68+
69+
@Bean
70+
SpanHandler testSpanHandler() {
71+
return new TestSpanHandler();
72+
}
73+
74+
@Bean
75+
SleuthKafkaAspect sleuthKafkaAspect(KafkaTracing kafkaTracing, Tracer tracer) {
76+
return new MySleuthKafka1664Aspect(kafkaTracing, tracer);
77+
}
78+
79+
}
80+
81+
}
82+
83+
class MySleuthKafka1664Aspect extends SleuthKafkaAspect {
84+
85+
boolean adapterWrapped;
86+
87+
MySleuthKafka1664Aspect(KafkaTracing kafkaTracing, Tracer tracer) {
88+
super(kafkaTracing, tracer);
89+
}
90+
91+
@Override
92+
public Object wrapListenerContainerCreation(ProceedingJoinPoint pjp)
93+
throws Throwable {
94+
this.adapterWrapped = true;
95+
return Mockito.mock(MessageListenerContainer.class);
96+
}
97+
98+
}

0 commit comments

Comments
 (0)