Skip to content

Commit 7bfe01a

Browse files
committed
Support for reactive result values from event listener methods
Closes gh-21831
1 parent 97d020c commit 7bfe01a

File tree

6 files changed

+169
-31
lines changed

6 files changed

+169
-31
lines changed

spring-context/spring-context.gradle

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,12 @@ description = "Spring Context"
22

33
apply plugin: "groovy"
44

5+
dependencyManagement {
6+
imports {
7+
mavenBom "io.projectreactor:reactor-bom:${reactorVersion}"
8+
}
9+
}
10+
511
dependencies {
612
compile(project(":spring-aop"))
713
compile(project(":spring-beans"))
@@ -23,6 +29,8 @@ dependencies {
2329
optional("org.hibernate:hibernate-validator:5.4.3.Final")
2430
optional("org.jetbrains.kotlin:kotlin-reflect:${kotlinVersion}")
2531
optional("org.jetbrains.kotlin:kotlin-stdlib:${kotlinVersion}")
32+
optional("org.reactivestreams:reactive-streams")
33+
testCompile("io.projectreactor:reactor-core")
2634
testCompile("org.codehaus.groovy:groovy-jsr223:${groovyVersion}")
2735
testCompile("org.codehaus.groovy:groovy-test:${groovyVersion}")
2836
testCompile("org.codehaus.groovy:groovy-xml:${groovyVersion}")

spring-context/src/main/java/org/springframework/context/event/ApplicationListenerMethodAdapter.java

Lines changed: 76 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,30 +24,37 @@
2424
import java.util.Collection;
2525
import java.util.Collections;
2626
import java.util.List;
27+
import java.util.concurrent.CompletionStage;
2728

2829
import org.apache.commons.logging.Log;
2930
import org.apache.commons.logging.LogFactory;
31+
import org.reactivestreams.Subscriber;
32+
import org.reactivestreams.Subscription;
3033

3134
import org.springframework.aop.support.AopUtils;
3235
import org.springframework.context.ApplicationContext;
3336
import org.springframework.context.ApplicationEvent;
3437
import org.springframework.context.PayloadApplicationEvent;
3538
import org.springframework.context.expression.AnnotatedElementKey;
3639
import org.springframework.core.BridgeMethodResolver;
40+
import org.springframework.core.ReactiveAdapter;
41+
import org.springframework.core.ReactiveAdapterRegistry;
3742
import org.springframework.core.ResolvableType;
3843
import org.springframework.core.annotation.AnnotatedElementUtils;
3944
import org.springframework.core.annotation.Order;
4045
import org.springframework.lang.Nullable;
4146
import org.springframework.util.Assert;
47+
import org.springframework.util.ClassUtils;
4248
import org.springframework.util.ObjectUtils;
4349
import org.springframework.util.ReflectionUtils;
4450
import org.springframework.util.StringUtils;
51+
import org.springframework.util.concurrent.ListenableFuture;
4552

4653
/**
4754
* {@link GenericApplicationListener} adapter that delegates the processing of
4855
* an event to an {@link EventListener} annotated method.
4956
*
50-
* <p>Delegates to {@link #processEvent(ApplicationEvent)} to give sub-classes
57+
* <p>Delegates to {@link #processEvent(ApplicationEvent)} to give subclasses
5158
* a chance to deviate from the default. Unwraps the content of a
5259
* {@link PayloadApplicationEvent} if necessary to allow a method declaration
5360
* to define any arbitrary event type. If a condition is defined, it is
@@ -60,6 +67,10 @@
6067
*/
6168
public class ApplicationListenerMethodAdapter implements GenericApplicationListener {
6269

70+
private static final boolean reactiveStreamsPresent = ClassUtils.isPresent(
71+
"org.reactivestreams.Publisher", ApplicationListenerMethodAdapter.class.getClassLoader());
72+
73+
6374
protected final Log logger = LogFactory.getLog(getClass());
6475

6576
private final String beanName;
@@ -213,6 +224,30 @@ protected Object[] resolveArguments(ApplicationEvent event) {
213224
}
214225

215226
protected void handleResult(Object result) {
227+
if (reactiveStreamsPresent && new ReactiveResultHandler().subscribeToPublisher(result)) {
228+
if (logger.isTraceEnabled()) {
229+
logger.trace("Adapted to reactive result: " + result);
230+
}
231+
}
232+
else if (result instanceof CompletionStage) {
233+
((CompletionStage<?>) result).whenComplete((event, ex) -> {
234+
if (ex != null) {
235+
handleAsyncError(ex);
236+
}
237+
else if (event != null) {
238+
publishEvent(event);
239+
}
240+
});
241+
}
242+
else if (result instanceof ListenableFuture) {
243+
((ListenableFuture<?>) result).addCallback(this::publishEvents, this::handleAsyncError);
244+
}
245+
else {
246+
publishEvents(result);
247+
}
248+
}
249+
250+
private void publishEvents(Object result) {
216251
if (result.getClass().isArray()) {
217252
Object[] events = ObjectUtils.toObjectArray(result);
218253
for (Object event : events) {
@@ -237,6 +272,10 @@ private void publishEvent(@Nullable Object event) {
237272
}
238273
}
239274

275+
protected void handleAsyncError(Throwable t) {
276+
logger.error("Unexpected error occurred in asynchronous listener", t);
277+
}
278+
240279
private boolean shouldHandle(ApplicationEvent event, @Nullable Object[] args) {
241280
if (args == null) {
242281
return false;
@@ -376,4 +415,40 @@ public String toString() {
376415
return this.method.toGenericString();
377416
}
378417

418+
419+
private class ReactiveResultHandler {
420+
421+
public boolean subscribeToPublisher(Object result) {
422+
ReactiveAdapter adapter = ReactiveAdapterRegistry.getSharedInstance().getAdapter(result.getClass());
423+
if (adapter != null) {
424+
adapter.toPublisher(result).subscribe(new EventPublicationSubscriber());
425+
return true;
426+
}
427+
return false;
428+
}
429+
}
430+
431+
432+
private class EventPublicationSubscriber implements Subscriber<Object> {
433+
434+
@Override
435+
public void onSubscribe(Subscription s) {
436+
s.request(Integer.MAX_VALUE);
437+
}
438+
439+
@Override
440+
public void onNext(Object o) {
441+
publishEvents(o);
442+
}
443+
444+
@Override
445+
public void onError(Throwable t) {
446+
handleAsyncError(t);
447+
}
448+
449+
@Override
450+
public void onComplete() {
451+
}
452+
}
453+
379454
}

spring-context/src/main/java/org/springframework/scheduling/support/TaskUtils.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2013 the original author or authors.
2+
* Copyright 2002-2019 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -92,9 +92,7 @@ private static class LoggingErrorHandler implements ErrorHandler {
9292

9393
@Override
9494
public void handleError(Throwable t) {
95-
if (logger.isErrorEnabled()) {
96-
logger.error("Unexpected error occurred in scheduled task.", t);
97-
}
95+
logger.error("Unexpected error occurred in scheduled task", t);
9896
}
9997
}
10098

spring-context/src/test/java/org/springframework/context/event/AnnotationDrivenEventListenerTests.java

Lines changed: 71 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,13 +25,16 @@
2525
import java.util.LinkedHashSet;
2626
import java.util.List;
2727
import java.util.Set;
28+
import java.util.concurrent.CompletableFuture;
2829
import java.util.concurrent.CountDownLatch;
2930
import java.util.concurrent.TimeUnit;
3031
import javax.annotation.PostConstruct;
3132

3233
import org.junit.After;
3334
import org.junit.Ignore;
3435
import org.junit.Test;
36+
import reactor.core.publisher.Flux;
37+
import reactor.core.publisher.Mono;
3538

3639
import org.springframework.aop.framework.Advised;
3740
import org.springframework.aop.support.AopUtils;
@@ -61,6 +64,7 @@
6164
import org.springframework.scheduling.annotation.EnableAsync;
6265
import org.springframework.stereotype.Component;
6366
import org.springframework.util.Assert;
67+
import org.springframework.util.concurrent.SettableListenableFuture;
6468
import org.springframework.validation.annotation.Validated;
6569
import org.springframework.validation.beanvalidation.MethodValidationPostProcessor;
6670

@@ -243,7 +247,69 @@ public void collectionReplyNullValue() {
243247
}
244248

245249
@Test
246-
public void eventListenerWorksWithSimpleInterfaceProxy() throws Exception {
250+
public void listenableFutureReply() {
251+
load(TestEventListener.class, ReplyEventListener.class);
252+
SettableListenableFuture<String> future = new SettableListenableFuture<>();
253+
future.set("dummy");
254+
AnotherTestEvent event = new AnotherTestEvent(this, future);
255+
ReplyEventListener replyEventListener = this.context.getBean(ReplyEventListener.class);
256+
TestEventListener listener = this.context.getBean(TestEventListener.class);
257+
258+
this.eventCollector.assertNoEventReceived(listener);
259+
this.eventCollector.assertNoEventReceived(replyEventListener);
260+
this.context.publishEvent(event);
261+
this.eventCollector.assertEvent(replyEventListener, event);
262+
this.eventCollector.assertEvent(listener, "dummy"); // reply
263+
this.eventCollector.assertTotalEventsCount(2);
264+
}
265+
266+
@Test
267+
public void completableFutureReply() {
268+
load(TestEventListener.class, ReplyEventListener.class);
269+
AnotherTestEvent event = new AnotherTestEvent(this, CompletableFuture.completedFuture("dummy"));
270+
ReplyEventListener replyEventListener = this.context.getBean(ReplyEventListener.class);
271+
TestEventListener listener = this.context.getBean(TestEventListener.class);
272+
273+
this.eventCollector.assertNoEventReceived(listener);
274+
this.eventCollector.assertNoEventReceived(replyEventListener);
275+
this.context.publishEvent(event);
276+
this.eventCollector.assertEvent(replyEventListener, event);
277+
this.eventCollector.assertEvent(listener, "dummy"); // reply
278+
this.eventCollector.assertTotalEventsCount(2);
279+
}
280+
281+
@Test
282+
public void monoReply() {
283+
load(TestEventListener.class, ReplyEventListener.class);
284+
AnotherTestEvent event = new AnotherTestEvent(this, Mono.just("dummy"));
285+
ReplyEventListener replyEventListener = this.context.getBean(ReplyEventListener.class);
286+
TestEventListener listener = this.context.getBean(TestEventListener.class);
287+
288+
this.eventCollector.assertNoEventReceived(listener);
289+
this.eventCollector.assertNoEventReceived(replyEventListener);
290+
this.context.publishEvent(event);
291+
this.eventCollector.assertEvent(replyEventListener, event);
292+
this.eventCollector.assertEvent(listener, "dummy"); // reply
293+
this.eventCollector.assertTotalEventsCount(2);
294+
}
295+
296+
@Test
297+
public void fluxReply() {
298+
load(TestEventListener.class, ReplyEventListener.class);
299+
AnotherTestEvent event = new AnotherTestEvent(this, Flux.just("dummy1", "dummy2"));
300+
ReplyEventListener replyEventListener = this.context.getBean(ReplyEventListener.class);
301+
TestEventListener listener = this.context.getBean(TestEventListener.class);
302+
303+
this.eventCollector.assertNoEventReceived(listener);
304+
this.eventCollector.assertNoEventReceived(replyEventListener);
305+
this.context.publishEvent(event);
306+
this.eventCollector.assertEvent(replyEventListener, event);
307+
this.eventCollector.assertEvent(listener, "dummy1", "dummy2"); // reply
308+
this.eventCollector.assertTotalEventsCount(3);
309+
}
310+
311+
@Test
312+
public void eventListenerWorksWithSimpleInterfaceProxy() {
247313
load(ScopedProxyTestBean.class);
248314

249315
SimpleService proxy = this.context.getBean(SimpleService.class);
@@ -260,7 +326,7 @@ public void eventListenerWorksWithSimpleInterfaceProxy() throws Exception {
260326
}
261327

262328
@Test
263-
public void eventListenerWorksWithAnnotatedInterfaceProxy() throws Exception {
329+
public void eventListenerWorksWithAnnotatedInterfaceProxy() {
264330
load(AnnotatedProxyTestBean.class);
265331

266332
AnnotatedSimpleService proxy = this.context.getBean(AnnotatedSimpleService.class);
@@ -277,7 +343,7 @@ public void eventListenerWorksWithAnnotatedInterfaceProxy() throws Exception {
277343
}
278344

279345
@Test
280-
public void eventListenerWorksWithCglibProxy() throws Exception {
346+
public void eventListenerWorksWithCglibProxy() {
281347
load(CglibProxyTestBean.class);
282348

283349
CglibProxyTestBean proxy = this.context.getBean(CglibProxyTestBean.class);
@@ -294,14 +360,14 @@ public void eventListenerWorksWithCglibProxy() throws Exception {
294360
}
295361

296362
@Test
297-
public void privateMethodOnCglibProxyFails() throws Exception {
363+
public void privateMethodOnCglibProxyFails() {
298364
assertThatExceptionOfType(BeanInitializationException.class).isThrownBy(() ->
299365
load(CglibProxyWithPrivateMethod.class))
300366
.withCauseInstanceOf(IllegalStateException.class);
301367
}
302368

303369
@Test
304-
public void eventListenerWorksWithCustomScope() throws Exception {
370+
public void eventListenerWorksWithCustomScope() {
305371
load(CustomScopeTestBean.class);
306372
CustomScope customScope = new CustomScope();
307373
this.context.getBeanFactory().registerScope("custom", customScope);

spring-context/src/test/java/org/springframework/context/event/ApplicationListenerMethodAdapterTests.java

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -99,8 +99,7 @@ public void listenerWithPayloadTypeErasure() { // Always accept such event when
9999

100100
@Test
101101
public void listenerWithSubTypeSeveralGenerics() {
102-
Method method = ReflectionUtils.findMethod(SampleEvents.class,
103-
"handleString", String.class);
102+
Method method = ReflectionUtils.findMethod(SampleEvents.class, "handleString", String.class);
104103
supportsEventType(true, method, ResolvableType.forClass(PayloadTestEvent.class));
105104
}
106105

@@ -141,23 +140,20 @@ public void listenerWithSeveralTypes() {
141140
public void listenerWithTooManyParameters() {
142141
Method method = ReflectionUtils.findMethod(
143142
SampleEvents.class, "tooManyParameters", String.class, String.class);
144-
assertThatIllegalStateException().isThrownBy(() ->
145-
createTestInstance(method));
143+
assertThatIllegalStateException().isThrownBy(() -> createTestInstance(method));
146144
}
147145

148146
@Test
149147
public void listenerWithNoParameter() {
150148
Method method = ReflectionUtils.findMethod(SampleEvents.class, "noParameter");
151-
assertThatIllegalStateException().isThrownBy(() ->
152-
createTestInstance(method));
149+
assertThatIllegalStateException().isThrownBy(() -> createTestInstance(method));
153150
}
154151

155152
@Test
156153
public void listenerWithMoreThanOneParameter() {
157154
Method method = ReflectionUtils.findMethod(
158155
SampleEvents.class, "moreThanOneParameter", String.class, Integer.class);
159-
assertThatIllegalStateException().isThrownBy(() ->
160-
createTestInstance(method));
156+
assertThatIllegalStateException().isThrownBy(() -> createTestInstance(method));
161157
}
162158

163159
@Test
@@ -331,7 +327,8 @@ public void beanInstanceRetrievedAtEveryInvocation() {
331327

332328
private void supportsEventType(boolean match, Method method, ResolvableType eventType) {
333329
ApplicationListenerMethodAdapter adapter = createTestInstance(method);
334-
assertThat(adapter.supportsEventType(eventType)).as("Wrong match for event '" + eventType + "' on " + method).isEqualTo(match);
330+
assertThat(adapter.supportsEventType(eventType))
331+
.as("Wrong match for event '" + eventType + "' on " + method).isEqualTo(match);
335332
}
336333

337334
private void invokeListener(Method method, ApplicationEvent event) {

0 commit comments

Comments
 (0)