Skip to content

Make ListenableFuture compliant with Java 8 lambda #571

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jul 16, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@

import java.util.concurrent.TimeUnit;

import org.springframework.util.concurrent.FailureCallback;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
import org.springframework.util.concurrent.SuccessCallback;

/**
* A pass-through {@code Future} handle that can be used for method signatures
Expand Down Expand Up @@ -74,7 +76,15 @@ public V get(long timeout, TimeUnit unit) {

@Override
public void addCallback(ListenableFutureCallback<? super V> callback) {
callback.onSuccess(this.value);
addCallback(callback, callback);
}

@Override
public void addCallback(SuccessCallback<? super V> successCallback, FailureCallback failureCallback) {
try {
successCallback.onSuccess(this.value);
} catch(Throwable t) {
failureCallback.onFailure(t);
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe that one of your commits introduced an empty line after the AsyncResult constructor. That's not necessary.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright 2002-2014 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.util.concurrent;

/**
* Defines the contract for failure callbacks that accept the result of a
* {@link ListenableFuture}.
*
* @author Sebastien Deleuze
* @since 4.1
*/
public interface FailureCallback {

/**
* Called when the {@link ListenableFuture} fails to complete.
* @param t the exception that triggered the failure
*/
void onFailure(Throwable t);

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2013 the original author or authors.
* Copyright 2002-2014 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -25,6 +25,7 @@
* <p>Inspired by {@code com.google.common.util.concurrent.ListenableFuture}.

* @author Arjen Poutsma
* @author Sebastien Deleuze
* @since 4.0
*/
public interface ListenableFuture<T> extends Future<T> {
Expand All @@ -37,4 +38,15 @@ public interface ListenableFuture<T> extends Future<T> {
*/
void addCallback(ListenableFutureCallback<? super T> callback);

/**
* Registers the given success and failure callbacks to this {@code ListenableFuture}.
* The callback will be triggered when this {@code Future} is complete or, if it is
* already complete immediately. This is a Java 8 lambdas compliant alternative to
* {@link #addCallback(ListenableFutureCallback)}.
* @param successCallback the success callback to register
* @param failureCallback the failure callback to register
* @since 4.1
*/
void addCallback(SuccessCallback<? super T> successCallback, FailureCallback failureCallback);

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2013 the original author or authors.
* Copyright 2002-2014 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -44,12 +44,18 @@ protected ListenableFutureAdapter(ListenableFuture<S> adaptee) {

@Override
public void addCallback(final ListenableFutureCallback<? super T> callback) {
addCallback(callback, callback);
}

@Override
public void addCallback(final SuccessCallback<? super T> successCallback,
final FailureCallback failureCallback) {
ListenableFuture<S> listenableAdaptee = (ListenableFuture<S>) getAdaptee();
listenableAdaptee.addCallback(new ListenableFutureCallback<S>() {
@Override
public void onSuccess(S result) {
try {
callback.onSuccess(adaptInternal(result));
successCallback.onSuccess(adaptInternal(result));
}
catch (ExecutionException ex) {
Throwable cause = ex.getCause();
Expand All @@ -62,8 +68,9 @@ public void onSuccess(S result) {

@Override
public void onFailure(Throwable t) {
callback.onFailure(t);
failureCallback.onFailure(t);
}
});
}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2013 the original author or authors.
* Copyright 2002-2014 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -21,20 +21,9 @@
* {@link ListenableFuture}.
*
* @author Arjen Poutsma
* @author Sebastien Deleuze
* @since 4.0
*/
public interface ListenableFutureCallback<T> {

/**
* Called when the {@link ListenableFuture} successfully completes.
* @param result the result
*/
void onSuccess(T result);

/**
* Called when the {@link ListenableFuture} fails to complete.
* @param t the exception that triggered the failure
*/
void onFailure(Throwable t);
public interface ListenableFutureCallback<T> extends SuccessCallback<T>, FailureCallback {

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2013 the original author or authors.
* Copyright 2002-2014 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -27,12 +27,16 @@
* <p>Inspired by {@code com.google.common.util.concurrent.ExecutionList}.
*
* @author Arjen Poutsma
* @author Sebastien Deleuze
* @since 4.0
*/
public class ListenableFutureCallbackRegistry<T> {

private final Queue<ListenableFutureCallback<? super T>> callbacks =
new LinkedList<ListenableFutureCallback<? super T>>();
private final Queue<SuccessCallback<? super T>> successCallbacks =
new LinkedList<SuccessCallback<? super T>>();

private final Queue<FailureCallback> failureCallbacks =
new LinkedList<FailureCallback>();

private State state = State.NEW;

Expand All @@ -52,7 +56,8 @@ public void addCallback(ListenableFutureCallback<? super T> callback) {
synchronized (mutex) {
switch (state) {
case NEW:
callbacks.add(callback);
successCallbacks.add(callback);
failureCallbacks.add(callback);
break;
case SUCCESS:
callback.onSuccess((T)result);
Expand All @@ -64,6 +69,50 @@ public void addCallback(ListenableFutureCallback<? super T> callback) {
}
}

/**
* Adds the given success callback to this registry.
* @param callback the success callback to add
*
* @since 4.1
*/
@SuppressWarnings("unchecked")
public void addSuccessCallback(SuccessCallback<? super T> callback) {
Assert.notNull(callback, "'callback' must not be null");

synchronized (mutex) {
switch (state) {
case NEW:
successCallbacks.add(callback);
break;
case SUCCESS:
callback.onSuccess((T)result);
break;
}
}
}

/**
* Adds the given failure callback to this registry.
* @param callback the failure callback to add
*
* @since 4.1
*/
@SuppressWarnings("unchecked")
public void addFailureCallback(FailureCallback callback) {
Assert.notNull(callback, "'callback' must not be null");

synchronized (mutex) {
switch (state) {
case NEW:
failureCallbacks.add(callback);
break;
case FAILURE:
callback.onFailure((Throwable) result);
break;
}
}
}

/**
* Triggers a {@link ListenableFutureCallback#onSuccess(Object)} call on all added
* callbacks with the given result
Expand All @@ -74,8 +123,8 @@ public void success(T result) {
state = State.SUCCESS;
this.result = result;

while (!callbacks.isEmpty()) {
callbacks.poll().onSuccess(result);
while (!successCallbacks.isEmpty()) {
successCallbacks.poll().onSuccess(result);
}
}
}
Expand All @@ -90,8 +139,8 @@ public void failure(Throwable t) {
state = State.FAILURE;
this.result = t;

while (!callbacks.isEmpty()) {
callbacks.poll().onFailure(t);
while (!failureCallbacks.isEmpty()) {
failureCallbacks.poll().onFailure(t);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2013 the original author or authors.
* Copyright 2002-2014 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -57,6 +57,12 @@ public void addCallback(ListenableFutureCallback<? super T> callback) {
this.callbacks.addCallback(callback);
}

@Override
public void addCallback(SuccessCallback<? super T> successCallback, FailureCallback failureCallback) {
this.callbacks.addSuccessCallback(successCallback);
this.callbacks.addFailureCallback(failureCallback);
}

@Override
protected final void done() {
Throwable cause;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,11 @@ public void addCallback(ListenableFutureCallback<? super T> callback) {
this.listenableFuture.addCallback(callback);
}

@Override
public void addCallback(SuccessCallback<? super T> successCallback, FailureCallback failureCallback) {
this.listenableFuture.addCallback(successCallback, failureCallback);
}

@Override
public boolean cancel(boolean mayInterruptIfRunning) {
this.settableTask.setCancelled();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright 2002-2014 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.util.concurrent;

/**
* Defines the contract for success callbacks that accept the result of a
* {@link ListenableFuture}.
*
* @author Sebastien Deleuze
* @since 4.1
*/
public interface SuccessCallback<T> {

/**
* Called when the {@link ListenableFuture} successfully completes.
* @param result the result
*/
void onSuccess(T result);

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2013 the original author or authors.
* Copyright 2002-2014 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -24,8 +24,13 @@
import static org.junit.Assert.fail;
import org.junit.Test;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyZeroInteractions;

/**
* @author Arjen Poutsma
* @author Sebastien Deleuze
*/
public class ListenableFutureTaskTests {

Expand Down Expand Up @@ -77,6 +82,33 @@ public void onFailure(Throwable t) {
task.run();
}

@Test
public void successWithLambdas() throws ExecutionException, InterruptedException {
final String s = "Hello World";
Callable<String> callable = () -> s;
SuccessCallback<String> successCallback = mock(SuccessCallback.class);
FailureCallback failureCallback = mock(FailureCallback.class);
ListenableFutureTask<String> task = new ListenableFutureTask<>(callable);
task.addCallback(successCallback, failureCallback);
task.run();
verify(successCallback).onSuccess(s);
verifyZeroInteractions(failureCallback);
}

@Test
public void failureWithLambdas() throws ExecutionException, InterruptedException {
final String s = "Hello World";
IOException ex = new IOException(s);
Callable<String> callable = () -> {
throw ex;
};
SuccessCallback<String> successCallback = mock(SuccessCallback.class);
FailureCallback failureCallback = mock(FailureCallback.class);
ListenableFutureTask<String> task = new ListenableFutureTask<>(callable);
task.addCallback(successCallback, failureCallback);
task.run();
verify(failureCallback).onFailure(ex);
verifyZeroInteractions(successCallback);
}

}
Loading