Skip to content

Commit 3c1adf7

Browse files
committed
ThreadPoolTaskExecutor/Scheduler cancels remaining Futures on shutdown
Issue: SPR-16607
1 parent 9d63f80 commit 3c1adf7

File tree

8 files changed

+478
-96
lines changed

8 files changed

+478
-96
lines changed

spring-context/src/main/java/org/springframework/scheduling/concurrent/ExecutorConfigurationSupport.java

+19-2
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2017 the original author or authors.
2+
* Copyright 2002-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -18,6 +18,7 @@
1818

1919
import java.util.concurrent.ExecutorService;
2020
import java.util.concurrent.RejectedExecutionHandler;
21+
import java.util.concurrent.RunnableFuture;
2122
import java.util.concurrent.ThreadFactory;
2223
import java.util.concurrent.ThreadPoolExecutor;
2324
import java.util.concurrent.TimeUnit;
@@ -209,12 +210,28 @@ public void shutdown() {
209210
this.executor.shutdown();
210211
}
211212
else {
212-
this.executor.shutdownNow();
213+
for (Runnable remainingTask : this.executor.shutdownNow()) {
214+
cancelRemainingTask(remainingTask);
215+
}
213216
}
214217
awaitTerminationIfNecessary(this.executor);
215218
}
216219
}
217220

221+
/**
222+
* Cancel the given remaining task which never commended execution,
223+
* as returned from {@link ExecutorService#shutdownNow()}.
224+
* @param task the task to cancel (potentially a {@link RunnableFuture})
225+
* @since 5.0.5
226+
* @see #shutdown()
227+
* @see RunnableFuture#cancel(boolean)
228+
*/
229+
protected void cancelRemainingTask(Runnable task) {
230+
if (task instanceof RunnableFuture) {
231+
((RunnableFuture<?>) task).cancel(true);
232+
}
233+
}
234+
218235
/**
219236
* Wait for the executor to terminate, according to the value of the
220237
* {@link #setAwaitTerminationSeconds "awaitTerminationSeconds"} property.

spring-context/src/main/java/org/springframework/scheduling/concurrent/ThreadPoolTaskExecutor.java

+22-2
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2017 the original author or authors.
2+
* Copyright 2002-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -16,6 +16,7 @@
1616

1717
package org.springframework.scheduling.concurrent;
1818

19+
import java.util.Map;
1920
import java.util.concurrent.BlockingQueue;
2021
import java.util.concurrent.Callable;
2122
import java.util.concurrent.Executor;
@@ -35,6 +36,7 @@
3536
import org.springframework.lang.Nullable;
3637
import org.springframework.scheduling.SchedulingTaskExecutor;
3738
import org.springframework.util.Assert;
39+
import org.springframework.util.ConcurrentReferenceHashMap;
3840
import org.springframework.util.concurrent.ListenableFuture;
3941
import org.springframework.util.concurrent.ListenableFutureTask;
4042

@@ -90,6 +92,10 @@ public class ThreadPoolTaskExecutor extends ExecutorConfigurationSupport
9092
@Nullable
9193
private ThreadPoolExecutor threadPoolExecutor;
9294

95+
// Runnable decorator to user-level FutureTask, if different
96+
private final Map<Runnable, Object> decoratedTaskMap =
97+
new ConcurrentReferenceHashMap<>(16, ConcurrentReferenceHashMap.ReferenceType.WEAK);
98+
9399

94100
/**
95101
* Set the ThreadPoolExecutor's core pool size.
@@ -217,7 +223,11 @@ protected ExecutorService initializeExecutor(
217223
queue, threadFactory, rejectedExecutionHandler) {
218224
@Override
219225
public void execute(Runnable command) {
220-
super.execute(taskDecorator.decorate(command));
226+
Runnable decorated = taskDecorator.decorate(command);
227+
if (decorated != command) {
228+
decoratedTaskMap.put(decorated, command);
229+
}
230+
super.execute(decorated);
221231
}
222232
};
223233
}
@@ -353,6 +363,16 @@ public <T> ListenableFuture<T> submitListenable(Callable<T> task) {
353363
}
354364
}
355365

366+
@Override
367+
protected void cancelRemainingTask(Runnable task) {
368+
super.cancelRemainingTask(task);
369+
// Cancel associated user-level Future handle as well
370+
Object original = this.decoratedTaskMap.get(task);
371+
if (original instanceof Future) {
372+
((Future<?>) original).cancel(true);
373+
}
374+
}
375+
356376
/**
357377
* This task executor prefers short-lived work units.
358378
*/

spring-context/src/main/java/org/springframework/scheduling/concurrent/ThreadPoolTaskScheduler.java

+30-7
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2017 the original author or authors.
2+
* Copyright 2002-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -17,6 +17,7 @@
1717
package org.springframework.scheduling.concurrent;
1818

1919
import java.util.Date;
20+
import java.util.Map;
2021
import java.util.concurrent.Callable;
2122
import java.util.concurrent.Executor;
2223
import java.util.concurrent.ExecutorService;
@@ -37,6 +38,7 @@
3738
import org.springframework.scheduling.Trigger;
3839
import org.springframework.scheduling.support.TaskUtils;
3940
import org.springframework.util.Assert;
41+
import org.springframework.util.ConcurrentReferenceHashMap;
4042
import org.springframework.util.ErrorHandler;
4143
import org.springframework.util.concurrent.ListenableFuture;
4244
import org.springframework.util.concurrent.ListenableFutureTask;
@@ -67,6 +69,10 @@ public class ThreadPoolTaskScheduler extends ExecutorConfigurationSupport
6769
@Nullable
6870
private ScheduledExecutorService scheduledExecutor;
6971

72+
// Underlying ScheduledFutureTask to user-level ListenableFuture handle, if any
73+
private final Map<Object, ListenableFuture<?>> listenableFutureMap =
74+
new ConcurrentReferenceHashMap<>(16, ConcurrentReferenceHashMap.ReferenceType.WEAK);
75+
7076

7177
/**
7278
* Set the ScheduledExecutorService's pool size.
@@ -253,9 +259,9 @@ public <T> Future<T> submit(Callable<T> task) {
253259
public ListenableFuture<?> submitListenable(Runnable task) {
254260
ExecutorService executor = getScheduledExecutor();
255261
try {
256-
ListenableFutureTask<Object> future = new ListenableFutureTask<>(task, null);
257-
executor.execute(errorHandlingTask(future, false));
258-
return future;
262+
ListenableFutureTask<Object> listenableFuture = new ListenableFutureTask<>(task, null);
263+
executeAndTrack(executor, listenableFuture);
264+
return listenableFuture;
259265
}
260266
catch (RejectedExecutionException ex) {
261267
throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex);
@@ -266,15 +272,32 @@ public ListenableFuture<?> submitListenable(Runnable task) {
266272
public <T> ListenableFuture<T> submitListenable(Callable<T> task) {
267273
ExecutorService executor = getScheduledExecutor();
268274
try {
269-
ListenableFutureTask<T> future = new ListenableFutureTask<>(task);
270-
executor.execute(errorHandlingTask(future, false));
271-
return future;
275+
ListenableFutureTask<T> listenableFuture = new ListenableFutureTask<>(task);
276+
executeAndTrack(executor, listenableFuture);
277+
return listenableFuture;
272278
}
273279
catch (RejectedExecutionException ex) {
274280
throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex);
275281
}
276282
}
277283

284+
private void executeAndTrack(ExecutorService executor, ListenableFutureTask<?> listenableFuture) {
285+
Future<?> scheduledFuture = executor.submit(errorHandlingTask(listenableFuture, false));
286+
this.listenableFutureMap.put(scheduledFuture, listenableFuture);
287+
listenableFuture.addCallback(result -> listenableFutureMap.remove(scheduledFuture),
288+
ex -> listenableFutureMap.remove(scheduledFuture));
289+
}
290+
291+
@Override
292+
protected void cancelRemainingTask(Runnable task) {
293+
super.cancelRemainingTask(task);
294+
// Cancel associated user-level ListenableFuture handle as well
295+
ListenableFuture<?> listenableFuture = this.listenableFutureMap.get(task);
296+
if (listenableFuture != null) {
297+
listenableFuture.cancel(true);
298+
}
299+
}
300+
278301
@Override
279302
public boolean prefersShortLivedTasks() {
280303
return true;

0 commit comments

Comments
 (0)