Skip to content

Commit 4e25a14

Browse files
author
Juergen Hoeller
committed
Merge pull request #571 from sdeleuze/SPR-11820
Make ListenableFuture compliant with Java 8 lambda
2 parents b1b7f8a + 86e8bda commit 4e25a14

File tree

14 files changed

+392
-41
lines changed

14 files changed

+392
-41
lines changed

spring-context/src/main/java/org/springframework/scheduling/annotation/AsyncResult.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,10 @@
1818

1919
import java.util.concurrent.TimeUnit;
2020

21+
import org.springframework.util.concurrent.FailureCallback;
2122
import org.springframework.util.concurrent.ListenableFuture;
2223
import org.springframework.util.concurrent.ListenableFutureCallback;
24+
import org.springframework.util.concurrent.SuccessCallback;
2325

2426
/**
2527
* A pass-through {@code Future} handle that can be used for method signatures
@@ -74,7 +76,15 @@ public V get(long timeout, TimeUnit unit) {
7476

7577
@Override
7678
public void addCallback(ListenableFutureCallback<? super V> callback) {
77-
callback.onSuccess(this.value);
79+
addCallback(callback, callback);
7880
}
7981

82+
@Override
83+
public void addCallback(SuccessCallback<? super V> successCallback, FailureCallback failureCallback) {
84+
try {
85+
successCallback.onSuccess(this.value);
86+
} catch(Throwable t) {
87+
failureCallback.onFailure(t);
88+
}
89+
}
8090
}
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
/*
2+
* Copyright 2002-2014 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.util.concurrent;
18+
19+
/**
20+
* Defines the contract for failure callbacks that accept the result of a
21+
* {@link ListenableFuture}.
22+
*
23+
* @author Sebastien Deleuze
24+
* @since 4.1
25+
*/
26+
public interface FailureCallback {
27+
28+
/**
29+
* Called when the {@link ListenableFuture} fails to complete.
30+
* @param t the exception that triggered the failure
31+
*/
32+
void onFailure(Throwable t);
33+
34+
}

spring-core/src/main/java/org/springframework/util/concurrent/ListenableFuture.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2013 the original author or authors.
2+
* Copyright 2002-2014 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -25,6 +25,7 @@
2525
* <p>Inspired by {@code com.google.common.util.concurrent.ListenableFuture}.
2626
2727
* @author Arjen Poutsma
28+
* @author Sebastien Deleuze
2829
* @since 4.0
2930
*/
3031
public interface ListenableFuture<T> extends Future<T> {
@@ -37,4 +38,15 @@ public interface ListenableFuture<T> extends Future<T> {
3738
*/
3839
void addCallback(ListenableFutureCallback<? super T> callback);
3940

41+
/**
42+
* Registers the given success and failure callbacks to this {@code ListenableFuture}.
43+
* The callback will be triggered when this {@code Future} is complete or, if it is
44+
* already complete immediately. This is a Java 8 lambdas compliant alternative to
45+
* {@link #addCallback(ListenableFutureCallback)}.
46+
* @param successCallback the success callback to register
47+
* @param failureCallback the failure callback to register
48+
* @since 4.1
49+
*/
50+
void addCallback(SuccessCallback<? super T> successCallback, FailureCallback failureCallback);
51+
4052
}

spring-core/src/main/java/org/springframework/util/concurrent/ListenableFutureAdapter.java

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2013 the original author or authors.
2+
* Copyright 2002-2014 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -44,12 +44,18 @@ protected ListenableFutureAdapter(ListenableFuture<S> adaptee) {
4444

4545
@Override
4646
public void addCallback(final ListenableFutureCallback<? super T> callback) {
47+
addCallback(callback, callback);
48+
}
49+
50+
@Override
51+
public void addCallback(final SuccessCallback<? super T> successCallback,
52+
final FailureCallback failureCallback) {
4753
ListenableFuture<S> listenableAdaptee = (ListenableFuture<S>) getAdaptee();
4854
listenableAdaptee.addCallback(new ListenableFutureCallback<S>() {
4955
@Override
5056
public void onSuccess(S result) {
5157
try {
52-
callback.onSuccess(adaptInternal(result));
58+
successCallback.onSuccess(adaptInternal(result));
5359
}
5460
catch (ExecutionException ex) {
5561
Throwable cause = ex.getCause();
@@ -62,8 +68,9 @@ public void onSuccess(S result) {
6268

6369
@Override
6470
public void onFailure(Throwable t) {
65-
callback.onFailure(t);
71+
failureCallback.onFailure(t);
6672
}
6773
});
6874
}
75+
6976
}
Lines changed: 3 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2013 the original author or authors.
2+
* Copyright 2002-2014 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -21,20 +21,9 @@
2121
* {@link ListenableFuture}.
2222
*
2323
* @author Arjen Poutsma
24+
* @author Sebastien Deleuze
2425
* @since 4.0
2526
*/
26-
public interface ListenableFutureCallback<T> {
27-
28-
/**
29-
* Called when the {@link ListenableFuture} successfully completes.
30-
* @param result the result
31-
*/
32-
void onSuccess(T result);
33-
34-
/**
35-
* Called when the {@link ListenableFuture} fails to complete.
36-
* @param t the exception that triggered the failure
37-
*/
38-
void onFailure(Throwable t);
27+
public interface ListenableFutureCallback<T> extends SuccessCallback<T>, FailureCallback {
3928

4029
}

spring-core/src/main/java/org/springframework/util/concurrent/ListenableFutureCallbackRegistry.java

Lines changed: 57 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2013 the original author or authors.
2+
* Copyright 2002-2014 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -27,12 +27,16 @@
2727
* <p>Inspired by {@code com.google.common.util.concurrent.ExecutionList}.
2828
*
2929
* @author Arjen Poutsma
30+
* @author Sebastien Deleuze
3031
* @since 4.0
3132
*/
3233
public class ListenableFutureCallbackRegistry<T> {
3334

34-
private final Queue<ListenableFutureCallback<? super T>> callbacks =
35-
new LinkedList<ListenableFutureCallback<? super T>>();
35+
private final Queue<SuccessCallback<? super T>> successCallbacks =
36+
new LinkedList<SuccessCallback<? super T>>();
37+
38+
private final Queue<FailureCallback> failureCallbacks =
39+
new LinkedList<FailureCallback>();
3640

3741
private State state = State.NEW;
3842

@@ -52,7 +56,8 @@ public void addCallback(ListenableFutureCallback<? super T> callback) {
5256
synchronized (mutex) {
5357
switch (state) {
5458
case NEW:
55-
callbacks.add(callback);
59+
successCallbacks.add(callback);
60+
failureCallbacks.add(callback);
5661
break;
5762
case SUCCESS:
5863
callback.onSuccess((T)result);
@@ -64,6 +69,50 @@ public void addCallback(ListenableFutureCallback<? super T> callback) {
6469
}
6570
}
6671

72+
/**
73+
* Adds the given success callback to this registry.
74+
* @param callback the success callback to add
75+
*
76+
* @since 4.1
77+
*/
78+
@SuppressWarnings("unchecked")
79+
public void addSuccessCallback(SuccessCallback<? super T> callback) {
80+
Assert.notNull(callback, "'callback' must not be null");
81+
82+
synchronized (mutex) {
83+
switch (state) {
84+
case NEW:
85+
successCallbacks.add(callback);
86+
break;
87+
case SUCCESS:
88+
callback.onSuccess((T)result);
89+
break;
90+
}
91+
}
92+
}
93+
94+
/**
95+
* Adds the given failure callback to this registry.
96+
* @param callback the failure callback to add
97+
*
98+
* @since 4.1
99+
*/
100+
@SuppressWarnings("unchecked")
101+
public void addFailureCallback(FailureCallback callback) {
102+
Assert.notNull(callback, "'callback' must not be null");
103+
104+
synchronized (mutex) {
105+
switch (state) {
106+
case NEW:
107+
failureCallbacks.add(callback);
108+
break;
109+
case FAILURE:
110+
callback.onFailure((Throwable) result);
111+
break;
112+
}
113+
}
114+
}
115+
67116
/**
68117
* Triggers a {@link ListenableFutureCallback#onSuccess(Object)} call on all added
69118
* callbacks with the given result
@@ -74,8 +123,8 @@ public void success(T result) {
74123
state = State.SUCCESS;
75124
this.result = result;
76125

77-
while (!callbacks.isEmpty()) {
78-
callbacks.poll().onSuccess(result);
126+
while (!successCallbacks.isEmpty()) {
127+
successCallbacks.poll().onSuccess(result);
79128
}
80129
}
81130
}
@@ -90,8 +139,8 @@ public void failure(Throwable t) {
90139
state = State.FAILURE;
91140
this.result = t;
92141

93-
while (!callbacks.isEmpty()) {
94-
callbacks.poll().onFailure(t);
142+
while (!failureCallbacks.isEmpty()) {
143+
failureCallbacks.poll().onFailure(t);
95144
}
96145
}
97146
}

spring-core/src/main/java/org/springframework/util/concurrent/ListenableFutureTask.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2013 the original author or authors.
2+
* Copyright 2002-2014 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -57,6 +57,12 @@ public void addCallback(ListenableFutureCallback<? super T> callback) {
5757
this.callbacks.addCallback(callback);
5858
}
5959

60+
@Override
61+
public void addCallback(SuccessCallback<? super T> successCallback, FailureCallback failureCallback) {
62+
this.callbacks.addSuccessCallback(successCallback);
63+
this.callbacks.addFailureCallback(failureCallback);
64+
}
65+
6066
@Override
6167
protected final void done() {
6268
Throwable cause;

spring-core/src/main/java/org/springframework/util/concurrent/SettableListenableFuture.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,11 @@ public void addCallback(ListenableFutureCallback<? super T> callback) {
8484
this.listenableFuture.addCallback(callback);
8585
}
8686

87+
@Override
88+
public void addCallback(SuccessCallback<? super T> successCallback, FailureCallback failureCallback) {
89+
this.listenableFuture.addCallback(successCallback, failureCallback);
90+
}
91+
8792
@Override
8893
public boolean cancel(boolean mayInterruptIfRunning) {
8994
this.settableTask.setCancelled();
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
/*
2+
* Copyright 2002-2014 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.util.concurrent;
18+
19+
/**
20+
* Defines the contract for success callbacks that accept the result of a
21+
* {@link ListenableFuture}.
22+
*
23+
* @author Sebastien Deleuze
24+
* @since 4.1
25+
*/
26+
public interface SuccessCallback<T> {
27+
28+
/**
29+
* Called when the {@link ListenableFuture} successfully completes.
30+
* @param result the result
31+
*/
32+
void onSuccess(T result);
33+
34+
}

spring-core/src/test/java/org/springframework/util/concurrent/ListenableFutureTaskTests.java

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2013 the original author or authors.
2+
* Copyright 2002-2014 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -24,8 +24,13 @@
2424
import static org.junit.Assert.fail;
2525
import org.junit.Test;
2626

27+
import static org.mockito.Mockito.mock;
28+
import static org.mockito.Mockito.verify;
29+
import static org.mockito.Mockito.verifyZeroInteractions;
30+
2731
/**
2832
* @author Arjen Poutsma
33+
* @author Sebastien Deleuze
2934
*/
3035
public class ListenableFutureTaskTests {
3136

@@ -77,6 +82,33 @@ public void onFailure(Throwable t) {
7782
task.run();
7883
}
7984

85+
@Test
86+
public void successWithLambdas() throws ExecutionException, InterruptedException {
87+
final String s = "Hello World";
88+
Callable<String> callable = () -> s;
89+
SuccessCallback<String> successCallback = mock(SuccessCallback.class);
90+
FailureCallback failureCallback = mock(FailureCallback.class);
91+
ListenableFutureTask<String> task = new ListenableFutureTask<>(callable);
92+
task.addCallback(successCallback, failureCallback);
93+
task.run();
94+
verify(successCallback).onSuccess(s);
95+
verifyZeroInteractions(failureCallback);
96+
}
8097

98+
@Test
99+
public void failureWithLambdas() throws ExecutionException, InterruptedException {
100+
final String s = "Hello World";
101+
IOException ex = new IOException(s);
102+
Callable<String> callable = () -> {
103+
throw ex;
104+
};
105+
SuccessCallback<String> successCallback = mock(SuccessCallback.class);
106+
FailureCallback failureCallback = mock(FailureCallback.class);
107+
ListenableFutureTask<String> task = new ListenableFutureTask<>(callable);
108+
task.addCallback(successCallback, failureCallback);
109+
task.run();
110+
verify(failureCallback).onFailure(ex);
111+
verifyZeroInteractions(successCallback);
112+
}
81113

82114
}

0 commit comments

Comments
 (0)