Skip to content

Commit bef22ec

Browse files
committed
Consistent MonoToListenableFutureAdapter.cancel()
Issue: SPR-17336
1 parent 29ff8a8 commit bef22ec

File tree

2 files changed

+74
-1
lines changed

2 files changed

+74
-1
lines changed

spring-messaging/src/main/java/org/springframework/messaging/support/AbstractMonoToListenableFutureAdapter.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,8 @@ public boolean cancel(boolean mayInterruptIfRunning) {
8989
return false;
9090
}
9191
this.monoProcessor.cancel();
92-
return true;
92+
// isCancelled may still return false, if mono completed before the cancel
93+
return this.monoProcessor.isCancelled();
9394
}
9495

9596
@Override
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
/*
2+
* Copyright 2002-2018 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.springframework.messaging.support;
17+
18+
import java.time.Duration;
19+
import java.util.concurrent.Future;
20+
import java.util.concurrent.atomic.AtomicReference;
21+
22+
import org.junit.Test;
23+
import reactor.core.publisher.Mono;
24+
25+
import org.springframework.util.concurrent.ListenableFuture;
26+
27+
import static org.junit.Assert.*;
28+
29+
/**
30+
* Unit tests for {@link MonoToListenableFutureAdapter}.
31+
* @author Rossen Stoyanchev
32+
*/
33+
public class MonoToListenableFutureAdapterTests {
34+
35+
@Test
36+
public void success() {
37+
String expected = "one";
38+
AtomicReference<Object> actual = new AtomicReference<>();
39+
ListenableFuture<String> future = new MonoToListenableFutureAdapter<>(Mono.just(expected));
40+
future.addCallback(actual::set, actual::set);
41+
42+
assertEquals(expected, actual.get());
43+
}
44+
45+
@Test
46+
public void failure() {
47+
Throwable expected = new IllegalStateException("oops");
48+
AtomicReference<Object> actual = new AtomicReference<>();
49+
ListenableFuture<String> future = new MonoToListenableFutureAdapter<>(Mono.error(expected));
50+
future.addCallback(actual::set, actual::set);
51+
52+
assertEquals(expected, actual.get());
53+
}
54+
55+
@Test
56+
public void cancellation() {
57+
Mono<Long> mono = Mono.delay(Duration.ofSeconds(60));
58+
Future<Long> future = new MonoToListenableFutureAdapter<>(mono);
59+
60+
assertTrue(future.cancel(true));
61+
assertTrue(future.isCancelled());
62+
}
63+
64+
@Test
65+
public void cancellationAfterTerminated() {
66+
Future<Void> future = new MonoToListenableFutureAdapter<>(Mono.empty());
67+
68+
assertFalse("Should return false if task already completed", future.cancel(true));
69+
assertFalse(future.isCancelled());
70+
}
71+
72+
}

0 commit comments

Comments
 (0)