Skip to content

Commit f5a356c

Browse files
committed
Add ContextPropagatingTaskDecorator
Prior to this commit, `@Async` and `@EventListener` annotated methods would lose the the logging and observation contexts whenever their execution was scheduled on a different Thread. The Context Propagation library supports this use case and can propagate context values in ThreadLocals, Reactor Context and more. This commit introduces a new `TaskDecorator` implementation that leverages the Context Propagation library. When configured on a `TaskExecutor`, this allows to properly propagate context value through the execution of the task. This implementation is completely optional and requires the "io.micrometer:context-propagation" library on the classpath. Enabling this feature must be done consciously and sometimes selectively, as context propagation introduces some overhead. Closes gh-31130
1 parent d47c7f9 commit f5a356c

File tree

9 files changed

+347
-14
lines changed

9 files changed

+347
-14
lines changed

framework-docs/modules/ROOT/pages/core/beans/context-introduction.adoc

Lines changed: 18 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -478,20 +478,21 @@ Kotlin::
478478
----
479479
======
480480

481-
Notice that `ApplicationListener` is generically parameterized with the type of your
482-
custom event (`BlockedListEvent` in the preceding example). This means that the
483-
`onApplicationEvent()` method can remain type-safe, avoiding any need for downcasting.
484-
You can register as many event listeners as you wish, but note that, by default, event
485-
listeners receive events synchronously. This means that the `publishEvent()` method
486-
blocks until all listeners have finished processing the event. One advantage of this
487-
synchronous and single-threaded approach is that, when a listener receives an event,
488-
it operates inside the transaction context of the publisher if a transaction context
489-
is available. If another strategy for event publication becomes necessary, e.g.
490-
asynchronous event processing by default, see the javadoc for Spring's
491-
{api-spring-framework}/context/event/ApplicationEventMulticaster.html[`ApplicationEventMulticaster`] interface
492-
and {api-spring-framework}/context/event/SimpleApplicationEventMulticaster.html[`SimpleApplicationEventMulticaster`]
493-
implementation for configuration options which can be applied to a custom
494-
"applicationEventMulticaster" bean definition.
481+
Notice that `ApplicationListener` is generically parameterized with the type of your custom event (`BlockedListEvent` in the preceding example).
482+
This means that the `onApplicationEvent()` method can remain type-safe, avoiding any need for downcasting.
483+
You can register as many event listeners as you wish, but note that, by default, event listeners receive events synchronously.
484+
This means that the `publishEvent()` method blocks until all listeners have finished processing the event.
485+
One advantage of this synchronous and single-threaded approach is that, when a listener receives an event,
486+
it operates inside the transaction context of the publisher if a transaction context is available.
487+
If another strategy for event publication becomes necessary, e.g. asynchronous event processing by default,
488+
see the javadoc for Spring's {api-spring-framework}/context/event/ApplicationEventMulticaster.html[`ApplicationEventMulticaster`] interface
489+
and {api-spring-framework}/context/event/SimpleApplicationEventMulticaster.html[`SimpleApplicationEventMulticaster`] implementation
490+
for configuration options which can be applied to a custom "applicationEventMulticaster" bean definition.
491+
In these cases, ThreadLocals and logging context are not propagated for the event processing.
492+
See xref:integration/observability.adoc#observability.application-events[the `@EventListener` Observability section]
493+
for more information on Observability concerns.
494+
495+
495496

496497
The following example shows the bean definitions used to register and configure each of
497498
the classes above:
@@ -747,6 +748,9 @@ Be aware of the following limitations when using asynchronous events:
747748
value. If you need to publish another event as the result of the processing, inject an
748749
{api-spring-framework}/context/ApplicationEventPublisher.html[`ApplicationEventPublisher`]
749750
to publish the event manually.
751+
* ThreadLocals and logging context are not propagated by default for the event processing.
752+
See xref:integration/observability.adoc#observability.application-events[the `@EventListener` Observability section]
753+
for more information on Observability concerns.
750754

751755

752756
[[context-functionality-events-order]]

framework-docs/modules/ROOT/pages/integration/observability.adoc

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -349,3 +349,28 @@ Instrumentation uses the `org.springframework.web.reactive.function.client.Clien
349349
|===
350350

351351

352+
[[observability.application-events]]
353+
== Application Events and `@EventListener`
354+
355+
Spring Framework does not contribute Observations for xref:core/beans/context-introduction.adoc#context-functionality-events-annotation[`@EventListener` calls],
356+
as they don't have the right semantics for such instrumentation.
357+
By default, event publication and processing is done synchronously and on the same Thread.
358+
This means that during the execution of that task, the ThreadLocals and logging context will be the same as the event publisher.
359+
360+
If the application configures globally a custom `ApplicationEventMulticaster` with a strategy that schedules event processing on different threads, this is no longer true.
361+
All `@EventListener` methods will be processed on a different thread, outstide of the main event publication thread.
362+
In these cases, the https://micrometer.io/docs/contextPropagation[Micrometer Context Propagation library] can help propagating such values and better correlate the processing of the events.
363+
The application can configure the chosen `TaskExecutor` to use a `ContextPropagatingTaskDecorator` that decorates tasks and propagates context.
364+
For this to work, the `io.micrometer:context-propagation` library must be present on the classpath:
365+
366+
include-code::./ApplicationEventsConfiguration[]
367+
368+
Similarly, if that asynchronous choice is made locally for each `@EventListener` annotated method, by adding an `@Async` method to it,
369+
you can choose a `TaskExecutor` that propagates context by referring to it by its qualifier.
370+
Given the following `TaskExecutor` bean definition, configured with the dedicated task decorator:
371+
372+
include-code::./EventAsyncExecutionConfiguration[]
373+
374+
Annotating event listeners with `@Async` and the relevant qualifier will achieve similar context propagation results:
375+
376+
include-code::./EmailNotificationListener[]
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
/*
2+
* Copyright 2002-2023 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.docs.integration.observability.applicationevents;
18+
19+
import org.springframework.context.annotation.Bean;
20+
import org.springframework.context.annotation.Configuration;
21+
import org.springframework.context.event.SimpleApplicationEventMulticaster;
22+
import org.springframework.core.task.SimpleAsyncTaskExecutor;
23+
import org.springframework.core.task.support.ContextPropagatingTaskDecorator;
24+
25+
@Configuration
26+
public class ApplicationEventsConfiguration {
27+
28+
@Bean(name = "applicationEventMulticaster")
29+
public SimpleApplicationEventMulticaster simpleApplicationEventMulticaster() {
30+
SimpleApplicationEventMulticaster eventMulticaster = new SimpleApplicationEventMulticaster();
31+
SimpleAsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor();
32+
// decorate task execution with a decorator that supports context propagation
33+
taskExecutor.setTaskDecorator(new ContextPropagatingTaskDecorator());
34+
eventMulticaster.setTaskExecutor(taskExecutor);
35+
return eventMulticaster;
36+
}
37+
38+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
/*
2+
* Copyright 2002-2023 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.docs.integration.observability.applicationevents;
18+
19+
import org.apache.commons.logging.Log;
20+
import org.apache.commons.logging.LogFactory;
21+
22+
import org.springframework.context.event.EventListener;
23+
import org.springframework.scheduling.annotation.Async;
24+
import org.springframework.stereotype.Component;
25+
26+
@Component
27+
public class EmailNotificationListener {
28+
29+
private final Log logger = LogFactory.getLog(EmailNotificationListener.class);
30+
31+
@EventListener(EmailReceivedEvent.class)
32+
@Async("propagatingContextExecutor")
33+
public void emailReceived(EmailReceivedEvent event) {
34+
// asynchronously process the received event
35+
// this logging statement will contain the expected MDC entries from the propagated context
36+
logger.info("email has been received");
37+
}
38+
39+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
/*
2+
* Copyright 2002-2023 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.docs.integration.observability.applicationevents;
18+
19+
import org.springframework.context.ApplicationEvent;
20+
21+
@SuppressWarnings("serial")
22+
public class EmailReceivedEvent extends ApplicationEvent {
23+
24+
public EmailReceivedEvent(Object source) {
25+
super(source);
26+
}
27+
28+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
/*
2+
* Copyright 2002-2023 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.docs.integration.observability.applicationevents;
18+
19+
import org.springframework.context.annotation.Bean;
20+
import org.springframework.context.annotation.Configuration;
21+
import org.springframework.core.task.SimpleAsyncTaskExecutor;
22+
import org.springframework.core.task.TaskExecutor;
23+
import org.springframework.core.task.support.ContextPropagatingTaskDecorator;
24+
25+
@Configuration
26+
public class EventAsyncExecutionConfiguration {
27+
28+
@Bean(name = "propagatingContextExecutor")
29+
public TaskExecutor propagatingContextExecutor() {
30+
SimpleAsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor();
31+
// decorate task execution with a decorator that supports context propagation
32+
taskExecutor.setTaskDecorator(new ContextPropagatingTaskDecorator());
33+
return taskExecutor;
34+
}
35+
36+
}

spring-core/spring-core.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ dependencies {
7373
api(project(":spring-jcl"))
7474
compileOnly("io.projectreactor.tools:blockhound")
7575
compileOnly("org.graalvm.sdk:graal-sdk")
76+
optional("io.micrometer:context-propagation")
7677
optional("io.netty:netty-buffer")
7778
optional("io.netty:netty5-buffer")
7879
optional("io.projectreactor:reactor-core")
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/*
2+
* Copyright 2002-2023 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.core.task.support;
18+
19+
import io.micrometer.context.ContextSnapshot;
20+
import io.micrometer.context.ContextSnapshotFactory;
21+
22+
import org.springframework.core.task.TaskDecorator;
23+
24+
/**
25+
* {@link TaskDecorator} that {@link ContextSnapshot#wrap(Runnable) wrap the execution} of
26+
* tasks, assisting with context propagation.
27+
* <p>This operation is only useful when the task execution is scheduled on a different
28+
* thread than the original call stack; this depends on the choice of
29+
* {@link org.springframework.core.task.TaskExecutor}. This is particularly useful for
30+
* restoring a logging context or an observation context for the task execution. Note that
31+
* this decorator will cause some overhead for task execution and is not recommended for
32+
* applications that run lots of very small tasks.
33+
*
34+
* @author Brian Clozel
35+
* @since 6.1
36+
* @see CompositeTaskDecorator
37+
*/
38+
public class ContextPropagatingTaskDecorator implements TaskDecorator {
39+
40+
private final ContextSnapshotFactory factory;
41+
42+
/**
43+
* Create a new decorator that uses a default instance of the {@link ContextSnapshotFactory}.
44+
*/
45+
public ContextPropagatingTaskDecorator() {
46+
this(ContextSnapshotFactory.builder().build());
47+
}
48+
49+
/**
50+
* Create a new decorator using the given {@link ContextSnapshotFactory}.
51+
* @param factory the context snapshot factory to use.
52+
*/
53+
public ContextPropagatingTaskDecorator(ContextSnapshotFactory factory) {
54+
this.factory = factory;
55+
}
56+
57+
@Override
58+
public Runnable decorate(Runnable runnable) {
59+
return this.factory.captureAll().wrap(runnable);
60+
}
61+
62+
}
Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
/*
2+
* Copyright 2002-2023 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.core.task.support;
18+
19+
import java.util.concurrent.atomic.AtomicReference;
20+
21+
import io.micrometer.context.ContextRegistry;
22+
import io.micrometer.context.ContextSnapshotFactory;
23+
import io.micrometer.context.ThreadLocalAccessor;
24+
import org.junit.jupiter.api.Test;
25+
26+
import static org.assertj.core.api.Assertions.assertThat;
27+
28+
/**
29+
* Tests for {@link ContextPropagatingTaskDecorator}.
30+
* @author Brian Clozel
31+
*/
32+
class ContextPropagatingTaskDecoratorTests {
33+
34+
@Test
35+
void shouldPropagateContextInTaskExecution() throws Exception {
36+
AtomicReference<String> actual = new AtomicReference<>("");
37+
ContextRegistry registry = new ContextRegistry();
38+
registry.registerThreadLocalAccessor(new TestThreadLocalAccessor());
39+
ContextSnapshotFactory snapshotFactory = ContextSnapshotFactory.builder().contextRegistry(registry).build();
40+
41+
Runnable task = () -> actual.set(TestThreadLocalHolder.getValue());
42+
TestThreadLocalHolder.setValue("expected");
43+
44+
Thread execution = new Thread(new ContextPropagatingTaskDecorator(snapshotFactory).decorate(task));
45+
execution.start();
46+
execution.join();
47+
assertThat(actual.get()).isEqualTo("expected");
48+
TestThreadLocalHolder.reset();
49+
}
50+
51+
static class TestThreadLocalHolder {
52+
53+
private static final ThreadLocal<String> holder = new ThreadLocal<>();
54+
55+
public static void setValue(String value) {
56+
holder.set(value);
57+
}
58+
59+
public static String getValue() {
60+
return holder.get();
61+
}
62+
63+
public static void reset() {
64+
holder.remove();
65+
}
66+
67+
}
68+
69+
static class TestThreadLocalAccessor implements ThreadLocalAccessor<String> {
70+
71+
public static final String KEY = "test.threadlocal";
72+
73+
@Override
74+
public Object key() {
75+
return KEY;
76+
}
77+
78+
@Override
79+
public String getValue() {
80+
return TestThreadLocalHolder.getValue();
81+
}
82+
83+
@Override
84+
public void setValue(String value) {
85+
TestThreadLocalHolder.setValue(value);
86+
}
87+
88+
@Override
89+
public void setValue() {
90+
TestThreadLocalHolder.reset();
91+
}
92+
93+
@Override
94+
public void restore(String previousValue) {
95+
setValue(previousValue);
96+
}
97+
98+
}
99+
100+
}

0 commit comments

Comments
 (0)