Skip to content

Commit 8afa3e8

Browse files
nkavturmarcingrzejszczak
authored andcommitted
Fix ThreadPoolTaskScheduler proxy mechanism (#1447)
1 parent bee61a9 commit 8afa3e8

File tree

2 files changed

+54
-8
lines changed

2 files changed

+54
-8
lines changed

spring-cloud-sleuth-core/src/main/java/org/springframework/cloud/sleuth/instrument/async/ExecutorBeanPostProcessor.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -185,8 +185,13 @@ Object createExecutorServiceProxy(Object bean, boolean cglibProxy,
185185

186186
Object createAsyncTaskExecutorProxy(Object bean, boolean cglibProxy,
187187
AsyncTaskExecutor executor) {
188-
return getProxiedObject(bean, cglibProxy, executor,
189-
() -> new LazyTraceAsyncTaskExecutor(this.beanFactory, executor));
188+
return getProxiedObject(bean, cglibProxy, executor, () -> {
189+
if (bean instanceof ThreadPoolTaskScheduler) {
190+
return new LazyTraceThreadPoolTaskScheduler(this.beanFactory,
191+
(ThreadPoolTaskScheduler) executor);
192+
}
193+
return new LazyTraceAsyncTaskExecutor(this.beanFactory, executor);
194+
});
190195
}
191196

192197
private Object getProxiedObject(Object bean, boolean cglibProxy, Executor executor,

spring-cloud-sleuth-core/src/test/java/org/springframework/cloud/sleuth/instrument/async/issues/issue410/Issue410Tests.java

Lines changed: 47 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package org.springframework.cloud.sleuth.instrument.async.issues.issue410;
1818

1919
import java.lang.invoke.MethodHandles;
20+
import java.util.Date;
2021
import java.util.concurrent.CompletableFuture;
2122
import java.util.concurrent.ExecutionException;
2223
import java.util.concurrent.Executor;
@@ -182,12 +183,35 @@ public void should_pass_tracing_info_for_completable_futures_with_task_scheduler
182183
* Related to issue #1232
183184
*/
184185
@Test
185-
public void should_pass_tracing_info_for_completable_futures_with_threadPoolTaskScheduler() {
186+
public void should_pass_tracing_info_for_submitted_tasks_with_threadPoolTaskScheduler() {
186187
Span span = this.tracer.nextSpan().name("foo");
187188
log.info("Starting test");
188189
try (Tracer.SpanInScope ws = this.tracer.withSpanInScope(span)) {
189190
String response = this.restTemplate.getForObject(
190-
"http://localhost:" + port() + "/threadPoolTaskScheduler",
191+
"http://localhost:" + port() + "/threadPoolTaskScheduler_submit",
192+
String.class);
193+
194+
then(response).isEqualTo(span.context().traceIdString());
195+
Awaitility.await().untilAsserted(() -> {
196+
then(this.asyncTask.getSpan().get()).isNotNull();
197+
then(this.asyncTask.getSpan().get().context().traceId())
198+
.isEqualTo(span.context().traceId());
199+
});
200+
}
201+
finally {
202+
span.finish();
203+
}
204+
205+
then(this.tracer.currentSpan()).isNull();
206+
}
207+
208+
@Test
209+
public void should_pass_tracing_info_for_scheduled_tasks_with_threadPoolTaskScheduler() {
210+
Span span = this.tracer.nextSpan().name("foo");
211+
log.info("Starting test");
212+
try (Tracer.SpanInScope ws = this.tracer.withSpanInScope(span)) {
213+
String response = this.restTemplate.getForObject(
214+
"http://localhost:" + port() + "/threadPoolTaskScheduler_schedule",
191215
String.class);
192216

193217
then(response).isEqualTo(span.context().traceIdString());
@@ -372,7 +396,7 @@ public Span scheduledThreadPoolExecutor()
372396
return this.span.get();
373397
}
374398

375-
public Span threadPoolTaskScheduler()
399+
public Span threadPoolTaskSchedulerSubmit()
376400
throws ExecutionException, InterruptedException {
377401
log.info("This task is running with ThreadPoolTaskScheduler");
378402
this.threadPoolTaskScheduler.submit(() -> {
@@ -382,6 +406,16 @@ public Span threadPoolTaskScheduler()
382406
return this.span.get();
383407
}
384408

409+
public Span threadPoolTaskSchedulerSchedule()
410+
throws ExecutionException, InterruptedException {
411+
log.info("This task is running with ThreadPoolTaskScheduler");
412+
this.threadPoolTaskScheduler.schedule(() -> {
413+
log.info("Hello from runnable");
414+
AsyncTask.this.span.set(AsyncTask.this.tracer.currentSpan());
415+
}, new Date()).get();
416+
return this.span.get();
417+
}
418+
385419
public AtomicReference<Span> getSpan() {
386420
return this.span;
387421
}
@@ -427,11 +461,18 @@ public String taskScheduler() throws ExecutionException, InterruptedException {
427461
return this.asyncTask.taskScheduler().context().traceIdString();
428462
}
429463

430-
@RequestMapping("/threadPoolTaskScheduler")
431-
public String threadPoolTaskScheduler()
464+
@RequestMapping("/threadPoolTaskScheduler_submit")
465+
public String threadPoolTaskSchedulerSubmit()
466+
throws ExecutionException, InterruptedException {
467+
log.info("Executing completable via ThreadPoolTaskScheduler");
468+
return this.asyncTask.threadPoolTaskSchedulerSubmit().context().traceIdString();
469+
}
470+
471+
@RequestMapping("/threadPoolTaskScheduler_schedule")
472+
public String threadPoolTaskSchedulerSchedule()
432473
throws ExecutionException, InterruptedException {
433474
log.info("Executing completable via ThreadPoolTaskScheduler");
434-
return this.asyncTask.threadPoolTaskScheduler().context().traceIdString();
475+
return this.asyncTask.threadPoolTaskSchedulerSchedule().context().traceIdString();
435476
}
436477

437478
@RequestMapping("/scheduledThreadPoolExecutor")

0 commit comments

Comments
 (0)