From 6f1483221540ddc26066837621cf81aaac701f55 Mon Sep 17 00:00:00 2001
From: Doug Lea
Date: Sat, 24 Dec 2022 06:43:50 -0500
Subject: [PATCH 01/61] Shutdown changes to address JDK-8288899 and JDK-8286352
---
.../java/util/concurrent/ForkJoinPool.java | 160 +++++++++++-------
.../java/util/concurrent/ForkJoinTask.java | 9 +-
.../concurrent/forkjoin/AsyncShutdownNow.java | 2 +-
3 files changed, 101 insertions(+), 70 deletions(-)
diff --git a/src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java b/src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java
index dd4950269e62d..35df71e001a59 100644
--- a/src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java
+++ b/src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java
@@ -1651,8 +1651,12 @@ final void registerWorker(WorkQueue w) {
* @param ex the exception causing failure, or null if none
*/
final void deregisterWorker(ForkJoinWorkerThread wt, Throwable ex) {
+ int cfg = 0;
WorkQueue w = (wt == null) ? null : wt.workQueue;
- int cfg = (w == null) ? 0 : w.config;
+ if (w != null) {
+ cfg = w.config;
+ w.access = STOP; // may be redundant
+ }
long c = ctl;
if ((cfg & TRIMMED) == 0) // decrement counts
do {} while (c != (c = compareAndExchangeCtl(
@@ -1676,8 +1680,7 @@ else if ((int)c == 0) // was dropped on timeout
signalWork(); // possibly replace worker
}
if (ex != null) {
- if (w != null) {
- w.access = STOP; // cancel tasks
+ if (w != null) { // cancel tasks
for (ForkJoinTask> t; (t = w.nextLocalTask(0)) != null; )
ForkJoinTask.cancelIgnoringExceptions(t);
}
@@ -1807,7 +1810,6 @@ final void runWorker(WorkQueue w) {
r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift
} while ((src = scan(w, src, r)) >= 0 ||
(src = awaitWork(w)) == 0);
- w.access = STOP; // record normal termination
}
}
@@ -1869,10 +1871,10 @@ private int awaitWork(WorkQueue w) {
} while (pc != (pc = compareAndExchangeCtl(
pc, qc = ((pc - RC_UNIT) & UC_MASK) | sp)));
if ((qc & RC_MASK) <= 0L) {
- if (hasTasks(true) && (w.phase >= 0 || reactivate() == w))
- return 0; // check for stragglers
if (runState != 0 && tryTerminate(false, false))
return -1; // quiescent termination
+ if (hasTasks(true) && (w.phase >= 0 || reactivate() == w))
+ return 0; // check for stragglers
idle = true;
}
WorkQueue[] qs = queues; // spin for expected #accesses in scan+signal
@@ -2500,23 +2502,26 @@ private boolean tryTerminate(boolean now, boolean enable) {
}
getAndBitwiseOrRunState(SHUTDOWN | STOP);
}
- WorkQueue released = reactivate(); // try signalling waiter
- int tc = (short)(ctl >>> TC_SHIFT);
- if (released == null && tc > 0) { // help unblock and cancel
- Thread current = Thread.currentThread();
- WorkQueue w = ((current instanceof ForkJoinWorkerThread) ?
- ((ForkJoinWorkerThread)current).workQueue : null);
- int r = (w == null) ? 0 : w.config + 1; // stagger traversals
- WorkQueue[] qs = queues;
+ int r = 0; // for queue traversal
+ Thread current = Thread.currentThread();
+ if (current instanceof ForkJoinWorkerThread) {
+ ForkJoinWorkerThread wt = (ForkJoinWorkerThread)current;
+ WorkQueue w = wt.workQueue;
+ if (wt.pool == this && w != null) {
+ r = w.config; // stagger traversals
+ w.access = STOP; // may be redundant
+ }
+ }
+ if (reactivate() == null) { // try signalling waiter
+ WorkQueue[] qs = queues; // or help unblock and cancel
int n = (qs == null) ? 0 : qs.length;
for (int i = 0; i < n; ++i) {
WorkQueue q; Thread thread;
- if ((q = qs[(r + i) & (n - 1)]) != null &&
- (thread = q.owner) != current && q.access != STOP) {
+ if ((q = qs[(r + i) & (n - 1)]) != null) {
for (ForkJoinTask> t; (t = q.poll(null)) != null; )
ForkJoinTask.cancelIgnoringExceptions(t);
- if (thread != null && !thread.isInterrupted()) {
- q.forcePhaseActive(); // for awaitWork
+ if (q.access != STOP && (thread = q.owner) != null) {
+ q.forcePhaseActive(); // for awaitWork
try {
thread.interrupt();
} catch (Throwable ignore) {
@@ -2525,7 +2530,7 @@ private boolean tryTerminate(boolean now, boolean enable) {
}
}
}
- if ((tc <= 0 || (short)(ctl >>> TC_SHIFT) <= 0) &&
+ if ((short)(ctl >>> TC_SHIFT) <= 0 &&
(getAndBitwiseOrRunState(TERMINATED) & TERMINATED) == 0 &&
(lock = registrationLock) != null) {
lock.lock(); // signal when no workers
@@ -3037,45 +3042,52 @@ public List> invokeAll(Collection extends Callable> tasks,
}
}
- // Task to hold results from InvokeAnyTasks
+ /**
+ * Task to hold results for invokeAny, or to report exception if
+ * all subtasks fail or are cancelled or the pool is terminating.
+ */
static final class InvokeAnyRoot extends ForkJoinTask {
private static final long serialVersionUID = 2838392045355241008L;
@SuppressWarnings("serial") // Conditionally serializable
volatile E result;
- final AtomicInteger count; // in case all throw
+ final AtomicInteger count; // in case all fail
@SuppressWarnings("serial")
final ForkJoinPool pool; // to check shutdown while collecting
InvokeAnyRoot(int n, ForkJoinPool p) {
pool = p;
count = new AtomicInteger(n);
}
- final void tryComplete(Callable c) { // called by InvokeAnyTasks
- Throwable ex = null;
- boolean failed;
- if (c == null || Thread.interrupted() ||
- (pool != null && pool.runState < 0))
- failed = true;
- else if (isDone())
- failed = false;
- else {
- try {
- complete(c.call());
- failed = false;
- } catch (Throwable tx) {
- ex = tx;
- failed = true;
+ final void tryComplete(E v, Throwable ex, boolean fail) {
+ if (!isDone()) {
+ if (!fail) {
+ result = v;
+ quietlyComplete();
}
+ else if (pool.runState < 0 || count.getAndDecrement() <= 1)
+ trySetThrown(ex != null? ex : new CancellationException());
+ }
+ }
+ final boolean checkDone() {
+ if (isDone())
+ return true;
+ else if (pool.runState >= 0)
+ return false;
+ else {
+ tryComplete(null, null, true);
+ return true;
}
- if ((pool != null && pool.runState < 0) ||
- (failed && count.getAndDecrement() <= 1))
- trySetThrown(ex != null ? ex : new CancellationException());
}
- public final boolean exec() { return false; } // never forked
+ public final boolean exec() { return false; }
public final E getRawResult() { return result; }
public final void setRawResult(E v) { result = v; }
}
- // Variant of AdaptedInterruptibleCallable with results in InvokeAnyRoot
+ /**
+ * Variant of AdaptedInterruptibleCallable with results in
+ * InvokeAnyRoot (and never independently joined). Task
+ * cancellation status is used to avoid multiple calls to
+ * root.tryComplete by the same task under async cancellation.
+ */
static final class InvokeAnyTask extends ForkJoinTask {
private static final long serialVersionUID = 2838392045355241008L;
final InvokeAnyRoot root;
@@ -3087,23 +3099,36 @@ static final class InvokeAnyTask extends ForkJoinTask {
this.callable = callable;
}
public final boolean exec() {
+ InvokeAnyRoot r; Callable c;
Thread.interrupted();
- runner = Thread.currentThread();
- root.tryComplete(callable);
- runner = null;
- Thread.interrupted();
+ if ((c = callable) != null && (r = root) != null && !r.checkDone()) {
+ runner = Thread.currentThread();
+ E v = null;
+ Throwable ex = null;
+ boolean fail = false;
+ try {
+ v = c.call();
+ } catch (Throwable rex) {
+ ex = rex;
+ fail = true;
+ }
+ runner = null;
+ if (trySetCancelled() >= 0) // else lost to async cancel
+ r.tryComplete(v, ex, fail);
+ }
return true;
}
public final boolean cancel(boolean mayInterruptIfRunning) {
- Thread t;
- boolean stat = super.cancel(false);
+ Thread t; InvokeAnyRoot r;
+ if (trySetCancelled() >= 0 && (r = root) != null)
+ r.tryComplete(null, null, true); // else lost race to cancel
if (mayInterruptIfRunning && (t = runner) != null) {
try {
t.interrupt();
} catch (Throwable ignore) {
}
}
- return stat;
+ return isCancelled();
}
public final void setRawResult(E v) {} // unused
public final E getRawResult() { return null; }
@@ -3591,25 +3616,30 @@ public boolean awaitQuiescence(long timeout, TimeUnit unit) {
*/
@Override
public void close() {
- if ((config & ISCOMMON) == 0) {
- boolean terminated = tryTerminate(false, false);
- if (!terminated) {
- shutdown();
- boolean interrupted = false;
- while (!terminated) {
- try {
- terminated = awaitTermination(1L, TimeUnit.DAYS);
- } catch (InterruptedException e) {
- if (!interrupted) {
- shutdownNow();
- interrupted = true;
- }
- }
- }
- if (interrupted) {
- Thread.currentThread().interrupt();
+ ReentrantLock lock = registrationLock;
+ Condition cond = null;
+ boolean interrupted = false;
+ if (lock != null && (config & ISCOMMON) == 0 &&
+ (runState & TERMINATED) == 0) {
+ checkPermission();
+ for (;;) {
+ tryTerminate(interrupted, true); // call outside of lock
+ if ((runState & TERMINATED) != 0)
+ break;
+ lock.lock();
+ try {
+ if (cond == null && (cond = termination) == null)
+ termination = cond = lock.newCondition();
+ if ((runState & TERMINATED) == 0)
+ cond.await();
+ } catch (InterruptedException ex) {
+ interrupted = true;
+ } finally {
+ lock.unlock();
}
}
+ if (interrupted)
+ Thread.currentThread().interrupt();
}
}
diff --git a/src/java.base/share/classes/java/util/concurrent/ForkJoinTask.java b/src/java.base/share/classes/java/util/concurrent/ForkJoinTask.java
index e0737cde89d19..e238850359bbc 100644
--- a/src/java.base/share/classes/java/util/concurrent/ForkJoinTask.java
+++ b/src/java.base/share/classes/java/util/concurrent/ForkJoinTask.java
@@ -322,11 +322,11 @@ private int setDone() {
/**
* Sets ABNORMAL DONE status unless already done, and wakes up threads
* waiting to join this task.
- * @return status on exit
+ * @return previous status
*/
- private int trySetCancelled() {
+ final int trySetCancelled() {
int s;
- do {} while ((s = status) >= 0 && !casStatus(s, s |= (DONE | ABNORMAL)));
+ do {} while ((s = status) >= 0 && !casStatus(s, s | (DONE | ABNORMAL)));
signalWaiters();
return s;
}
@@ -852,7 +852,8 @@ public static > Collection invokeAll(Collection
* @return {@code true} if this task is now cancelled
*/
public boolean cancel(boolean mayInterruptIfRunning) {
- return (trySetCancelled() & (ABNORMAL | THROWN)) == ABNORMAL;
+ trySetCancelled();
+ return isCancelled();
}
public final boolean isDone() {
diff --git a/test/jdk/java/util/concurrent/forkjoin/AsyncShutdownNow.java b/test/jdk/java/util/concurrent/forkjoin/AsyncShutdownNow.java
index 7f7618a2189c9..d742bc0955fd3 100644
--- a/test/jdk/java/util/concurrent/forkjoin/AsyncShutdownNow.java
+++ b/test/jdk/java/util/concurrent/forkjoin/AsyncShutdownNow.java
@@ -128,7 +128,7 @@ public void testInvokeAll(ExecutorService executor) throws Exception {
/**
* Test shutdownNow with thread blocked in invokeAny.
*/
- @Test(dataProvider = "executors", enabled = false)
+ @Test(dataProvider = "executors")
public void testInvokeAny(ExecutorService executor) throws Exception {
System.out.format("testInvokeAny: %s%n", executor);
try (executor) {
From 8a6ff99537b0363044a279eac588cff053194d90 Mon Sep 17 00:00:00 2001
From: Doug Lea
Date: Wed, 28 Dec 2022 15:45:21 -0500
Subject: [PATCH 02/61] Refactor and regularize use of tryTerminate
---
.../java/util/concurrent/ForkJoinPool.java | 102 +++++++++---------
1 file changed, 54 insertions(+), 48 deletions(-)
diff --git a/src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java b/src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java
index 35df71e001a59..bd101fe5a8315 100644
--- a/src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java
+++ b/src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java
@@ -1665,7 +1665,7 @@ final void deregisterWorker(ForkJoinWorkerThread wt, Throwable ex) {
(SP_MASK & c)))));
else if ((int)c == 0) // was dropped on timeout
cfg &= ~SRC; // suppress signal if last
- if (!tryTerminate(false, false) && w != null) {
+ if (tryTerminate(false, false) >= 0 && w != null) {
ReentrantLock lock; WorkQueue[] qs; int n, i;
long ns = w.nsteals & 0xffffffffL;
if ((lock = registrationLock) != null) {
@@ -1871,7 +1871,7 @@ private int awaitWork(WorkQueue w) {
} while (pc != (pc = compareAndExchangeCtl(
pc, qc = ((pc - RC_UNIT) & UC_MASK) | sp)));
if ((qc & RC_MASK) <= 0L) {
- if (runState != 0 && tryTerminate(false, false))
+ if (runState != 0 && tryTerminate(false, false) < 0)
return -1; // quiescent termination
if (hasTasks(true) && (w.phase >= 0 || reactivate() == w))
return 0; // check for stragglers
@@ -2484,25 +2484,31 @@ static int getSurplusQueuedTaskCount() {
* @param now if true, unconditionally terminate, else only
* if no work and no active workers
* @param enable if true, terminate when next possible
- * @return true if terminating or terminated
- */
- private boolean tryTerminate(boolean now, boolean enable) {
- int rs; ReentrantLock lock; Condition cond;
- if ((rs = runState) >= 0) { // set SHUTDOWN and/or STOP
- if ((config & ISCOMMON) != 0)
- return false; // cannot shutdown
- if (!now) {
- if ((rs & SHUTDOWN) == 0) {
- if (!enable)
- return false;
- getAndBitwiseOrRunState(SHUTDOWN);
- }
- if (!canStop())
- return false;
- }
- getAndBitwiseOrRunState(SHUTDOWN | STOP);
+ * @return runState on exit
+ */
+ private int tryTerminate(boolean now, boolean enable) {
+ int rs;
+ if ((rs = runState) >= 0 && (config & ISCOMMON) == 0) {
+ if (!now && enable && (rs & SHUTDOWN) == 0)
+ rs = getAndBitwiseOrRunState(SHUTDOWN) | SHUTDOWN;
+ if (now || ((rs & SHUTDOWN) != 0 && canStop()))
+ rs = getAndBitwiseOrRunState(SHUTDOWN | STOP) | STOP;
+ else
+ rs = runState;
+ }
+ if (rs < 0 && (rs & TERMINATED) == 0) {
+ helpTerminate();
+ rs = runState;
}
- int r = 0; // for queue traversal
+ return rs;
+ }
+
+ /**
+ * Helps complete termination by cancelling tasks, unblocking
+ * workers and possibly triggering TERMINATED state.
+ */
+ private void helpTerminate() {
+ int r = 1; // for queue traversal
Thread current = Thread.currentThread();
if (current instanceof ForkJoinWorkerThread) {
ForkJoinWorkerThread wt = (ForkJoinWorkerThread)current;
@@ -2512,16 +2518,20 @@ private boolean tryTerminate(boolean now, boolean enable) {
w.access = STOP; // may be redundant
}
}
- if (reactivate() == null) { // try signalling waiter
- WorkQueue[] qs = queues; // or help unblock and cancel
- int n = (qs == null) ? 0 : qs.length;
- for (int i = 0; i < n; ++i) {
- WorkQueue q; Thread thread;
- if ((q = qs[(r + i) & (n - 1)]) != null) {
- for (ForkJoinTask> t; (t = q.poll(null)) != null; )
- ForkJoinTask.cancelIgnoringExceptions(t);
- if (q.access != STOP && (thread = q.owner) != null) {
- q.forcePhaseActive(); // for awaitWork
+ WorkQueue[] qs; WorkQueue q; Thread thread;
+ int n = ((qs = queues) == null) ? 0 : qs.length;
+ for (int i = 0; i < n; ++i) { // help cancel tasks
+ if ((q = qs[(r + i) & (n - 1)]) != null) {
+ for (ForkJoinTask> t; (t = q.poll(null)) != null; )
+ ForkJoinTask.cancelIgnoringExceptions(t);
+ }
+ }
+ if (reactivate() == null) { // activate or help unblock
+ n = ((qs = queues) == null) ? 0 : qs.length;
+ for (int i = 0; i < n; i += 2) {
+ if ((q = qs[(r + i) & (n - 1)]) != null && q.access != STOP) {
+ q.forcePhaseActive(); // for awaitWork
+ if ((thread = q.owner) != null && !thread.isInterrupted()) {
try {
thread.interrupt();
} catch (Throwable ignore) {
@@ -2530,16 +2540,16 @@ private boolean tryTerminate(boolean now, boolean enable) {
}
}
}
- if ((short)(ctl >>> TC_SHIFT) <= 0 &&
+ ReentrantLock lock; Condition cond; // signal when no workers
+ if ((short)(ctl >>> TC_SHIFT) <= 0 && (runState & TERMINATED) == 0 &&
(getAndBitwiseOrRunState(TERMINATED) & TERMINATED) == 0 &&
(lock = registrationLock) != null) {
- lock.lock(); // signal when no workers
+ lock.lock();
if ((cond = termination) != null)
cond.signalAll();
lock.unlock();
container.close();
}
- return true;
}
// Exported methods
@@ -3504,7 +3514,7 @@ public List shutdownNow() {
* @return {@code true} if all tasks have completed following shut down
*/
public boolean isTerminated() {
- return (runState & TERMINATED) != 0;
+ return (tryTerminate(false, false) & TERMINATED) != 0;
}
/**
@@ -3549,30 +3559,29 @@ public boolean isShutdown() {
*/
public boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException {
- ReentrantLock lock; Condition cond; boolean terminated;
+ ReentrantLock lock; Condition cond = null;
long nanos = unit.toNanos(timeout);
if ((config & ISCOMMON) != 0) {
if (helpQuiescePool(this, nanos, true) < 0)
throw new InterruptedException();
- terminated = false;
+ return false;
}
- else if (!(terminated = ((runState & TERMINATED) != 0))) {
- tryTerminate(false, false); // reduce transient blocking
- if ((lock = registrationLock) != null &&
- !(terminated = (((runState & TERMINATED) != 0)))) {
+ else if ((lock = registrationLock) != null) {
+ while ((tryTerminate(false, false) & TERMINATED) == 0) {
+ if (nanos <= 0L)
+ return false;
lock.lock();
try {
- if ((cond = termination) == null)
+ if (cond == null && (cond = termination) == null)
termination = cond = lock.newCondition();
- while (!(terminated = ((runState & TERMINATED) != 0)) &&
- nanos > 0L)
+ if ((runState & TERMINATED) == 0)
nanos = cond.awaitNanos(nanos);
} finally {
lock.unlock();
}
}
}
- return terminated;
+ return true;
}
/**
@@ -3622,10 +3631,7 @@ public void close() {
if (lock != null && (config & ISCOMMON) == 0 &&
(runState & TERMINATED) == 0) {
checkPermission();
- for (;;) {
- tryTerminate(interrupted, true); // call outside of lock
- if ((runState & TERMINATED) != 0)
- break;
+ while ((tryTerminate(interrupted, true) & TERMINATED) == 0) {
lock.lock();
try {
if (cond == null && (cond = termination) == null)
From 4bfaac195c68d4293976f408da4d0a0fc5f5e665 Mon Sep 17 00:00:00 2001
From: Doug Lea
Date: Sun, 1 Jan 2023 15:25:43 -0500
Subject: [PATCH 03/61] More uniform termnination for interruptibles
---
.../java/util/concurrent/ForkJoinPool.java | 302 ++++++++++--------
.../java/util/concurrent/ForkJoinTask.java | 36 ++-
2 files changed, 187 insertions(+), 151 deletions(-)
diff --git a/src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java b/src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java
index bd101fe5a8315..8645631a552d4 100644
--- a/src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java
+++ b/src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java
@@ -582,25 +582,18 @@ public class ForkJoinPool extends AbstractExecutorService {
*
* Shutdown and Termination. A call to shutdownNow invokes
* tryTerminate to atomically set a mode bit. The calling thread,
- * as well as every other worker thereafter terminating, helps
- * terminate others by cancelling their unprocessed tasks, and
- * interrupting other workers. Calls to non-abrupt shutdown()
- * preface this by checking isQuiescent before triggering the
- * "STOP" phase of termination. During termination, workers are
- * stopped using all three of (often in parallel): releasing via
- * ctl (method reactivate), interrupts, and cancelling tasks that
- * will cause workers to not find work and exit. To support this,
- * worker references not removed from the queues array during
- * termination. It is possible for late thread creations to still
- * be in progress after a quiescent termination reports terminated
- * status, but they will also immediately terminate. To conform to
- * ExecutorService invoke, invokeAll, and invokeAny specs, we must
- * track pool status while waiting in ForkJoinTask.awaitDone, and
- * interrupt interruptible callers on termination, while also
- * avoiding cancelling other tasks that are normally completing
- * during quiescent termination. This is tracked by recording
- * ForkJoinTask.POOLSUBMIT in task status and/or as a bit flag
- * argument to joining methods.
+ * as well as every other worker thereafter terminating invokes
+ * helpTerminate, which cancels queued tasks, reactivates idle
+ * workers, and interrupts others (in addtion to deregistering
+ * itself). Calls to non-abrupt shutdown() preface this by
+ * checking canStop before triggering the "STOP" phase of
+ * termination. Like most eveything else, we try to speed up
+ * termination by balancing parallelism and contention (which may
+ * occur while trying to cancel tasks or unblock workers), by
+ * starting task scans at different indices, and by fanning out
+ * reactivation of idle workers. The main remaining cost is the
+ * potential need to re-interrupt active workers with tasks that
+ * swallow them.
*
* Trimming workers. To release resources after periods of lack of
* use, a worker starting to wait when the pool is quiescent will
@@ -727,20 +720,40 @@ public class ForkJoinPool extends AbstractExecutorService {
* may be JVM-dependent and must access particular Thread class
* fields to achieve this effect.
*
- * Interrupt handling
- * ==================
+ * Interrupt handling and Cancellation
+ * ===================================
*
- * The framework is designed to manage task cancellation
+ * The framework is primarily designed to manage task cancellation
* (ForkJoinTask.cancel) independently from the interrupt status
* of threads running tasks. (See the public ForkJoinTask
- * documentation for rationale.) Interrupts are issued only in
- * tryTerminate, when workers should be terminating and tasks
- * should be cancelled anyway. Interrupts are cleared only when
- * necessary to ensure that calls to LockSupport.park do not loop
- * indefinitely (park returns immediately if the current thread is
- * interrupted). For cases in which task bodies are specified or
- * desired to interrupt upon cancellation, ForkJoinTask.cancel can
- * be overridden to do so (as is done for invoke{Any,All}).
+ * documentation for rationale.) Interrupts are issued internally
+ * only in helpTerminate, when workers should be terminating and
+ * tasks should be cancelled anyway. By default, interrupts are
+ * cleared only when necessary to ensure that calls to
+ * LockSupport.park do not loop indefinitely (park returns
+ * immediately if the current thread is interrupted).
+ *
+ * To conform to specs and expectations surrounding execution,
+ * cancellation and termination of ExecutorService invoke, submit,
+ * invokeAll, and invokeAny methods (without penalizing others) we
+ * use AdaptedInterruptibleCallables in ExecutorService methods
+ * accepting Callables (actually, for InvokeAny, a variant that
+ * gathers a single result). We also ensure that external
+ * submitters do not help run such tasks by recording
+ * ForkJoinTask.POOLSUBMIT in task status and/or as a bit flag
+ * argument to joining methods. External callers of task.get etc
+ * are not directly interrupted on shutdown, but must be woken due
+ * to the task itself being cancelled during termination.
+ * Interruptible tasks always clear interrupts and check for
+ * termination and cancellation before invoking user-supplied
+ * Callables These rechecks are needed because the cleared
+ * interrupt could be due to cancellation, termination, or any
+ * other reason. These tasks also record their runner thread so
+ * that cancel(true) can interrupt them later. As is the case with
+ * any kind of pool, it is possible that an interrupt designed to
+ * cancel one task occurs both late and unnecessarily, so instead
+ * "spuriously" interrupts the thread while performing a later
+ * task.
*
* Memory placement
* ================
@@ -1440,6 +1453,11 @@ else if (a[nk] == null)
// misc
+ final void cancelAllTasks() {
+ for (ForkJoinTask> t; (t = poll(null)) != null; )
+ ForkJoinTask.cancelIgnoringExceptions(t);
+ }
+
/**
* Returns true if owned by a worker thread and not known to be blocked.
*/
@@ -1676,16 +1694,12 @@ else if ((int)c == 0) // was dropped on timeout
stealCount += ns; // accumulate steals
lock.unlock();
}
+ w.cancelAllTasks();
if ((cfg & SRC) != 0)
signalWork(); // possibly replace worker
}
- if (ex != null) {
- if (w != null) { // cancel tasks
- for (ForkJoinTask> t; (t = w.nextLocalTask(0)) != null; )
- ForkJoinTask.cancelIgnoringExceptions(t);
- }
+ if (ex != null)
ForkJoinTask.rethrow(ex);
- }
}
/*
@@ -1819,13 +1833,15 @@ final void runWorker(WorkQueue w) {
* returning source id or retry indicator.
*
* @param w caller's WorkQueue
- * @param prevSrc the two previous queues (if nonzero) stolen from in current phase, packed as int
+ * @param prevSrc the two previous queues (if nonzero) stolen from
+ * in current phase, packed as int
* @param r random seed
* @return the next prevSrc value to use, or negative if none found
*/
private int scan(WorkQueue w, int prevSrc, int r) {
+ int rs = runState;
WorkQueue[] qs = queues;
- int n = (w == null || qs == null) ? 0 : qs.length;
+ int n = (rs < 0 || qs == null || w == null) ? 0 : qs.length;
for (int step = (r >>> 16) | 1, i = n; i > 0; --i, r += step) {
int j, cap; WorkQueue q; ForkJoinTask>[] a;
if ((q = qs[j = r & (n - 1)]) != null &&
@@ -1871,22 +1887,20 @@ private int awaitWork(WorkQueue w) {
} while (pc != (pc = compareAndExchangeCtl(
pc, qc = ((pc - RC_UNIT) & UC_MASK) | sp)));
if ((qc & RC_MASK) <= 0L) {
- if (runState != 0 && tryTerminate(false, false) < 0)
- return -1; // quiescent termination
if (hasTasks(true) && (w.phase >= 0 || reactivate() == w))
return 0; // check for stragglers
+ if (runState != 0 && tryTerminate(false, false) < 0)
+ return -1; // quiescent termination
idle = true;
}
WorkQueue[] qs = queues; // spin for expected #accesses in scan+signal
int spins = ((qs == null) ? 0 : ((qs.length & SMASK) << 1)) | 0xf;
while ((p = w.phase) < 0 && --spins > 0)
Thread.onSpinWait();
- if (p < 0) {
+ if (p < 0) { // await signal
long deadline = idle ? keepAlive + System.currentTimeMillis() : 0L;
LockSupport.setCurrentBlocker(this);
- for (;;) { // await signal or termination
- if (runState < 0)
- return -1;
+ for (;;) {
w.access = PARKED; // enable unpark
if (w.phase < 0) {
if (idle)
@@ -2487,18 +2501,18 @@ static int getSurplusQueuedTaskCount() {
* @return runState on exit
*/
private int tryTerminate(boolean now, boolean enable) {
- int rs;
- if ((rs = runState) >= 0 && (config & ISCOMMON) == 0) {
- if (!now && enable && (rs & SHUTDOWN) == 0)
- rs = getAndBitwiseOrRunState(SHUTDOWN) | SHUTDOWN;
- if (now || ((rs & SHUTDOWN) != 0 && canStop()))
- rs = getAndBitwiseOrRunState(SHUTDOWN | STOP) | STOP;
- else
+ int rs = runState;
+ if ((config & ISCOMMON) == 0) {
+ if (rs >= 0) {
+ if (!now && enable && (rs & SHUTDOWN) == 0)
+ rs = getAndBitwiseOrRunState(SHUTDOWN) | SHUTDOWN;
+ if (now || ((rs & SHUTDOWN) != 0 && canStop()))
+ rs = getAndBitwiseOrRunState(SHUTDOWN | STOP) | STOP;
+ }
+ if ((rs < 0 || (rs = runState) < 0) && (rs & TERMINATED) == 0) {
+ helpTerminate();
rs = runState;
- }
- if (rs < 0 && (rs & TERMINATED) == 0) {
- helpTerminate();
- rs = runState;
+ }
}
return rs;
}
@@ -2520,18 +2534,21 @@ private void helpTerminate() {
}
WorkQueue[] qs; WorkQueue q; Thread thread;
int n = ((qs = queues) == null) ? 0 : qs.length;
- for (int i = 0; i < n; ++i) { // help cancel tasks
- if ((q = qs[(r + i) & (n - 1)]) != null) {
- for (ForkJoinTask> t; (t = q.poll(null)) != null; )
- ForkJoinTask.cancelIgnoringExceptions(t);
- }
- }
- if (reactivate() == null) { // activate or help unblock
+ for (int i = 0; i < n; ++i) { // help cancel tasks
+ if ((q = qs[(r + i) & (n - 1)]) != null)
+ q.cancelAllTasks();
+ }
+ if (reactivate() != null) // activate <= 2 idle workers
+ reactivate();
+ else if ((runState & TERMINATED) == 0) { // or unblock active workers
+ boolean canTerminate = true;
n = ((qs = queues) == null) ? 0 : qs.length;
for (int i = 0; i < n; i += 2) {
- if ((q = qs[(r + i) & (n - 1)]) != null && q.access != STOP) {
+ if ((q = qs[(r + i) & (n - 1)]) != null &&
+ (thread = q.owner) != null && q.access != STOP) {
+ canTerminate = false;
q.forcePhaseActive(); // for awaitWork
- if ((thread = q.owner) != null && !thread.isInterrupted()) {
+ if (!thread.isInterrupted()) {
try {
thread.interrupt();
} catch (Throwable ignore) {
@@ -2539,16 +2556,16 @@ private void helpTerminate() {
}
}
}
- }
- ReentrantLock lock; Condition cond; // signal when no workers
- if ((short)(ctl >>> TC_SHIFT) <= 0 && (runState & TERMINATED) == 0 &&
- (getAndBitwiseOrRunState(TERMINATED) & TERMINATED) == 0 &&
- (lock = registrationLock) != null) {
- lock.lock();
- if ((cond = termination) != null)
- cond.signalAll();
- lock.unlock();
- container.close();
+ ReentrantLock lock; Condition cond; // signal when no workers
+ if (canTerminate && (short)(ctl >>> TC_SHIFT) <= 0 &&
+ (getAndBitwiseOrRunState(TERMINATED) & TERMINATED) == 0 &&
+ (lock = registrationLock) != null) {
+ lock.lock();
+ if ((cond = termination) != null)
+ cond.signalAll();
+ lock.unlock();
+ container.close();
+ }
}
}
@@ -2889,7 +2906,7 @@ public ForkJoinTask submit(ForkJoinTask task) {
*/
@Override
public ForkJoinTask submit(Callable task) {
- return poolSubmit(true, new ForkJoinTask.AdaptedCallable(task));
+ return poolSubmit(true, new ForkJoinTask.AdaptedInterruptibleCallable(task));
}
/**
@@ -3008,13 +3025,12 @@ public List> invokeAll(Collection extends Callable> tasks) {
futures.add(f);
poolSubmit(true, f);
}
- for (int i = futures.size() - 1; i >= 0; --i)
+ for (int i = futures.size() - 1; i >= 0 && runState >= 0; --i)
((ForkJoinTask>)futures.get(i)).quietlyJoin();
return futures;
- } catch (Throwable t) {
+ } finally {
for (Future e : futures)
ForkJoinTask.cancelIgnoringExceptions(e);
- throw t;
}
}
@@ -3033,7 +3049,7 @@ public List> invokeAll(Collection extends Callable> tasks,
}
long startTime = System.nanoTime(), ns = nanos;
boolean timedOut = (ns < 0L);
- for (int i = futures.size() - 1; i >= 0; --i) {
+ for (int i = futures.size() - 1; i >= 0 && runState >= 0; --i) {
ForkJoinTask f = (ForkJoinTask)futures.get(i);
if (!f.isDone()) {
if (!timedOut)
@@ -3045,10 +3061,9 @@ public List> invokeAll(Collection extends Callable> tasks,
}
}
return futures;
- } catch (Throwable t) {
+ } finally {
for (Future e : futures)
ForkJoinTask.cancelIgnoringExceptions(e);
- throw t;
}
}
@@ -3062,41 +3077,38 @@ static final class InvokeAnyRoot extends ForkJoinTask {
volatile E result;
final AtomicInteger count; // in case all fail
@SuppressWarnings("serial")
- final ForkJoinPool pool; // to check shutdown while collecting
- InvokeAnyRoot(int n, ForkJoinPool p) {
- pool = p;
+ InvokeAnyRoot(int n) {
count = new AtomicInteger(n);
}
- final void tryComplete(E v, Throwable ex, boolean fail) {
+ final boolean checkDone() {
+ ForkJoinPool p; // force done if caller's pool terminating
+ if (!isDone()) {
+ if ((p = getPool()) == null || p.runState >= 0)
+ return false;
+ cancel(false);
+ }
+ return true;
+ }
+ final void tryComplete(E v, Throwable ex, boolean completed) {
if (!isDone()) {
- if (!fail) {
+ if (completed) {
result = v;
quietlyComplete();
}
- else if (pool.runState < 0 || count.getAndDecrement() <= 1)
- trySetThrown(ex != null? ex : new CancellationException());
- }
- }
- final boolean checkDone() {
- if (isDone())
- return true;
- else if (pool.runState >= 0)
- return false;
- else {
- tryComplete(null, null, true);
- return true;
+ else if (count.getAndDecrement() <= 1)
+ trySetThrown(ex != null ? ex : new CancellationException());
}
}
- public final boolean exec() { return false; }
+ public final boolean exec() { return false; } // never forked
public final E getRawResult() { return result; }
public final void setRawResult(E v) { result = v; }
}
/**
* Variant of AdaptedInterruptibleCallable with results in
- * InvokeAnyRoot (and never independently joined). Task
- * cancellation status is used to avoid multiple calls to
- * root.tryComplete by the same task under async cancellation.
+ * InvokeAnyRoot (and never independently joined). Cancellation
+ * status is used to avoid multiple calls to tryComplete by the
+ * same task under async cancellation.
*/
static final class InvokeAnyTask extends ForkJoinTask {
private static final long serialVersionUID = 2838392045355241008L;
@@ -3109,36 +3121,42 @@ static final class InvokeAnyTask extends ForkJoinTask {
this.callable = callable;
}
public final boolean exec() {
- InvokeAnyRoot r; Callable c;
Thread.interrupted();
- if ((c = callable) != null && (r = root) != null && !r.checkDone()) {
- runner = Thread.currentThread();
- E v = null;
- Throwable ex = null;
- boolean fail = false;
+ runner = Thread.currentThread();
+ InvokeAnyRoot r = root;
+ Callable c = callable;
+ E v = null;
+ Throwable ex = null;
+ boolean completed = false;
+ if (r != null && !r.checkDone() && !isDone()) {
try {
- v = c.call();
+ if (c != null) {
+ v = c.call();
+ completed = true;
+ }
} catch (Throwable rex) {
ex = rex;
- fail = true;
}
- runner = null;
- if (trySetCancelled() >= 0) // else lost to async cancel
- r.tryComplete(v, ex, fail);
+ if (trySetCancelled() >= 0)
+ r.tryComplete(v, ex, completed);
}
+ runner = null;
return true;
}
public final boolean cancel(boolean mayInterruptIfRunning) {
- Thread t; InvokeAnyRoot r;
- if (trySetCancelled() >= 0 && (r = root) != null)
- r.tryComplete(null, null, true); // else lost race to cancel
- if (mayInterruptIfRunning && (t = runner) != null) {
- try {
- t.interrupt();
- } catch (Throwable ignore) {
+ int s; Thread t; InvokeAnyRoot r;
+ if ((s = trySetCancelled()) >= 0) {
+ if ((r = root) != null)
+ r.tryComplete(null, null, false);
+ if (mayInterruptIfRunning && (t = runner) != null) {
+ try {
+ t.interrupt();
+ } catch (Throwable ignore) {
+ }
}
+ return true;
}
- return isCancelled();
+ return ((s & (ABNORMAL | THROWN)) == ABNORMAL);
}
public final void setRawResult(E v) {} // unused
public final E getRawResult() { return null; }
@@ -3150,7 +3168,7 @@ public T invokeAny(Collection extends Callable> tasks)
int n = tasks.size();
if (n <= 0)
throw new IllegalArgumentException();
- InvokeAnyRoot root = new InvokeAnyRoot(n, this);
+ InvokeAnyRoot root = new InvokeAnyRoot(n);
ArrayList> fs = new ArrayList<>(n);
try {
for (Callable c : tasks) {
@@ -3162,7 +3180,11 @@ public T invokeAny(Collection extends Callable> tasks)
if (root.isDone())
break;
}
- return root.get();
+ try {
+ return root.get();
+ } catch (CancellationException cx) {
+ throw new ExecutionException(cx);
+ }
} finally {
for (InvokeAnyTask f : fs)
ForkJoinTask.cancelIgnoringExceptions(f);
@@ -3177,7 +3199,7 @@ public T invokeAny(Collection extends Callable> tasks,
int n = tasks.size();
if (n <= 0)
throw new IllegalArgumentException();
- InvokeAnyRoot root = new InvokeAnyRoot(n, this);
+ InvokeAnyRoot root = new InvokeAnyRoot(n);
ArrayList> fs = new ArrayList<>(n);
try {
for (Callable c : tasks) {
@@ -3189,7 +3211,11 @@ public T invokeAny(Collection extends Callable> tasks,
if (root.isDone())
break;
}
- return root.get(nanos, TimeUnit.NANOSECONDS);
+ try {
+ return root.get(nanos, TimeUnit.NANOSECONDS);
+ } catch (CancellationException cx) {
+ throw new ExecutionException(cx);
+ }
} finally {
for (InvokeAnyTask f : fs)
ForkJoinTask.cancelIgnoringExceptions(f);
@@ -3514,6 +3540,7 @@ public List shutdownNow() {
* @return {@code true} if all tasks have completed following shut down
*/
public boolean isTerminated() {
+ // reduce false negatives during termination by helping
return (tryTerminate(false, false) & TERMINATED) != 0;
}
@@ -3531,7 +3558,7 @@ public boolean isTerminated() {
* @return {@code true} if terminating but not yet terminated
*/
public boolean isTerminating() {
- return (runState & (STOP | TERMINATED)) == STOP;
+ return (tryTerminate(false, false) & (STOP | TERMINATED)) == STOP;
}
/**
@@ -3559,7 +3586,7 @@ public boolean isShutdown() {
*/
public boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException {
- ReentrantLock lock; Condition cond = null;
+ ReentrantLock lock;
long nanos = unit.toNanos(timeout);
if ((config & ISCOMMON) != 0) {
if (helpQuiescePool(this, nanos, true) < 0)
@@ -3567,6 +3594,7 @@ public boolean awaitTermination(long timeout, TimeUnit unit)
return false;
}
else if ((lock = registrationLock) != null) {
+ Condition cond = null;
while ((tryTerminate(false, false) & TERMINATED) == 0) {
if (nanos <= 0L)
return false;
@@ -3574,8 +3602,9 @@ else if ((lock = registrationLock) != null) {
try {
if (cond == null && (cond = termination) == null)
termination = cond = lock.newCondition();
- if ((runState & TERMINATED) == 0)
- nanos = cond.awaitNanos(nanos);
+ if ((runState & TERMINATED) != 0)
+ break;
+ nanos = cond.awaitNanos(nanos);
} finally {
lock.unlock();
}
@@ -3625,19 +3654,20 @@ public boolean awaitQuiescence(long timeout, TimeUnit unit) {
*/
@Override
public void close() {
- ReentrantLock lock = registrationLock;
- Condition cond = null;
+ ReentrantLock lock;
boolean interrupted = false;
- if (lock != null && (config & ISCOMMON) == 0 &&
- (runState & TERMINATED) == 0) {
+ if ((runState & TERMINATED) == 0 && (config & ISCOMMON) == 0 &&
+ (lock = registrationLock) != null) {
checkPermission();
+ Condition cond = null;
while ((tryTerminate(interrupted, true) & TERMINATED) == 0) {
lock.lock();
try {
if (cond == null && (cond = termination) == null)
termination = cond = lock.newCondition();
- if ((runState & TERMINATED) == 0)
- cond.await();
+ if ((runState & TERMINATED) != 0)
+ break;
+ cond.await();
} catch (InterruptedException ex) {
interrupted = true;
} finally {
@@ -3827,7 +3857,7 @@ protected RunnableFuture newTaskFor(Runnable runnable, T value) {
@Override
protected RunnableFuture newTaskFor(Callable callable) {
- return new ForkJoinTask.AdaptedCallable(callable);
+ return new ForkJoinTask.AdaptedInterruptibleCallable(callable);
}
static {
diff --git a/src/java.base/share/classes/java/util/concurrent/ForkJoinTask.java b/src/java.base/share/classes/java/util/concurrent/ForkJoinTask.java
index e238850359bbc..1897572ed4124 100644
--- a/src/java.base/share/classes/java/util/concurrent/ForkJoinTask.java
+++ b/src/java.base/share/classes/java/util/concurrent/ForkJoinTask.java
@@ -413,7 +413,9 @@ private int awaitDone(int how, long deadline) {
q = (wt = (ForkJoinWorkerThread)t).workQueue;
p = wt.pool;
}
- else if ((p = ForkJoinPool.common) != null && (how & POOLSUBMIT) == 0)
+ else if ((how & POOLSUBMIT) != 0)
+ p = null;
+ else if ((p = ForkJoinPool.common) != null)
q = p.externalQueue();
if (q != null && p != null) { // try helping
if (this instanceof CountedCompleter)
@@ -433,6 +435,8 @@ else if ((how & RAN) != 0 ||
Aux a;
if ((s = status) < 0)
break;
+ else if (p != null && p.runState < 0)
+ cancelIgnoringExceptions(this); // cancel on shutdown
else if (node == null)
node = new Aux(Thread.currentThread(), null);
else if (!queued) {
@@ -446,9 +450,7 @@ else if (timed && (ns = deadline - System.nanoTime()) <= 0) {
}
else if (Thread.interrupted()) {
interrupted = true;
- if ((how & POOLSUBMIT) != 0 && p != null && p.runState < 0)
- cancelIgnoringExceptions(this); // cancel on shutdown
- else if ((how & INTERRUPTIBLE) != 0) {
+ if ((how & INTERRUPTIBLE) != 0) {
s = ABNORMAL;
break;
}
@@ -852,8 +854,8 @@ public static > Collection invokeAll(Collection
* @return {@code true} if this task is now cancelled
*/
public boolean cancel(boolean mayInterruptIfRunning) {
- trySetCancelled();
- return isCancelled();
+ int s = trySetCancelled();
+ return (s >= 0 || (s & (ABNORMAL | THROWN)) == ABNORMAL);
}
public final boolean isDone() {
@@ -1483,10 +1485,13 @@ static final class AdaptedInterruptibleCallable extends ForkJoinTask
public final T getRawResult() { return result; }
public final void setRawResult(T v) { result = v; }
public final boolean exec() {
+ ForkJoinPool p; // for termination check
Thread.interrupted();
runner = Thread.currentThread();
try {
- if (!isDone()) // recheck
+ if ((p = getPool()) != null && p.runState < 0)
+ trySetCancelled();
+ else if (!isDone())
result = callable.call();
return true;
} catch (RuntimeException rex) {
@@ -1495,20 +1500,21 @@ public final boolean exec() {
throw new RuntimeException(ex);
} finally {
runner = null;
- Thread.interrupted();
}
}
public final void run() { invoke(); }
public final boolean cancel(boolean mayInterruptIfRunning) {
- Thread t;
- boolean stat = super.cancel(false);
- if (mayInterruptIfRunning && (t = runner) != null) {
- try {
- t.interrupt();
- } catch (Throwable ignore) {
+ int s; Thread t;
+ if ((s = trySetCancelled()) >= 0) {
+ if (mayInterruptIfRunning && (t = runner) != null) {
+ try {
+ t.interrupt();
+ } catch (Throwable ignore) {
+ }
}
+ return true;
}
- return stat;
+ return ((s & (ABNORMAL | THROWN)) == ABNORMAL);
}
public String toString() {
return super.toString() + "[Wrapped task = " + callable + "]";
From 9412a0ee25fd1f54983d2ce92d5320643d0fce62 Mon Sep 17 00:00:00 2001
From: Doug Lea
Date: Thu, 5 Jan 2023 13:52:04 -0500
Subject: [PATCH 04/61] Make ExecutorService method exceptions conform to
others.
---
.../java/util/concurrent/ForkJoinPool.java | 204 ++++++++----------
.../java/util/concurrent/ForkJoinTask.java | 41 ++--
2 files changed, 112 insertions(+), 133 deletions(-)
diff --git a/src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java b/src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java
index 8645631a552d4..a343af9c6203c 100644
--- a/src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java
+++ b/src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java
@@ -582,18 +582,17 @@ public class ForkJoinPool extends AbstractExecutorService {
*
* Shutdown and Termination. A call to shutdownNow invokes
* tryTerminate to atomically set a mode bit. The calling thread,
- * as well as every other worker thereafter terminating invokes
- * helpTerminate, which cancels queued tasks, reactivates idle
- * workers, and interrupts others (in addtion to deregistering
- * itself). Calls to non-abrupt shutdown() preface this by
- * checking canStop before triggering the "STOP" phase of
+ * as well as other workers thereafter terminating, and external
+ * callers checking termination helps cancel queued tasks,
+ * reactivate idle workers, and interrupt others (in addtion to
+ * deregistering itself). Calls to non-abrupt shutdown() preface
+ * this by checking canStop before triggering the "STOP" phase of
* termination. Like most eveything else, we try to speed up
* termination by balancing parallelism and contention (which may
* occur while trying to cancel tasks or unblock workers), by
* starting task scans at different indices, and by fanning out
* reactivation of idle workers. The main remaining cost is the
- * potential need to re-interrupt active workers with tasks that
- * swallow them.
+ * potential need to re-interrupt active workers.
*
* Trimming workers. To release resources after periods of lack of
* use, a worker starting to wait when the pool is quiescent will
@@ -727,7 +726,7 @@ public class ForkJoinPool extends AbstractExecutorService {
* (ForkJoinTask.cancel) independently from the interrupt status
* of threads running tasks. (See the public ForkJoinTask
* documentation for rationale.) Interrupts are issued internally
- * only in helpTerminate, when workers should be terminating and
+ * only in tryTerminate, when workers should be terminating and
* tasks should be cancelled anyway. By default, interrupts are
* cleared only when necessary to ensure that calls to
* LockSupport.park do not loop indefinitely (park returns
@@ -1093,8 +1092,8 @@ static boolean casSlotToNull(ForkJoinTask>[] a, int i,
return U.compareAndSetReference(a, ((long)i << ASHIFT) + ABASE,
c, null);
}
- final void forcePhaseActive() { // clear sign bit
- U.getAndBitwiseAndInt(this, PHASE, 0x7fffffff);
+ final int forcePhaseActive() { // clear sign bit
+ return U.getAndBitwiseAndInt(this, PHASE, 0x7fffffff);
}
final int getAndSetAccess(int v) {
return U.getAndSetInt(this, ACCESS, v);
@@ -1453,11 +1452,6 @@ else if (a[nk] == null)
// misc
- final void cancelAllTasks() {
- for (ForkJoinTask> t; (t = poll(null)) != null; )
- ForkJoinTask.cancelIgnoringExceptions(t);
- }
-
/**
* Returns true if owned by a worker thread and not known to be blocked.
*/
@@ -1669,12 +1663,8 @@ final void registerWorker(WorkQueue w) {
* @param ex the exception causing failure, or null if none
*/
final void deregisterWorker(ForkJoinWorkerThread wt, Throwable ex) {
- int cfg = 0;
WorkQueue w = (wt == null) ? null : wt.workQueue;
- if (w != null) {
- cfg = w.config;
- w.access = STOP; // may be redundant
- }
+ int cfg = (w == null) ? 0 : w.config;
long c = ctl;
if ((cfg & TRIMMED) == 0) // decrement counts
do {} while (c != (c = compareAndExchangeCtl(
@@ -1694,12 +1684,17 @@ else if ((int)c == 0) // was dropped on timeout
stealCount += ns; // accumulate steals
lock.unlock();
}
- w.cancelAllTasks();
if ((cfg & SRC) != 0)
signalWork(); // possibly replace worker
}
- if (ex != null)
+ if (ex != null) {
+ if (w != null) {
+ w.access = STOP; // cancel tasks
+ for (ForkJoinTask> t; (t = w.nextLocalTask(0)) != null; )
+ ForkJoinTask.cancelIgnoringExceptions(t);
+ }
ForkJoinTask.rethrow(ex);
+ }
}
/*
@@ -1824,6 +1819,7 @@ final void runWorker(WorkQueue w) {
r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift
} while ((src = scan(w, src, r)) >= 0 ||
(src = awaitWork(w)) == 0);
+ w.access = STOP; // record normal termination
}
}
@@ -1839,9 +1835,8 @@ final void runWorker(WorkQueue w) {
* @return the next prevSrc value to use, or negative if none found
*/
private int scan(WorkQueue w, int prevSrc, int r) {
- int rs = runState;
WorkQueue[] qs = queues;
- int n = (rs < 0 || qs == null || w == null) ? 0 : qs.length;
+ int n = (w == null || qs == null) ? 0 : qs.length;
for (int step = (r >>> 16) | 1, i = n; i > 0; --i, r += step) {
int j, cap; WorkQueue q; ForkJoinTask>[] a;
if ((q = qs[j = r & (n - 1)]) != null &&
@@ -1877,7 +1872,6 @@ private int awaitWork(WorkQueue w) {
if (w == null)
return -1; // currently impossible
int p = (w.phase + SS_SEQ) & ~INACTIVE; // advance phase
- boolean idle = false; // true if possibly quiescent
if (runState < 0)
return -1; // terminating
long sp = p & SP_MASK, pc = ctl, qc;
@@ -1886,13 +1880,11 @@ private int awaitWork(WorkQueue w) {
w.stackPred = (int)pc; // set ctl stack link
} while (pc != (pc = compareAndExchangeCtl(
pc, qc = ((pc - RC_UNIT) & UC_MASK) | sp)));
- if ((qc & RC_MASK) <= 0L) {
- if (hasTasks(true) && (w.phase >= 0 || reactivate() == w))
- return 0; // check for stragglers
- if (runState != 0 && tryTerminate(false, false) < 0)
- return -1; // quiescent termination
- idle = true;
- }
+ boolean idle;
+ if ((idle = ((qc & RC_MASK) <= 0L)) && // possibly quiescent
+ (hasTasks(true) || (runState != 0 && canStop(true))) &&
+ w.phase < 0)
+ reactivate(); // enable rescan or termination
WorkQueue[] qs = queues; // spin for expected #accesses in scan+signal
int spins = ((qs == null) ? 0 : ((qs.length & SMASK) << 1)) | 0xf;
while ((p = w.phase) < 0 && --spins > 0)
@@ -1924,21 +1916,25 @@ private int awaitWork(WorkQueue w) {
}
}
}
- return 0;
+ return runState & STOP; // negative if terminating
}
/**
* Non-overridable version of isQuiescent. Returns true if
* quiescent or already terminating.
*/
- private boolean canStop() {
- long c = ctl;
- do {
+ private boolean canStop(boolean enableTermination) {
+ for (long c = ctl;;) {
if (runState < 0)
break;
if ((c & RC_MASK) > 0L || hasTasks(false))
return false;
- } while (c != (c = ctl)); // validate
+ if (c == (c = ctl)) { // validate
+ if (enableTermination) // transition now
+ getAndBitwiseOrRunState(STOP);
+ break;
+ }
+ }
return true;
}
@@ -2276,7 +2272,7 @@ private int externalHelpQuiesce(long nanos, boolean interruptible) {
t.doExec();
parkTime = 0L;
}
- else if (canStop())
+ else if (canStop(false))
return 1;
else if (parkTime == 0L) {
parkTime = 1L << 10;
@@ -2493,7 +2489,10 @@ static int getSurplusQueuedTaskCount() {
// Termination
/**
- * Possibly initiates and/or completes pool termination.
+ * Possibly initiates and/or completes pool termination. If
+ * terminating, helps complete termination by cancelling tasks,
+ * reactivating idle workers or interrupting active workers, and
+ * possibly triggering TERMINATED state.
*
* @param now if true, unconditionally terminate, else only
* if no work and no active workers
@@ -2502,71 +2501,50 @@ static int getSurplusQueuedTaskCount() {
*/
private int tryTerminate(boolean now, boolean enable) {
int rs = runState;
- if ((config & ISCOMMON) == 0) {
- if (rs >= 0) {
- if (!now && enable && (rs & SHUTDOWN) == 0)
- rs = getAndBitwiseOrRunState(SHUTDOWN) | SHUTDOWN;
- if (now || ((rs & SHUTDOWN) != 0 && canStop()))
- rs = getAndBitwiseOrRunState(SHUTDOWN | STOP) | STOP;
- }
- if ((rs < 0 || (rs = runState) < 0) && (rs & TERMINATED) == 0) {
- helpTerminate();
+ if ((config & ISCOMMON) == 0 && rs >= 0) {
+ if (!now && enable && (rs & SHUTDOWN) == 0)
+ rs = getAndBitwiseOrRunState(SHUTDOWN) | SHUTDOWN;
+ if (now || ((rs & SHUTDOWN) != 0 && canStop(false)))
+ rs = getAndBitwiseOrRunState(SHUTDOWN | STOP) | STOP;
+ else
rs = runState;
- }
}
- return rs;
- }
-
- /**
- * Helps complete termination by cancelling tasks, unblocking
- * workers and possibly triggering TERMINATED state.
- */
- private void helpTerminate() {
- int r = 1; // for queue traversal
+ if ((rs & (STOP | TERMINATED)) != STOP)
+ return rs; // not terminating
+ WorkQueue[] qs; WorkQueue q; Thread thread; WorkQueue w;
+ ReentrantLock lock; Condition cond;
+ boolean more = (reactivate() != null); // try activating idle worker
Thread current = Thread.currentThread();
- if (current instanceof ForkJoinWorkerThread) {
- ForkJoinWorkerThread wt = (ForkJoinWorkerThread)current;
- WorkQueue w = wt.workQueue;
- if (wt.pool == this && w != null) {
- r = w.config; // stagger traversals
- w.access = STOP; // may be redundant
- }
- }
- WorkQueue[] qs; WorkQueue q; Thread thread;
+ int r = (((current instanceof ForkJoinWorkerThread) &&
+ (w = ((ForkJoinWorkerThread)current).workQueue) != null) ?
+ w.config : 0); // stagger traversals
int n = ((qs = queues) == null) ? 0 : qs.length;
- for (int i = 0; i < n; ++i) { // help cancel tasks
- if ((q = qs[(r + i) & (n - 1)]) != null)
- q.cancelAllTasks();
- }
- if (reactivate() != null) // activate <= 2 idle workers
- reactivate();
- else if ((runState & TERMINATED) == 0) { // or unblock active workers
- boolean canTerminate = true;
- n = ((qs = queues) == null) ? 0 : qs.length;
- for (int i = 0; i < n; i += 2) {
- if ((q = qs[(r + i) & (n - 1)]) != null &&
- (thread = q.owner) != null && q.access != STOP) {
- canTerminate = false;
- q.forcePhaseActive(); // for awaitWork
- if (!thread.isInterrupted()) {
- try {
- thread.interrupt();
- } catch (Throwable ignore) {
- }
+ for (int i = 0; i < n; ++i) {
+ if ((q = qs[(r + i) & (n - 1)]) != null) {
+ for (ForkJoinTask> t; (t = q.poll(null)) != null; )
+ ForkJoinTask.cancelIgnoringExceptions(t);
+ if (!more && q.access != STOP &&
+ (thread = q.owner) != null && thread != current &&
+ (q.forcePhaseActive() < 0 || !thread.isInterrupted())) {
+ try {
+ thread.interrupt(); // help unblock others
+ } catch (Throwable ignore) {
}
}
}
- ReentrantLock lock; Condition cond; // signal when no workers
- if (canTerminate && (short)(ctl >>> TC_SHIFT) <= 0 &&
- (getAndBitwiseOrRunState(TERMINATED) & TERMINATED) == 0 &&
- (lock = registrationLock) != null) {
- lock.lock();
- if ((cond = termination) != null)
- cond.signalAll();
- lock.unlock();
- container.close();
- }
}
+ if (reactivate() == null && // transition if no workers
+ (short)(ctl >>> TC_SHIFT) <= 0 &&
+ (runState & TERMINATED) == 0 &&
+ (getAndBitwiseOrRunState(TERMINATED) & TERMINATED) == 0 &&
+ (lock = registrationLock) != null) {
+ lock.lock();
+ if ((cond = termination) != null)
+ cond.signalAll();
+ lock.unlock();
+ container.close();
+ }
+ return runState;
}
// Exported methods
@@ -3025,12 +3003,13 @@ public List> invokeAll(Collection extends Callable> tasks) {
futures.add(f);
poolSubmit(true, f);
}
- for (int i = futures.size() - 1; i >= 0 && runState >= 0; --i)
+ for (int i = futures.size() - 1; i >= 0; --i)
((ForkJoinTask>)futures.get(i)).quietlyJoin();
return futures;
- } finally {
+ } catch (Throwable t) {
for (Future e : futures)
ForkJoinTask.cancelIgnoringExceptions(e);
+ throw t;
}
}
@@ -3049,7 +3028,7 @@ public List> invokeAll(Collection extends Callable> tasks,
}
long startTime = System.nanoTime(), ns = nanos;
boolean timedOut = (ns < 0L);
- for (int i = futures.size() - 1; i >= 0 && runState >= 0; --i) {
+ for (int i = futures.size() - 1; i >= 0; --i) {
ForkJoinTask f = (ForkJoinTask)futures.get(i);
if (!f.isDone()) {
if (!timedOut)
@@ -3061,9 +3040,10 @@ public List> invokeAll(Collection extends Callable> tasks,
}
}
return futures;
- } finally {
+ } catch (Throwable t) {
for (Future e : futures)
ForkJoinTask.cancelIgnoringExceptions(e);
+ throw t;
}
}
@@ -3095,8 +3075,12 @@ final void tryComplete(E v, Throwable ex, boolean completed) {
result = v;
quietlyComplete();
}
- else if (count.getAndDecrement() <= 1)
- trySetThrown(ex != null ? ex : new CancellationException());
+ else if (count.getAndDecrement() <= 1) {
+ if (ex == null)
+ cancel(false);
+ else
+ trySetThrown(ex);
+ }
}
}
public final boolean exec() { return false; } // never forked
@@ -3180,11 +3164,7 @@ public T invokeAny(Collection extends Callable> tasks)
if (root.isDone())
break;
}
- try {
- return root.get();
- } catch (CancellationException cx) {
- throw new ExecutionException(cx);
- }
+ return root.get();
} finally {
for (InvokeAnyTask f : fs)
ForkJoinTask.cancelIgnoringExceptions(f);
@@ -3211,11 +3191,7 @@ public T invokeAny(Collection extends Callable> tasks,
if (root.isDone())
break;
}
- try {
- return root.get(nanos, TimeUnit.NANOSECONDS);
- } catch (CancellationException cx) {
- throw new ExecutionException(cx);
- }
+ return root.get(nanos, TimeUnit.NANOSECONDS);
} finally {
for (InvokeAnyTask f : fs)
ForkJoinTask.cancelIgnoringExceptions(f);
@@ -3325,7 +3301,7 @@ public int getActiveThreadCount() {
* @return {@code true} if all threads are currently idle
*/
public boolean isQuiescent() {
- return canStop();
+ return canStop(false);
}
/**
@@ -3540,7 +3516,7 @@ public List shutdownNow() {
* @return {@code true} if all tasks have completed following shut down
*/
public boolean isTerminated() {
- // reduce false negatives during termination by helping
+ // reduce premature negatives during termination by helping
return (tryTerminate(false, false) & TERMINATED) != 0;
}
diff --git a/src/java.base/share/classes/java/util/concurrent/ForkJoinTask.java b/src/java.base/share/classes/java/util/concurrent/ForkJoinTask.java
index 1897572ed4124..a357816109799 100644
--- a/src/java.base/share/classes/java/util/concurrent/ForkJoinTask.java
+++ b/src/java.base/share/classes/java/util/concurrent/ForkJoinTask.java
@@ -448,19 +448,20 @@ else if (timed && (ns = deadline - System.nanoTime()) <= 0) {
s = 0;
break;
}
- else if (Thread.interrupted()) {
- interrupted = true;
- if ((how & INTERRUPTIBLE) != 0) {
+ else {
+ if (Thread.interrupted())
+ interrupted = true;
+ if ((s = status) < 0) // prefer result to IE
+ break;
+ else if (interrupted && (how & INTERRUPTIBLE) != 0) {
s = ABNORMAL;
break;
}
+ else if (timed)
+ LockSupport.parkNanos(ns);
+ else
+ LockSupport.park();
}
- else if ((s = status) < 0) // recheck
- break;
- else if (timed)
- LockSupport.parkNanos(ns);
- else
- LockSupport.park();
}
if (uncompensate)
p.uncompensate();
@@ -574,13 +575,17 @@ private void reportException(int s) {
* necessary in an ExecutionException.
*/
private void reportExecutionException(int s) {
- Throwable ex = null, rx;
+ Throwable ex, rx;
if (s == ABNORMAL)
ex = new InterruptedException();
else if (s >= 0)
ex = new TimeoutException();
else if ((rx = getThrowableException()) != null)
ex = new ExecutionException(rx);
+ else if ((status & POOLSUBMIT) != 0)
+ ex = new ExecutionException(new CancellationException());
+ else
+ ex = new CancellationException();
ForkJoinTask.uncheckedThrow(ex);
}
@@ -1504,17 +1509,15 @@ else if (!isDone())
}
public final void run() { invoke(); }
public final boolean cancel(boolean mayInterruptIfRunning) {
- int s; Thread t;
- if ((s = trySetCancelled()) >= 0) {
- if (mayInterruptIfRunning && (t = runner) != null) {
- try {
- t.interrupt();
- } catch (Throwable ignore) {
- }
+ Thread t;
+ boolean stat = super.cancel(false);
+ if (mayInterruptIfRunning && (t = runner) != null) {
+ try {
+ t.interrupt();
+ } catch (Throwable ignore) {
}
- return true;
}
- return ((s & (ABNORMAL | THROWN)) == ABNORMAL);
+ return stat;
}
public String toString() {
return super.toString() + "[Wrapped task = " + callable + "]";
From 6c7690805bb766fc7f72bb7f2a4df0ca3267cdd5 Mon Sep 17 00:00:00 2001
From: Doug Lea
Date: Sat, 7 Jan 2023 09:31:38 -0500
Subject: [PATCH 05/61] More exception compatibility
---
.../java/util/concurrent/ForkJoinPool.java | 121 ++++++++++--------
.../java/util/concurrent/ForkJoinTask.java | 21 +--
2 files changed, 84 insertions(+), 58 deletions(-)
diff --git a/src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java b/src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java
index a343af9c6203c..347a6ab566751 100644
--- a/src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java
+++ b/src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java
@@ -581,18 +581,22 @@ public class ForkJoinPool extends AbstractExecutorService {
* occasionally adds some extra unnecessary processing.
*
* Shutdown and Termination. A call to shutdownNow invokes
- * tryTerminate to atomically set a mode bit. The calling thread,
- * as well as other workers thereafter terminating, and external
- * callers checking termination helps cancel queued tasks,
- * reactivate idle workers, and interrupt others (in addtion to
- * deregistering itself). Calls to non-abrupt shutdown() preface
- * this by checking canStop before triggering the "STOP" phase of
- * termination. Like most eveything else, we try to speed up
- * termination by balancing parallelism and contention (which may
- * occur while trying to cancel tasks or unblock workers), by
- * starting task scans at different indices, and by fanning out
- * reactivation of idle workers. The main remaining cost is the
- * potential need to re-interrupt active workers.
+ * tryTerminate to atomically set a mode bit. However, termination
+ * is intrinsically non-atomic. The calling thread, as well as
+ * other workers thereafter terminating, and external callers
+ * checking termination help cancel queued tasks, reactivate idle
+ * workers, and interrupt others. To support this, worker
+ * references not removed from the queues array during
+ * termination. It is possible for late thread creations to still
+ * be in progress after a quiescent termination reports terminated
+ * status, but they will also immediately terminate. Calls to
+ * non-abrupt shutdown() preface this by checking canStop before
+ * triggering the "STOP" phase of termination. Like most eveything
+ * else, we try to speed up termination by balancing parallelism
+ * and contention, by starting task scans at different indices,
+ * and by fanning out reactivation of idle workers. The main
+ * remaining cost is the potential need to re-interrupt active
+ * workers (checks for this are racy but suffice here).
*
* Trimming workers. To release resources after periods of lack of
* use, a worker starting to wait when the pool is quiescent will
@@ -733,26 +737,26 @@ public class ForkJoinPool extends AbstractExecutorService {
* immediately if the current thread is interrupted).
*
* To conform to specs and expectations surrounding execution,
- * cancellation and termination of ExecutorService invoke, submit,
- * invokeAll, and invokeAny methods (without penalizing others) we
- * use AdaptedInterruptibleCallables in ExecutorService methods
- * accepting Callables (actually, for InvokeAny, a variant that
- * gathers a single result). We also ensure that external
+ * cancellation and termination of Callable-based ExecutorService
+ * invoke, submit, invokeAll, and invokeAny methods (without
+ * penalizing others) we use AdaptedInterruptibleCallables in
+ * ExecutorService methods (actually, for InvokeAny, a variant
+ * that gathers a single result). We also ensure that external
* submitters do not help run such tasks by recording
* ForkJoinTask.POOLSUBMIT in task status and/or as a bit flag
* argument to joining methods. External callers of task.get etc
* are not directly interrupted on shutdown, but must be woken due
* to the task itself being cancelled during termination.
- * Interruptible tasks always clear interrupts and check for
- * termination and cancellation before invoking user-supplied
- * Callables These rechecks are needed because the cleared
- * interrupt could be due to cancellation, termination, or any
- * other reason. These tasks also record their runner thread so
- * that cancel(true) can interrupt them later. As is the case with
- * any kind of pool, it is possible that an interrupt designed to
- * cancel one task occurs both late and unnecessarily, so instead
- * "spuriously" interrupts the thread while performing a later
- * task.
+ * AdaptedInterruptibleCallables tasks always clear interrupts and
+ * check for termination and cancellation before invoking
+ * user-supplied Callables. The rechecks are needed because the
+ * cleared interrupt could be due to cancellation, termination, or
+ * any other reason. These tasks also record their runner thread
+ * so that cancel(true) can interrupt them later. As is the case
+ * with any kind of pool, it is possible that an interrupt
+ * designed to cancel one task occurs both late and unnecessarily,
+ * so instead "spuriously" interrupts the thread while performing
+ * a later task.
*
* Memory placement
* ================
@@ -1663,8 +1667,12 @@ final void registerWorker(WorkQueue w) {
* @param ex the exception causing failure, or null if none
*/
final void deregisterWorker(ForkJoinWorkerThread wt, Throwable ex) {
+ int cfg = 0;
WorkQueue w = (wt == null) ? null : wt.workQueue;
- int cfg = (w == null) ? 0 : w.config;
+ if (w != null) {
+ cfg = w.config;
+ w.access = STOP; // may be redundant
+ }
long c = ctl;
if ((cfg & TRIMMED) == 0) // decrement counts
do {} while (c != (c = compareAndExchangeCtl(
@@ -1688,8 +1696,7 @@ else if ((int)c == 0) // was dropped on timeout
signalWork(); // possibly replace worker
}
if (ex != null) {
- if (w != null) {
- w.access = STOP; // cancel tasks
+ if (w != null) { // cancel tasks
for (ForkJoinTask> t; (t = w.nextLocalTask(0)) != null; )
ForkJoinTask.cancelIgnoringExceptions(t);
}
@@ -1819,7 +1826,6 @@ final void runWorker(WorkQueue w) {
r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift
} while ((src = scan(w, src, r)) >= 0 ||
(src = awaitWork(w)) == 0);
- w.access = STOP; // record normal termination
}
}
@@ -1882,7 +1888,7 @@ private int awaitWork(WorkQueue w) {
pc, qc = ((pc - RC_UNIT) & UC_MASK) | sp)));
boolean idle;
if ((idle = ((qc & RC_MASK) <= 0L)) && // possibly quiescent
- (hasTasks(true) || (runState != 0 && canStop(true))) &&
+ (hasTasks(true) || (runState != 0 && canStop(true) < 0)) &&
w.phase < 0)
reactivate(); // enable rescan or termination
WorkQueue[] qs = queues; // spin for expected #accesses in scan+signal
@@ -1916,26 +1922,27 @@ private int awaitWork(WorkQueue w) {
}
}
}
- return runState & STOP; // negative if terminating
+ return runState & STOP; // 0 unless terminating
}
/**
- * Non-overridable version of isQuiescent. Returns true if
- * quiescent or already terminating.
+ * Non-overridable version of isQuiescent
+ * @param enableTransition true if should set runState if can stop
+ * @return negative if quiescent or already terminating, else runState
*/
- private boolean canStop(boolean enableTermination) {
+ private int canStop(boolean enableTransition) {
+ int rs;
for (long c = ctl;;) {
- if (runState < 0)
+ if ((rs = runState) < 0 || ((c & RC_MASK) > 0L || hasTasks(false)))
break;
- if ((c & RC_MASK) > 0L || hasTasks(false))
- return false;
if (c == (c = ctl)) { // validate
- if (enableTermination) // transition now
+ if (enableTransition)
getAndBitwiseOrRunState(STOP);
+ rs |= STOP;
break;
}
}
- return true;
+ return rs;
}
/**
@@ -2272,7 +2279,7 @@ private int externalHelpQuiesce(long nanos, boolean interruptible) {
t.doExec();
parkTime = 0L;
}
- else if (canStop(false))
+ else if (canStop(false) < 0)
return 1;
else if (parkTime == 0L) {
parkTime = 1L << 10;
@@ -2502,12 +2509,14 @@ static int getSurplusQueuedTaskCount() {
private int tryTerminate(boolean now, boolean enable) {
int rs = runState;
if ((config & ISCOMMON) == 0 && rs >= 0) {
- if (!now && enable && (rs & SHUTDOWN) == 0)
- rs = getAndBitwiseOrRunState(SHUTDOWN) | SHUTDOWN;
- if (now || ((rs & SHUTDOWN) != 0 && canStop(false)))
- rs = getAndBitwiseOrRunState(SHUTDOWN | STOP) | STOP;
+ if (!now) {
+ if (enable && (rs & SHUTDOWN) == 0)
+ rs = getAndBitwiseOrRunState(SHUTDOWN) | SHUTDOWN;
+ if ((rs & SHUTDOWN) != 0)
+ rs = canStop(true);
+ }
else
- rs = runState;
+ rs = getAndBitwiseOrRunState(SHUTDOWN | STOP) | STOP;
}
if ((rs & (STOP | TERMINATED)) != STOP)
return rs; // not terminating
@@ -3050,6 +3059,10 @@ public List> invokeAll(Collection extends Callable> tasks,
/**
* Task to hold results for invokeAny, or to report exception if
* all subtasks fail or are cancelled or the pool is terminating.
+ * Note: Among other oddities, the ExecutorService.invokeAny spec
+ * requires that the all-cancelled case (including cancellation of
+ * the root because of pool termination) be reported as an
+ * ExecutionException, not a CancellationException.
*/
static final class InvokeAnyRoot extends ForkJoinTask {
private static final long serialVersionUID = 2838392045355241008L;
@@ -3164,7 +3177,11 @@ public T invokeAny(Collection extends Callable> tasks)
if (root.isDone())
break;
}
- return root.get();
+ try {
+ return root.get();
+ } catch (CancellationException cx) {
+ throw new ExecutionException(cx);
+ }
} finally {
for (InvokeAnyTask f : fs)
ForkJoinTask.cancelIgnoringExceptions(f);
@@ -3191,7 +3208,11 @@ public T invokeAny(Collection extends Callable> tasks,
if (root.isDone())
break;
}
- return root.get(nanos, TimeUnit.NANOSECONDS);
+ try {
+ return root.get(nanos, TimeUnit.NANOSECONDS);
+ } catch (CancellationException cx) {
+ throw new ExecutionException(cx);
+ }
} finally {
for (InvokeAnyTask f : fs)
ForkJoinTask.cancelIgnoringExceptions(f);
@@ -3301,7 +3322,7 @@ public int getActiveThreadCount() {
* @return {@code true} if all threads are currently idle
*/
public boolean isQuiescent() {
- return canStop(false);
+ return canStop(false) < 0;
}
/**
diff --git a/src/java.base/share/classes/java/util/concurrent/ForkJoinTask.java b/src/java.base/share/classes/java/util/concurrent/ForkJoinTask.java
index a357816109799..8c2c9ae027f77 100644
--- a/src/java.base/share/classes/java/util/concurrent/ForkJoinTask.java
+++ b/src/java.base/share/classes/java/util/concurrent/ForkJoinTask.java
@@ -575,17 +575,13 @@ private void reportException(int s) {
* necessary in an ExecutionException.
*/
private void reportExecutionException(int s) {
- Throwable ex, rx;
+ Throwable ex = null, rx;
if (s == ABNORMAL)
ex = new InterruptedException();
else if (s >= 0)
ex = new TimeoutException();
else if ((rx = getThrowableException()) != null)
ex = new ExecutionException(rx);
- else if ((status & POOLSUBMIT) != 0)
- ex = new ExecutionException(new CancellationException());
- else
- ex = new CancellationException();
ForkJoinTask.uncheckedThrow(ex);
}
@@ -1476,6 +1472,15 @@ public String toString() {
private static final long serialVersionUID = 2838392045355241008L;
}
+ /**
+ * Adapter for Callable-based tasks that are designed to be
+ * interruptible when cancelled, including cases of cancellation
+ * upon pool termination. In addition to recording the running
+ * thread to enable interrupt in cancel(true), the task checks for
+ * termination before executing the compute method, to cover
+ * shutdown races in which the task has not yet been cancelled on
+ * entry but might not otherwise be re-interrupted by others.
+ */
static final class AdaptedInterruptibleCallable extends ForkJoinTask
implements RunnableFuture {
@SuppressWarnings("serial") // Conditionally serializable
@@ -1490,12 +1495,12 @@ static final class AdaptedInterruptibleCallable extends ForkJoinTask
public final T getRawResult() { return result; }
public final void setRawResult(T v) { result = v; }
public final boolean exec() {
- ForkJoinPool p; // for termination check
Thread.interrupted();
runner = Thread.currentThread();
+ ForkJoinPool p = getPool();
try {
- if ((p = getPool()) != null && p.runState < 0)
- trySetCancelled();
+ if (p != null && p.runState < 0)
+ trySetCancelled(); // termination check
else if (!isDone())
result = callable.call();
return true;
From 6d1578e91e77f59652891d686e3d019bbaeff687 Mon Sep 17 00:00:00 2001
From: Doug Lea
Date: Sun, 8 Jan 2023 09:37:58 -0500
Subject: [PATCH 06/61] Fix more exception policy mismatches
---
.../java/util/concurrent/ForkJoinPool.java | 31 +++++++------
.../java/util/concurrent/ForkJoinTask.java | 43 ++++++++++---------
2 files changed, 38 insertions(+), 36 deletions(-)
diff --git a/src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java b/src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java
index 347a6ab566751..854bfd18cd388 100644
--- a/src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java
+++ b/src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java
@@ -3073,15 +3073,6 @@ static final class InvokeAnyRoot extends ForkJoinTask {
InvokeAnyRoot(int n) {
count = new AtomicInteger(n);
}
- final boolean checkDone() {
- ForkJoinPool p; // force done if caller's pool terminating
- if (!isDone()) {
- if ((p = getPool()) == null || p.runState >= 0)
- return false;
- cancel(false);
- }
- return true;
- }
final void tryComplete(E v, Throwable ex, boolean completed) {
if (!isDone()) {
if (completed) {
@@ -3090,7 +3081,7 @@ final void tryComplete(E v, Throwable ex, boolean completed) {
}
else if (count.getAndDecrement() <= 1) {
if (ex == null)
- cancel(false);
+ trySetCancelled();
else
trySetThrown(ex);
}
@@ -3118,21 +3109,29 @@ static final class InvokeAnyTask extends ForkJoinTask {
this.callable = callable;
}
public final boolean exec() {
+ ForkJoinPool p;
+ Thread t = Thread.currentThread();
Thread.interrupted();
- runner = Thread.currentThread();
+ runner = t;
InvokeAnyRoot r = root;
Callable c = callable;
E v = null;
Throwable ex = null;
boolean completed = false;
- if (r != null && !r.checkDone() && !isDone()) {
- try {
- if (c != null) {
+ if (r != null && c != null) {
+ if ((t instanceof ForkJoinWorkerThread) &&
+ (p = ((ForkJoinWorkerThread) t).pool) != null &&
+ p.runState < 0) { // termination check
+ r.trySetCancelled();
+ t.interrupt(); // restore interrupt
+ }
+ else if (!r.isDone() && !isDone()) {
+ try {
v = c.call();
completed = true;
+ } catch (Throwable rex) {
+ ex = rex;
}
- } catch (Throwable rex) {
- ex = rex;
}
if (trySetCancelled() >= 0)
r.tryComplete(v, ex, completed);
diff --git a/src/java.base/share/classes/java/util/concurrent/ForkJoinTask.java b/src/java.base/share/classes/java/util/concurrent/ForkJoinTask.java
index 8c2c9ae027f77..3948068b39d28 100644
--- a/src/java.base/share/classes/java/util/concurrent/ForkJoinTask.java
+++ b/src/java.base/share/classes/java/util/concurrent/ForkJoinTask.java
@@ -413,9 +413,7 @@ private int awaitDone(int how, long deadline) {
q = (wt = (ForkJoinWorkerThread)t).workQueue;
p = wt.pool;
}
- else if ((how & POOLSUBMIT) != 0)
- p = null;
- else if ((p = ForkJoinPool.common) != null)
+ else if ((p = ForkJoinPool.common) != null && (how & POOLSUBMIT) == 0)
q = p.externalQueue();
if (q != null && p != null) { // try helping
if (this instanceof CountedCompleter)
@@ -435,8 +433,6 @@ else if ((how & RAN) != 0 ||
Aux a;
if ((s = status) < 0)
break;
- else if (p != null && p.runState < 0)
- cancelIgnoringExceptions(this); // cancel on shutdown
else if (node == null)
node = new Aux(Thread.currentThread(), null);
else if (!queued) {
@@ -448,20 +444,21 @@ else if (timed && (ns = deadline - System.nanoTime()) <= 0) {
s = 0;
break;
}
- else {
- if (Thread.interrupted())
- interrupted = true;
- if ((s = status) < 0) // prefer result to IE
- break;
- else if (interrupted && (how & INTERRUPTIBLE) != 0) {
+ else if (Thread.interrupted()) {
+ interrupted = true;
+ if ((how & POOLSUBMIT) != 0 && p != null && p.runState < 0)
+ cancelIgnoringExceptions(this); // cancel on shutdown
+ else if ((how & INTERRUPTIBLE) != 0) {
s = ABNORMAL;
break;
}
- else if (timed)
- LockSupport.parkNanos(ns);
- else
- LockSupport.park();
}
+ else if ((s = status) < 0) // recheck
+ break;
+ else if (timed)
+ LockSupport.parkNanos(ns);
+ else
+ LockSupport.park();
}
if (uncompensate)
p.uncompensate();
@@ -485,8 +482,11 @@ else if (casAux(a, next))
}
}
}
+ int stat = status; // prefer completion result
+ if (stat < 0)
+ s = stat;
}
- else {
+ if (s < 0) {
signalWaiters(); // help clean or signal
if (interrupted)
Thread.currentThread().interrupt();
@@ -1495,12 +1495,15 @@ static final class AdaptedInterruptibleCallable extends ForkJoinTask
public final T getRawResult() { return result; }
public final void setRawResult(T v) { result = v; }
public final boolean exec() {
+ ForkJoinPool p;
+ Thread t = Thread.currentThread();
Thread.interrupted();
- runner = Thread.currentThread();
- ForkJoinPool p = getPool();
+ runner = t;
try {
- if (p != null && p.runState < 0)
- trySetCancelled(); // termination check
+ if ((t instanceof ForkJoinWorkerThread) &&
+ (p = ((ForkJoinWorkerThread) t).pool) != null &&
+ p.runState < 0) // termination check
+ cancelIgnoringExceptions(this);
else if (!isDone())
result = callable.call();
return true;
From c09461f2e7a7f2841cbcfc016a320075dd5a6536 Mon Sep 17 00:00:00 2001
From: Doug Lea
Date: Wed, 11 Jan 2023 17:14:30 -0500
Subject: [PATCH 07/61] More exception policy fixes
---
.../java/util/concurrent/ForkJoinPool.java | 91 ++++++++-----------
.../java/util/concurrent/ForkJoinTask.java | 13 ++-
2 files changed, 49 insertions(+), 55 deletions(-)
diff --git a/src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java b/src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java
index 854bfd18cd388..7beff8f682d40 100644
--- a/src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java
+++ b/src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java
@@ -1824,8 +1824,8 @@ final void runWorker(WorkQueue w) {
int r = w.stackPred, src = 0; // use seed from registerWorker
do {
r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift
- } while ((src = scan(w, src, r)) >= 0 ||
- (src = awaitWork(w)) == 0);
+ } while (runState >= 0 && ((src = scan(w, src, r)) >= 0 ||
+ (src = awaitWork(w)) == 0));
}
}
@@ -1922,7 +1922,7 @@ private int awaitWork(WorkQueue w) {
}
}
}
- return runState & STOP; // 0 unless terminating
+ return 0;
}
/**
@@ -3071,6 +3071,8 @@ static final class InvokeAnyRoot extends ForkJoinTask {
final AtomicInteger count; // in case all fail
@SuppressWarnings("serial")
InvokeAnyRoot(int n) {
+ if (n <= 0)
+ throw new IllegalArgumentException();
count = new AtomicInteger(n);
}
final void tryComplete(E v, Throwable ex, boolean completed) {
@@ -3090,6 +3092,32 @@ else if (count.getAndDecrement() <= 1) {
public final boolean exec() { return false; } // never forked
public final E getRawResult() { return result; }
public final void setRawResult(E v) { result = v; }
+ // Common support for timed and untimed versions of invokeAny
+ final E invokeAny(Collection extends Callable> tasks,
+ ForkJoinPool pool, boolean timed, long nanos)
+ throws InterruptedException, ExecutionException, TimeoutException {
+ Thread t = Thread.currentThread();
+ if (!(t instanceof ForkJoinWorkerThread) ||
+ ((ForkJoinWorkerThread)t).pool != pool)
+ markPoolSubmission(); // for exception reporting
+ ArrayList> fs = new ArrayList<>(count.get());
+ try {
+ for (Callable c : tasks) {
+ InvokeAnyTask f = new InvokeAnyTask(this, c);
+ fs.add(f);
+ pool.poolSubmit(true, f);
+ if (isDone())
+ break;
+ }
+ if (timed)
+ return get(nanos, TimeUnit.NANOSECONDS);
+ else
+ return get();
+ } finally {
+ for (InvokeAnyTask f : fs)
+ ForkJoinTask.cancelIgnoringExceptions(f);
+ }
+ }
}
/**
@@ -3105,8 +3133,9 @@ static final class InvokeAnyTask extends ForkJoinTask {
final Callable callable;
transient volatile Thread runner;
InvokeAnyTask(InvokeAnyRoot root, Callable callable) {
- this.root = root;
+ if (callable == null) throw new NullPointerException();
this.callable = callable;
+ this.root = root;
}
public final boolean exec() {
ForkJoinPool p;
@@ -3161,29 +3190,12 @@ public final void setRawResult(E v) {} // unused
@Override
public T invokeAny(Collection extends Callable> tasks)
throws InterruptedException, ExecutionException {
- int n = tasks.size();
- if (n <= 0)
- throw new IllegalArgumentException();
- InvokeAnyRoot root = new InvokeAnyRoot(n);
- ArrayList> fs = new ArrayList<>(n);
try {
- for (Callable c : tasks) {
- if (c == null)
- throw new NullPointerException();
- InvokeAnyTask f = new InvokeAnyTask(root, c);
- fs.add(f);
- poolSubmit(true, f);
- if (root.isDone())
- break;
- }
- try {
- return root.get();
- } catch (CancellationException cx) {
- throw new ExecutionException(cx);
- }
- } finally {
- for (InvokeAnyTask f : fs)
- ForkJoinTask.cancelIgnoringExceptions(f);
+ return new InvokeAnyRoot(tasks.size()).
+ invokeAny(tasks, this, false, 0L);
+ } catch (TimeoutException cannotHappen) {
+ assert false;
+ return null;
}
}
@@ -3191,31 +3203,8 @@ public T invokeAny(Collection extends Callable> tasks)
public T invokeAny(Collection extends Callable> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
- long nanos = unit.toNanos(timeout);
- int n = tasks.size();
- if (n <= 0)
- throw new IllegalArgumentException();
- InvokeAnyRoot root = new InvokeAnyRoot(n);
- ArrayList> fs = new ArrayList<>(n);
- try {
- for (Callable c : tasks) {
- if (c == null)
- throw new NullPointerException();
- InvokeAnyTask f = new InvokeAnyTask(root, c);
- fs.add(f);
- poolSubmit(true, f);
- if (root.isDone())
- break;
- }
- try {
- return root.get(nanos, TimeUnit.NANOSECONDS);
- } catch (CancellationException cx) {
- throw new ExecutionException(cx);
- }
- } finally {
- for (InvokeAnyTask f : fs)
- ForkJoinTask.cancelIgnoringExceptions(f);
- }
+ return new InvokeAnyRoot(tasks.size()).
+ invokeAny(tasks, this, true, unit.toNanos(timeout));
}
/**
diff --git a/src/java.base/share/classes/java/util/concurrent/ForkJoinTask.java b/src/java.base/share/classes/java/util/concurrent/ForkJoinTask.java
index 3948068b39d28..421f8c402c6d5 100644
--- a/src/java.base/share/classes/java/util/concurrent/ForkJoinTask.java
+++ b/src/java.base/share/classes/java/util/concurrent/ForkJoinTask.java
@@ -446,9 +446,7 @@ else if (timed && (ns = deadline - System.nanoTime()) <= 0) {
}
else if (Thread.interrupted()) {
interrupted = true;
- if ((how & POOLSUBMIT) != 0 && p != null && p.runState < 0)
- cancelIgnoringExceptions(this); // cancel on shutdown
- else if ((how & INTERRUPTIBLE) != 0) {
+ if ((how & INTERRUPTIBLE) != 0) {
s = ABNORMAL;
break;
}
@@ -575,13 +573,20 @@ private void reportException(int s) {
* necessary in an ExecutionException.
*/
private void reportExecutionException(int s) {
- Throwable ex = null, rx;
+ Throwable ex, rx;
if (s == ABNORMAL)
ex = new InterruptedException();
else if (s >= 0)
ex = new TimeoutException();
else if ((rx = getThrowableException()) != null)
ex = new ExecutionException(rx);
+ else {
+ rx = new CancellationException();
+ if ((status & POOLSUBMIT) != 0) // wrap if external
+ ex = new ExecutionException(rx);
+ else
+ ex = rx;
+ }
ForkJoinTask.uncheckedThrow(ex);
}
From 72b617771348134f4a751e25ddeb6d2ee97f1832 Mon Sep 17 00:00:00 2001
From: Doug Lea
Date: Sat, 21 Jan 2023 12:56:00 -0500
Subject: [PATCH 08/61] Ensure consistency across quiescence and runState
decisions
---
.../java/util/concurrent/ForkJoinPool.java | 423 ++++++++++--------
.../java/util/concurrent/ForkJoinTask.java | 49 +-
2 files changed, 250 insertions(+), 222 deletions(-)
diff --git a/src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java b/src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java
index 7beff8f682d40..7a6a091ace52d 100644
--- a/src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java
+++ b/src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java
@@ -482,7 +482,8 @@ public class ForkJoinPool extends AbstractExecutorService {
* available (and so is never checked in this way). When queued,
* the lower 16 bits of its phase must hold its pool index. So we
* place the index there upon initialization and never modify
- * these bits.
+ * these bits, which also allows use as a versioned id in other
+ * contexts.
*
* The ctl field also serves as the basis for memory
* synchronization surrounding activation. This uses a more
@@ -566,37 +567,40 @@ public class ForkJoinPool extends AbstractExecutorService {
* fields in ctl allow efficient and accurate discovery of
* quiescent states (i.e., when all workers are idle) after
* deactivation. However, this voting mechanism alone does not
- * guarantee that a pool can become dormant (quiesced or
- * terminated), because external racing producers do not vote, and
- * can asynchronously submit new tasks. To deal with this, the
- * final unparked thread (in awaitWork) scans external queues to
+ * guarantee that a pool can become dormant (or terminated),
+ * because external racing producers do not vote, and can
+ * asynchronously submit new tasks. To deal with this, the final
+ * unparked thread (in awaitWork) uses canStop (see below) to
* check for tasks that could have been added during a race window
* that would not be accompanied by a signal, in which case
- * re-activating itself (or any other worker) to recheck. The same
- * sets of checks are used in tryTerminate, to correctly trigger
- * delayed termination (shutDown, followed by quiescence) in the
- * presence of racing submissions. In all cases, the notion of the
- * "final" unparked thread is an approximation, because new
- * workers could be in the process of being constructed, which
- * occasionally adds some extra unnecessary processing.
- *
- * Shutdown and Termination. A call to shutdownNow invokes
- * tryTerminate to atomically set a mode bit. However, termination
- * is intrinsically non-atomic. The calling thread, as well as
- * other workers thereafter terminating, and external callers
+ * re-activating itself (or any other worker) to recheck.
+ *
+ * Termination. A call to shutdownNow invokes tryTerminate to
+ * atomically set a mode bit. However, termination is
+ * intrinsically non-atomic. The calling thread, as well as other
+ * workers thereafter terminating, and any external callers
* checking termination help cancel queued tasks, reactivate idle
- * workers, and interrupt others. To support this, worker
- * references not removed from the queues array during
- * termination. It is possible for late thread creations to still
- * be in progress after a quiescent termination reports terminated
- * status, but they will also immediately terminate. Calls to
- * non-abrupt shutdown() preface this by checking canStop before
- * triggering the "STOP" phase of termination. Like most eveything
- * else, we try to speed up termination by balancing parallelism
- * and contention, by starting task scans at different indices,
- * and by fanning out reactivation of idle workers. The main
- * remaining cost is the potential need to re-interrupt active
- * workers (checks for this are racy but suffice here).
+ * workers, and interrupt others; repeating until no more are
+ * found by checking and marking ctl counts and phase and access
+ * fields. These actions race with non-terminated workers.
+ * Except when using Interruptible tasks (see below), there are no
+ * quarantees after an abrupt shutdown whether remaining tasks
+ * complete normally or exceptionally or are cancelled, and
+ * termination may fail if tasks repeatedly ignore both
+ * cancellation status and interrupts.
+ *
+ * Quiescent shutdown. Calls to non-abrupt shutdown() use method
+ * canStop to trigger the "STOP" phase of termination upon
+ * quiescence, which is intrinsically racy, and requires
+ * convergent sweeps through queues to detect whether all workers
+ * are idle and there are no submissions that they could poll if
+ * they were not idle. If not immediately triggered, whenever all
+ * workers become idle in awaitWork, canStop is rechecked. Method
+ * helpQuiesce also uses canStop to help determine quiescence. It
+ * differs in that it does not trigger termination (even if
+ * enabled) and cannot rely on ctl counts to determine that all
+ * workers are inactive (because any executing helpQuiesce are not
+ * included).
*
* Trimming workers. To release resources after periods of lack of
* use, a worker starting to wait when the pool is quiescent will
@@ -736,27 +740,27 @@ public class ForkJoinPool extends AbstractExecutorService {
* LockSupport.park do not loop indefinitely (park returns
* immediately if the current thread is interrupted).
*
- * To conform to specs and expectations surrounding execution,
- * cancellation and termination of Callable-based ExecutorService
- * invoke, submit, invokeAll, and invokeAny methods (without
- * penalizing others) we use AdaptedInterruptibleCallables in
- * ExecutorService methods (actually, for InvokeAny, a variant
- * that gathers a single result). We also ensure that external
- * submitters do not help run such tasks by recording
- * ForkJoinTask.POOLSUBMIT in task status and/or as a bit flag
- * argument to joining methods. External callers of task.get etc
- * are not directly interrupted on shutdown, but must be woken due
- * to the task itself being cancelled during termination.
- * AdaptedInterruptibleCallables tasks always clear interrupts and
- * check for termination and cancellation before invoking
- * user-supplied Callables. The rechecks are needed because the
- * cleared interrupt could be due to cancellation, termination, or
- * any other reason. These tasks also record their runner thread
- * so that cancel(true) can interrupt them later. As is the case
- * with any kind of pool, it is possible that an interrupt
- * designed to cancel one task occurs both late and unnecessarily,
- * so instead "spuriously" interrupts the thread while performing
- * a later task.
+ * However, to conform to specs and expectations surrounding
+ * execution, cancellation and termination of Callable-based
+ * ExecutorService invoke, submit, invokeAll, and invokeAny
+ * methods (without penalizing others) we use "Interruptible"
+ * tasks -- AdaptedInterruptibleCallables, and a variant
+ * InvokeAnyTask. We ensure that external submitters do not help
+ * run such tasks by recording ForkJoinTask.POOLSUBMIT in task
+ * status and/or as a bit flag argument to other methods. In
+ * addition to recording a "runner" field (similarly to
+ * FutureTask) to support cancel(true), we ensure that upon pool
+ * shutdown, either the task is cancelled by the runner or the
+ * runner is interrupted so it can cancel. Often both, but since
+ * external joining callers never run these tasks, they must await
+ * cancellation (or a result; reporting the correct result or
+ * exception for task.get etc depending on pool status). As is
+ * the case with any kind of pool, it is possible that an
+ * interrupt designed to cancel one task occurs both late and
+ * unnecessarily, so instead "spuriously" interrupts the worker
+ * thread while performing a later task. Users should check
+ * cancellation status upon interrupt, but usually cannot because
+ * their Callables are wrapped and so cannot access status.
*
* Memory placement
* ================
@@ -848,15 +852,10 @@ public class ForkJoinPool extends AbstractExecutorService {
*
* The main sources of differences from previous version are:
*
- * * Use of Unsafe vs VarHandle, including re-instatement of some
- * constructions from pre-VarHandle versions.
- * * Reduced memory and signal contention, mainly by distinguishing
- * failure cases.
- * * Improved initialization, in part by preparing for possible
- * removal of SecurityManager
- * * Enable resizing (includes refactoring quiescence/termination)
- * * Unification of most internal vs external operations; some made
- * possible via use of WorkQueue.access, and POOLSUBMIT status in tasks.
+ * * Handling of Interruptible tasks consistent with
+ * ExecutorService specs, including uniform use of revised
+ * canStop for quiescence-related checks and corresponding
+ * revisions to termination checks
*/
// static configuration constants
@@ -1074,7 +1073,7 @@ static final class WorkQueue {
@jdk.internal.vm.annotation.Contended("w")
volatile int access; // values 0, 1 (locked), PARKED, STOP
@jdk.internal.vm.annotation.Contended("w")
- volatile int phase; // versioned, negative if inactive
+ volatile int phase; // versioned, negative if inactive, 0 on exit
@jdk.internal.vm.annotation.Contended("w")
volatile int source; // source queue id in topLevelExec
@jdk.internal.vm.annotation.Contended("w")
@@ -1623,6 +1622,7 @@ final void registerWorker(WorkQueue w) {
w.array = new ForkJoinTask>[INITIAL_QUEUE_CAPACITY];
cfg |= w.config | SRC;
w.stackPred = seed;
+ int seq = (seed ^ (seed >>> 16)) << 16; // initial phase seq
int id = (seed << 1) | 1; // initial index guess
lock.lock();
try {
@@ -1632,7 +1632,8 @@ final void registerWorker(WorkQueue w) {
for (; qs[id &= m] != null && k > 0; id -= 2, k -= 2);
if (k == 0)
id = n | 1; // resize below
- w.phase = w.config = id | cfg; // now publishable
+ w.config = id | cfg;
+ w.phase = (id | seq) & ~INACTIVE; // now publishable
if (id < n)
qs[id] = w;
@@ -1671,7 +1672,10 @@ final void deregisterWorker(ForkJoinWorkerThread wt, Throwable ex) {
WorkQueue w = (wt == null) ? null : wt.workQueue;
if (w != null) {
cfg = w.config;
- w.access = STOP; // may be redundant
+ w.access = STOP; // mark as done; cancel tasks
+ w.phase = 0;
+ for (ForkJoinTask> t; (t = w.nextLocalTask(0)) != null; )
+ ForkJoinTask.cancelIgnoringExceptions(t);
}
long c = ctl;
if ((cfg & TRIMMED) == 0) // decrement counts
@@ -1695,13 +1699,8 @@ else if ((int)c == 0) // was dropped on timeout
if ((cfg & SRC) != 0)
signalWork(); // possibly replace worker
}
- if (ex != null) {
- if (w != null) { // cancel tasks
- for (ForkJoinTask> t; (t = w.nextLocalTask(0)) != null; )
- ForkJoinTask.cancelIgnoringExceptions(t);
- }
+ if (ex != null)
ForkJoinTask.rethrow(ex);
- }
}
/*
@@ -1777,7 +1776,7 @@ private boolean tryTrim(WorkQueue w) {
int pred = w.stackPred, cfg = w.config | TRIMMED;
long c = ctl;
int sp = (int)c & ~INACTIVE;
- if ((sp & SMASK) == (cfg & SMASK) &&
+ if ((sp & SMASK) == (cfg & SMASK) && runState >= 0 &&
compareAndSetCtl(c, ((pred & SP_MASK) |
(UC_MASK & (c - TC_UNIT))))) {
w.config = cfg; // add sentinel for deregisterWorker
@@ -1789,27 +1788,51 @@ private boolean tryTrim(WorkQueue w) {
}
/**
- * Returns true if any queue is detectably nonempty. Accurate
- * only when workers are quiescent; else conservatively
- * approximate.
- * @param submissionsOnly if true, only check submission queues
- */
- private boolean hasTasks(boolean submissionsOnly) {
- int step = submissionsOnly ? 2 : 1;
- for (int checkSum = 0;;) { // repeat until stable (normally twice)
- U.loadFence();
- WorkQueue[] qs = queues;
- int n = (qs == null) ? 0 : qs.length, sum = 0;
- for (int i = 0; i < n; i += step) {
- WorkQueue q; int s;
+ * Internal version of isQuiescent and related functionality.
+ * Returns true if terminating or all submission queues are empty
+ * and unlocked, and unless !all, all workers are idle. To obtain
+ * a reliable snapshot, scans at least twice, and continues in
+ * case of inconsistencies.
+ *
+ * @param all if false, only check submission queues
+ * @param stopIfEnabled true if should transition runState to STOP
+ * if quiescent, in which case returns false
+ */
+ private boolean canStop(boolean all, boolean stopIfEnabled) {
+ long checkSum = 0L, sum = 0L; // for consistency checks
+ for (int scans = 2;;) { // min remaining scans
+ if (runState < 0)
+ return true; // terminating
+ else if (all && (ctl & RC_MASK) > 0L)
+ return false; // not idle
+ else if (scans > 0)
+ --scans; // checkSum not valid
+ else if (checkSum == sum) {
+ if (stopIfEnabled && (runState & SHUTDOWN) != 0) {
+ getAndBitwiseOrRunState(STOP);
+ return false; // caller cannot stop yet
+ }
+ return true;
+ }
+ checkSum = sum;
+ sum = 0L;
+ WorkQueue[] qs; WorkQueue q;
+ int n = ((qs = queues) == null) ? 0 : qs.length;
+ for (int i = 0; i < n; ++i) {
if ((q = qs[i]) != null) {
- if (q.access > 0 || (s = q.top) != q.base)
- return true;
- sum += (s << 16) + i + 1;
+ int x = q.access, p = q.phase, s = q.top, d = s - q.base;
+ if ((i & 1) != 0) {
+ if (all && (p | x) > 0)
+ return false; // active worker
+ }
+ else if (x != 0 || d > 0 || q.access != 0)
+ return false; // locked or nonempty
+ else if (d != 0 || q.top != s)
+ scans = 1; // inconsistent
+ else // hash to associate s with q
+ sum = (sum << 5) - sum + s + p;
}
}
- if (checkSum == (checkSum = sum))
- return false;
}
}
@@ -1878,6 +1901,7 @@ private int awaitWork(WorkQueue w) {
if (w == null)
return -1; // currently impossible
int p = (w.phase + SS_SEQ) & ~INACTIVE; // advance phase
+ boolean idle; // true if possibly quiescent
if (runState < 0)
return -1; // terminating
long sp = p & SP_MASK, pc = ctl, qc;
@@ -1886,19 +1910,17 @@ private int awaitWork(WorkQueue w) {
w.stackPred = (int)pc; // set ctl stack link
} while (pc != (pc = compareAndExchangeCtl(
pc, qc = ((pc - RC_UNIT) & UC_MASK) | sp)));
- boolean idle;
- if ((idle = ((qc & RC_MASK) <= 0L)) && // possibly quiescent
- (hasTasks(true) || (runState != 0 && canStop(true) < 0)) &&
- w.phase < 0)
- reactivate(); // enable rescan or termination
+ if ((idle = ((qc & RC_MASK) <= 0L)) && // check for stragglers
+ !canStop(true, true) && (w.phase >= 0 || reactivate() == w))
+ return 0; // rescan or quiescent terminate
WorkQueue[] qs = queues; // spin for expected #accesses in scan+signal
int spins = ((qs == null) ? 0 : ((qs.length & SMASK) << 1)) | 0xf;
while ((p = w.phase) < 0 && --spins > 0)
Thread.onSpinWait();
- if (p < 0) { // await signal
+ if (p < 0) {
long deadline = idle ? keepAlive + System.currentTimeMillis() : 0L;
LockSupport.setCurrentBlocker(this);
- for (;;) {
+ for (;;) { // await signal
w.access = PARKED; // enable unpark
if (w.phase < 0) {
if (idle)
@@ -1925,26 +1947,6 @@ private int awaitWork(WorkQueue w) {
return 0;
}
- /**
- * Non-overridable version of isQuiescent
- * @param enableTransition true if should set runState if can stop
- * @return negative if quiescent or already terminating, else runState
- */
- private int canStop(boolean enableTransition) {
- int rs;
- for (long c = ctl;;) {
- if ((rs = runState) < 0 || ((c & RC_MASK) > 0L || hasTasks(false)))
- break;
- if (c == (c = ctl)) { // validate
- if (enableTransition)
- getAndBitwiseOrRunState(STOP);
- rs |= STOP;
- break;
- }
- }
- return rs;
- }
-
/**
* Scans for and returns a polled task, if available. Used only
* for untracked polls. Begins scan at a random index to avoid
@@ -2190,18 +2192,21 @@ private int helpQuiesce(WorkQueue w, long nanos, boolean interruptible) {
if (w == null || (phase = w.phase) < 0)
return 0;
int activePhase = phase, inactivePhase = phase | INACTIVE;
- int wsrc = w.source, r = 0;
+ int wsrc = w.source, r = 0, returnStatus = 1;
for (boolean locals = true;;) {
WorkQueue[] qs; WorkQueue q;
- if (runState < 0) { // terminating
- w.phase = activePhase;
- return 1;
+ if (runState < 0)
+ break; // terminating
+ if (interruptible && Thread.interrupted()) {
+ returnStatus = -1;
+ break;
}
if (locals) { // run local tasks before (re)polling
+ locals = false;
for (ForkJoinTask> u; (u = w.nextLocalTask()) != null;)
u.doExec();
}
- boolean rescan = false, busy = locals = false, interrupted;
+ boolean rescan = false, busy = false;
int n = ((qs = queues) == null) ? 0 : qs.length, m = n - 1;
scan: for (int i = n, j; i > 0; --i, ++r) {
if ((q = qs[j = m & r]) != null && q != w) {
@@ -2210,17 +2215,17 @@ private int helpQuiesce(WorkQueue w, long nanos, boolean interruptible) {
int b = q.base, cap;
if (a == null || (cap = a.length) <= 0)
break;
- int k = (cap - 1) & b, nb = b + 1, nk = (cap - 1) & nb;
+ int k = (cap - 1) & b, nb = b + 1;
ForkJoinTask> t = a[k];
U.loadFence(); // for re-reads
if (q.base != b || q.array != a || a[k] != t)
;
else if (t == null) {
if (!rescan) {
- if (a[nk] != null || q.top - b > 0)
+ int p = q.phase, s = q.top;
+ if (s - b > 0)
rescan = true;
- else if (!busy &&
- q.owner != null && q.phase >= 0)
+ else if (p > 0)
busy = true;
}
break;
@@ -2232,37 +2237,40 @@ else if (WorkQueue.casSlotToNull(a, k, t)) {
w.source = src;
t.doExec();
w.source = wsrc;
- rescan = locals = true;
+ locals = true;
break scan;
}
}
}
}
- if (rescan)
- ; // retry
- else if (phase >= 0) {
+ if (locals)
+ ; // remain active
+ else if (rescan || !canStop(false, false)) {
+ if (phase < 0) // tentatively reactivate
+ w.phase = phase = activePhase;
+ }
+ else if (phase >= 0) { // tentatively inactivate
parkTime = 0L;
w.phase = phase = inactivePhase;
}
- else if (!busy) {
- w.phase = activePhase;
- return 1;
- }
+ else if (!busy)
+ break; // inactive and quiescent
else if (parkTime == 0L) {
- parkTime = 1L << 10; // initially about 1 usec
+ parkTime = 1L << 10; // initially about 1 usec
Thread.yield();
}
- else if ((interrupted = interruptible && Thread.interrupted()) ||
- System.nanoTime() - startTime > nanos) {
- w.phase = activePhase;
- return interrupted ? -1 : 0;
+ else if (System.nanoTime() - startTime > nanos) {
+ returnStatus = 0;
+ break;
}
else {
LockSupport.parkNanos(this, parkTime);
if (parkTime < nanos >>> 8 && parkTime < 1L << 20)
- parkTime <<= 1; // max sleep approx 1 sec or 1% nanos
+ parkTime <<= 1; // max sleep approx 1 sec or 1% nanos
}
}
+ w.phase = activePhase;
+ return returnStatus;
}
/**
@@ -2279,7 +2287,7 @@ private int externalHelpQuiesce(long nanos, boolean interruptible) {
t.doExec();
parkTime = 0L;
}
- else if (canStop(false) < 0)
+ else if (canStop(true, false))
return 1;
else if (parkTime == 0L) {
parkTime = 1L << 10;
@@ -2351,6 +2359,7 @@ final WorkQueue submissionQueue(boolean isSubmit) {
break;
else if ((q = qs[i = (n - 1) & id]) == null) {
WorkQueue w = new WorkQueue(null, id | SRC);
+ w.phase = (id >>> 1) | INACTIVE; // for use as unique id
w.array = new ForkJoinTask>[INITIAL_QUEUE_CAPACITY];
lock.lock(); // install under lock
if (queues == qs && qs[i] == null)
@@ -2498,8 +2507,8 @@ static int getSurplusQueuedTaskCount() {
/**
* Possibly initiates and/or completes pool termination. If
* terminating, helps complete termination by cancelling tasks,
- * reactivating idle workers or interrupting active workers, and
- * possibly triggering TERMINATED state.
+ * reactivating idle workers or interrupting active workers, until
+ * no more are found, then possibly triggering TERMINATED state.
*
* @param now if true, unconditionally terminate, else only
* if no work and no active workers
@@ -2509,42 +2518,61 @@ static int getSurplusQueuedTaskCount() {
private int tryTerminate(boolean now, boolean enable) {
int rs = runState;
if ((config & ISCOMMON) == 0 && rs >= 0) {
- if (!now) {
+ if (now)
+ rs = getAndBitwiseOrRunState(SHUTDOWN | STOP) | STOP;
+ else {
if (enable && (rs & SHUTDOWN) == 0)
rs = getAndBitwiseOrRunState(SHUTDOWN) | SHUTDOWN;
if ((rs & SHUTDOWN) != 0)
- rs = canStop(true);
+ canStop(true, true); // sets STOP if quiescent
+ rs = runState;
}
- else
- rs = getAndBitwiseOrRunState(SHUTDOWN | STOP) | STOP;
}
- if ((rs & (STOP | TERMINATED)) != STOP)
- return rs; // not terminating
- WorkQueue[] qs; WorkQueue q; Thread thread; WorkQueue w;
- ReentrantLock lock; Condition cond;
- boolean more = (reactivate() != null); // try activating idle worker
- Thread current = Thread.currentThread();
- int r = (((current instanceof ForkJoinWorkerThread) &&
- (w = ((ForkJoinWorkerThread)current).workQueue) != null) ?
- w.config : 0); // stagger traversals
- int n = ((qs = queues) == null) ? 0 : qs.length;
- for (int i = 0; i < n; ++i) {
- if ((q = qs[(r + i) & (n - 1)]) != null) {
- for (ForkJoinTask> t; (t = q.poll(null)) != null; )
- ForkJoinTask.cancelIgnoringExceptions(t);
- if (!more && q.access != STOP &&
- (thread = q.owner) != null && thread != current &&
- (q.forcePhaseActive() < 0 || !thread.isInterrupted())) {
- try {
- thread.interrupt(); // help unblock others
- } catch (Throwable ignore) {
+ if ((rs & (STOP | TERMINATED)) != STOP) // not terminating
+ return rs;
+ Thread current = Thread.currentThread(); // help terminate
+ WorkQueue w = ((current instanceof ForkJoinWorkerThread) ?
+ ((ForkJoinWorkerThread)current).workQueue : null);
+ int r = (w != null) ? w.config : 0; // stagger traversals
+ boolean rescan;
+ do { // repeat until cannot cancel, reactivate, or interrupt
+ boolean reactivated = rescan = (reactivate() != null);
+ WorkQueue[] qs = queues;
+ int n = (qs == null) ? 0 : qs.length;
+ for (int i = 0; i < n; ++i) {
+ WorkQueue q; Thread thread; int k, p;
+ if ((q = qs[k = (r + i) & (n - 1)]) != null && q.phase != 0 &&
+ q.access != STOP) {
+ for (ForkJoinTask> t; (t = q.poll(null)) != null; ) {
+ ForkJoinTask.cancelIgnoringExceptions(t);
+ rescan = true; // cancel tasks
+ }
+ if ((k & 1) == 0) { // is submission queue
+ if (q.getAndSetAccess(1) != 0)
+ rescan = true; // locked
+ else
+ q.access = q.phase = 0; // mark as done
+ }
+ else if (!reactivated) {
+ if ((p = q.phase) < 0) // reactivate next pass
+ rescan = true;
+ else if (p != 0 && (thread = q.owner) != null &&
+ !thread.isInterrupted()) {
+ rescan = true;
+ try {
+ thread.interrupt(); // try to unblock
+ } catch (Throwable ignore) {
+ }
+ }
}
}
}
- }
- if (reactivate() == null && // transition if no workers
- (short)(ctl >>> TC_SHIFT) <= 0 &&
- (runState & TERMINATED) == 0 &&
+ if (((rs = runState) & TERMINATED) != 0)
+ return rs;
+ } while (rescan);
+
+ ReentrantLock lock; Condition cond; // transition when no workers
+ if ((short)(ctl >>> TC_SHIFT) <= 0 &&
(getAndBitwiseOrRunState(TERMINATED) & TERMINATED) == 0 &&
(lock = registrationLock) != null) {
lock.lock();
@@ -3138,34 +3166,33 @@ static final class InvokeAnyTask extends ForkJoinTask {
this.root = root;
}
public final boolean exec() {
- ForkJoinPool p;
+ InvokeAnyRoot r; ForkJoinPool p;
Thread t = Thread.currentThread();
- Thread.interrupted();
- runner = t;
- InvokeAnyRoot r = root;
- Callable c = callable;
- E v = null;
- Throwable ex = null;
- boolean completed = false;
- if (r != null && c != null) {
- if ((t instanceof ForkJoinWorkerThread) &&
+ if ((r = root) != null && !r.isDone()) {
+ if ((t instanceof ForkJoinWorkerThread) && // termination check
(p = ((ForkJoinWorkerThread) t).pool) != null &&
- p.runState < 0) { // termination check
+ p.runState < 0)
r.trySetCancelled();
- t.interrupt(); // restore interrupt
- }
- else if (!r.isDone() && !isDone()) {
- try {
- v = c.call();
- completed = true;
- } catch (Throwable rex) {
- ex = rex;
+ else {
+ Thread.interrupted();
+ runner = t;
+ E v = null;
+ Throwable ex = null;
+ boolean completed = false;
+ if (!isDone()) {
+ try {
+ v = callable.call();
+ completed = true;
+ } catch (Throwable rex) {
+ ex = rex;
+ }
}
+ runner = null;
+ if (trySetCancelled() >= 0) // avoid race with cancel
+ r.tryComplete(v, ex, completed);
+ Thread.interrupted();
}
- if (trySetCancelled() >= 0)
- r.tryComplete(v, ex, completed);
}
- runner = null;
return true;
}
public final boolean cancel(boolean mayInterruptIfRunning) {
@@ -3310,7 +3337,7 @@ public int getActiveThreadCount() {
* @return {@code true} if all threads are currently idle
*/
public boolean isQuiescent() {
- return canStop(false) < 0;
+ return canStop(true, false);
}
/**
@@ -3385,7 +3412,7 @@ public int getQueuedSubmissionCount() {
* @return {@code true} if there are any queued submissions
*/
public boolean hasQueuedSubmissions() {
- return hasTasks(true);
+ return !canStop(false, false);
}
/**
@@ -3587,9 +3614,8 @@ else if ((lock = registrationLock) != null) {
try {
if (cond == null && (cond = termination) == null)
termination = cond = lock.newCondition();
- if ((runState & TERMINATED) != 0)
- break;
- nanos = cond.awaitNanos(nanos);
+ if ((runState & TERMINATED) == 0)
+ nanos = cond.awaitNanos(nanos);
} finally {
lock.unlock();
}
@@ -3640,19 +3666,18 @@ public boolean awaitQuiescence(long timeout, TimeUnit unit) {
@Override
public void close() {
ReentrantLock lock;
- boolean interrupted = false;
if ((runState & TERMINATED) == 0 && (config & ISCOMMON) == 0 &&
(lock = registrationLock) != null) {
checkPermission();
Condition cond = null;
+ boolean interrupted = Thread.interrupted();
while ((tryTerminate(interrupted, true) & TERMINATED) == 0) {
lock.lock();
try {
if (cond == null && (cond = termination) == null)
termination = cond = lock.newCondition();
- if ((runState & TERMINATED) != 0)
- break;
- cond.await();
+ if ((runState & TERMINATED) == 0)
+ cond.await();
} catch (InterruptedException ex) {
interrupted = true;
} finally {
diff --git a/src/java.base/share/classes/java/util/concurrent/ForkJoinTask.java b/src/java.base/share/classes/java/util/concurrent/ForkJoinTask.java
index 421f8c402c6d5..4aa48bf10f877 100644
--- a/src/java.base/share/classes/java/util/concurrent/ForkJoinTask.java
+++ b/src/java.base/share/classes/java/util/concurrent/ForkJoinTask.java
@@ -425,6 +425,8 @@ else if ((how & RAN) != 0 ||
return s;
if (s == UNCOMPENSATE)
uncompensate = true;
+ if (p.runState < 0) // recheck below if interrupted
+ cancelIgnoringExceptions(this);
}
Aux node = null;
long ns = 0L;
@@ -446,13 +448,14 @@ else if (timed && (ns = deadline - System.nanoTime()) <= 0) {
}
else if (Thread.interrupted()) {
interrupted = true;
+ if (p != null && p.runState < 0)
+ cancelIgnoringExceptions(this);
if ((how & INTERRUPTIBLE) != 0) {
- s = ABNORMAL;
+ if ((s = status) >= 0)
+ s = ABNORMAL; // prefer reporting result to IE
break;
}
}
- else if ((s = status) < 0) // recheck
- break;
else if (timed)
LockSupport.parkNanos(ns);
else
@@ -480,11 +483,8 @@ else if (casAux(a, next))
}
}
}
- int stat = status; // prefer completion result
- if (stat < 0)
- s = stat;
}
- if (s < 0) {
+ else {
signalWaiters(); // help clean or signal
if (interrupted)
Thread.currentThread().interrupt();
@@ -1502,23 +1502,26 @@ static final class AdaptedInterruptibleCallable extends ForkJoinTask
public final boolean exec() {
ForkJoinPool p;
Thread t = Thread.currentThread();
- Thread.interrupted();
- runner = t;
- try {
- if ((t instanceof ForkJoinWorkerThread) &&
- (p = ((ForkJoinWorkerThread) t).pool) != null &&
- p.runState < 0) // termination check
- cancelIgnoringExceptions(this);
- else if (!isDone())
- result = callable.call();
- return true;
- } catch (RuntimeException rex) {
- throw rex;
- } catch (Exception ex) {
- throw new RuntimeException(ex);
- } finally {
- runner = null;
+ if ((t instanceof ForkJoinWorkerThread) &&
+ (p = ((ForkJoinWorkerThread) t).pool) != null &&
+ p.runState < 0) // termination check
+ cancelIgnoringExceptions(this);
+ else {
+ Thread.interrupted();
+ runner = t;
+ try {
+ if (!isDone())
+ result = callable.call();
+ } catch (RuntimeException rex) {
+ throw rex;
+ } catch (Exception ex) {
+ throw new RuntimeException(ex);
+ } finally {
+ runner = null;
+ Thread.interrupted();
+ }
}
+ return true;
}
public final void run() { invoke(); }
public final boolean cancel(boolean mayInterruptIfRunning) {
From b01fe39ae12b567e2da8d3dcfa35bd9dd2427fed Mon Sep 17 00:00:00 2001
From: Doug Lea
Date: Sun, 22 Jan 2023 17:02:51 -0500
Subject: [PATCH 09/61] Check termination in ManagedBlockers
---
.../share/classes/java/util/concurrent/ForkJoinPool.java | 8 +++++++-
1 file changed, 7 insertions(+), 1 deletion(-)
diff --git a/src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java b/src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java
index 7a6a091ace52d..a8612717d2df0 100644
--- a/src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java
+++ b/src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java
@@ -1920,7 +1920,11 @@ private int awaitWork(WorkQueue w) {
if (p < 0) {
long deadline = idle ? keepAlive + System.currentTimeMillis() : 0L;
LockSupport.setCurrentBlocker(this);
- for (;;) { // await signal
+ for (;;) { // await signal or termination
+ if (runState < 0) { // activate before exit
+ do {} while (w.phase < 0 && reactivate() != null);
+ return -1;
+ }
w.access = PARKED; // enable unpark
if (w.phase < 0) {
if (idle)
@@ -3810,6 +3814,8 @@ private void compensatedBlock(ManagedBlocker blocker)
long c = ctl;
if (blocker.isReleasable())
break;
+ if (runState < 0) // will be interrupted on cancellation
+ throw new InterruptedException();
if ((comp = tryCompensate(c, false)) >= 0) {
long post = (comp == 0) ? 0L : RC_UNIT;
try {
From 212dc07e78ba007f27bac0db0f6db27d7cf5a9e1 Mon Sep 17 00:00:00 2001
From: Doug Lea
Date: Sat, 28 Jan 2023 16:28:01 -0500
Subject: [PATCH 10/61] More uniform handling of Interruptibles
---
.../java/util/concurrent/ForkJoinPool.java | 498 ++++++++----------
.../java/util/concurrent/ForkJoinTask.java | 158 ++++--
2 files changed, 336 insertions(+), 320 deletions(-)
diff --git a/src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java b/src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java
index a8612717d2df0..70ce1c16fbcb1 100644
--- a/src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java
+++ b/src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java
@@ -579,28 +579,30 @@ public class ForkJoinPool extends AbstractExecutorService {
* atomically set a mode bit. However, termination is
* intrinsically non-atomic. The calling thread, as well as other
* workers thereafter terminating, and any external callers
- * checking termination help cancel queued tasks, reactivate idle
- * workers, and interrupt others; repeating until no more are
- * found by checking and marking ctl counts and phase and access
- * fields. These actions race with non-terminated workers.
- * Except when using Interruptible tasks (see below), there are no
- * quarantees after an abrupt shutdown whether remaining tasks
- * complete normally or exceptionally or are cancelled, and
- * termination may fail if tasks repeatedly ignore both
- * cancellation status and interrupts.
+ * checking termination help cancel queued tasks and unblock
+ * workers, repeating until no more are found. These actions race
+ * with non-terminated workers. Except when using Interruptible
+ * tasks (see below), there are no quarantees after an abrupt
+ * shutdown whether remaining tasks complete normally or
+ * exceptionally or are cancelled, and termination may fail if
+ * tasks repeatedly ignore both cancellation status and
+ * interrupts.
*
* Quiescent shutdown. Calls to non-abrupt shutdown() use method
* canStop to trigger the "STOP" phase of termination upon
- * quiescence, which is intrinsically racy, and requires
- * convergent sweeps through queues to detect whether all workers
- * are idle and there are no submissions that they could poll if
- * they were not idle. If not immediately triggered, whenever all
- * workers become idle in awaitWork, canStop is rechecked. Method
- * helpQuiesce also uses canStop to help determine quiescence. It
- * differs in that it does not trigger termination (even if
- * enabled) and cannot rely on ctl counts to determine that all
- * workers are inactive (because any executing helpQuiesce are not
- * included).
+ * quiescence, which is intrinsically racy, and requires multiple
+ * scans of queues to detect whether all workers are idle and
+ * there are no submissions that they could poll if they were not
+ * idle. If two scans agree on state, then there was at least one
+ * point at which quiescence held, and so further submissions may
+ * be rejected. To avoid false-positives, we use a linear hash.
+ * (A similar scheme is used in tryTerminate.) If not immediately
+ * triggered, whenever all workers become idle in awaitWork,
+ * canStop is rechecked. Method helpQuiesce also uses canStop to
+ * help determine quiescence. It differs in that it does not
+ * trigger termination (even if enabled) and cannot rely on ctl
+ * counts to determine that all workers are inactive (because any
+ * executing helpQuiesce are not included).
*
* Trimming workers. To release resources after periods of lack of
* use, a worker starting to wait when the pool is quiescent will
@@ -740,27 +742,23 @@ public class ForkJoinPool extends AbstractExecutorService {
* LockSupport.park do not loop indefinitely (park returns
* immediately if the current thread is interrupted).
*
- * However, to conform to specs and expectations surrounding
- * execution, cancellation and termination of Callable-based
- * ExecutorService invoke, submit, invokeAll, and invokeAny
- * methods (without penalizing others) we use "Interruptible"
- * tasks -- AdaptedInterruptibleCallables, and a variant
- * InvokeAnyTask. We ensure that external submitters do not help
- * run such tasks by recording ForkJoinTask.POOLSUBMIT in task
- * status and/or as a bit flag argument to other methods. In
- * addition to recording a "runner" field (similarly to
- * FutureTask) to support cancel(true), we ensure that upon pool
- * shutdown, either the task is cancelled by the runner or the
- * runner is interrupted so it can cancel. Often both, but since
- * external joining callers never run these tasks, they must await
- * cancellation (or a result; reporting the correct result or
- * exception for task.get etc depending on pool status). As is
- * the case with any kind of pool, it is possible that an
+ * Interruptible tasks. To conform to ExecutorService specs and
+ * expectations externally invoked ExecutorService methods use
+ * InterruptibleForkJoinTasks. External submitters do not help
+ * run such tasks (implemented by marking with
+ * ForkJoinTask.POOLSUBMIT in task status and/or as a bit flag
+ * argument to other methods). These tasks include a "runner"
+ * field (similarly to FutureTask) to support cancel(true),
+ * minimizing impact of "stray" interrupts and those in which an
* interrupt designed to cancel one task occurs both late and
- * unnecessarily, so instead "spuriously" interrupts the worker
- * thread while performing a later task. Users should check
- * cancellation status upon interrupt, but usually cannot because
- * their Callables are wrapped and so cannot access status.
+ * unnecessarily. Upon pool shutdown, the task is cancelled, and
+ * runners are interrupted so they can cancel. Since external
+ * joining callers never run these tasks, they must await
+ * cancellation.
+ *
+ * See the ForkJoinTask internal documentation for an account of
+ * how these correspond to how and when different exceptions are
+ * thrown.
*
* Memory placement
* ================
@@ -852,7 +850,7 @@ public class ForkJoinPool extends AbstractExecutorService {
*
* The main sources of differences from previous version are:
*
- * * Handling of Interruptible tasks consistent with
+ * * Handling of Interruptible tasks is now consistent with
* ExecutorService specs, including uniform use of revised
* canStop for quiescence-related checks and corresponding
* revisions to termination checks
@@ -1073,7 +1071,7 @@ static final class WorkQueue {
@jdk.internal.vm.annotation.Contended("w")
volatile int access; // values 0, 1 (locked), PARKED, STOP
@jdk.internal.vm.annotation.Contended("w")
- volatile int phase; // versioned, negative if inactive, 0 on exit
+ volatile int phase; // versioned, negative if inactive
@jdk.internal.vm.annotation.Contended("w")
volatile int source; // source queue id in topLevelExec
@jdk.internal.vm.annotation.Contended("w")
@@ -1095,9 +1093,6 @@ static boolean casSlotToNull(ForkJoinTask>[] a, int i,
return U.compareAndSetReference(a, ((long)i << ASHIFT) + ABASE,
c, null);
}
- final int forcePhaseActive() { // clear sign bit
- return U.getAndBitwiseAndInt(this, PHASE, 0x7fffffff);
- }
final int getAndSetAccess(int v) {
return U.getAndSetInt(this, ACCESS, v);
}
@@ -1279,7 +1274,7 @@ else if (t != null && casSlotToNull(a, k, t)) {
}
else if (array != a || a[k] != null)
; // stale
- else if (a[nk] == null && top - b <= 0)
+ else if (a[nk] == null && access <= 0 && top - b <= 0)
break; // empty
}
return null;
@@ -1672,10 +1667,9 @@ final void deregisterWorker(ForkJoinWorkerThread wt, Throwable ex) {
WorkQueue w = (wt == null) ? null : wt.workQueue;
if (w != null) {
cfg = w.config;
- w.access = STOP; // mark as done; cancel tasks
- w.phase = 0;
- for (ForkJoinTask> t; (t = w.nextLocalTask(0)) != null; )
- ForkJoinTask.cancelIgnoringExceptions(t);
+ w.access = STOP; // mark as done
+ do {} while (w.phase < 0 && // ensure released
+ reactivate() != null);
}
long c = ctl;
if ((cfg & TRIMMED) == 0) // decrement counts
@@ -1696,6 +1690,8 @@ else if ((int)c == 0) // was dropped on timeout
stealCount += ns; // accumulate steals
lock.unlock();
}
+ for (ForkJoinTask> t; (t = w.nextLocalTask(0)) != null; )
+ ForkJoinTask.cancelIgnoringExceptions(t);
if ((cfg & SRC) != 0)
signalWork(); // possibly replace worker
}
@@ -1787,55 +1783,6 @@ private boolean tryTrim(WorkQueue w) {
return false;
}
- /**
- * Internal version of isQuiescent and related functionality.
- * Returns true if terminating or all submission queues are empty
- * and unlocked, and unless !all, all workers are idle. To obtain
- * a reliable snapshot, scans at least twice, and continues in
- * case of inconsistencies.
- *
- * @param all if false, only check submission queues
- * @param stopIfEnabled true if should transition runState to STOP
- * if quiescent, in which case returns false
- */
- private boolean canStop(boolean all, boolean stopIfEnabled) {
- long checkSum = 0L, sum = 0L; // for consistency checks
- for (int scans = 2;;) { // min remaining scans
- if (runState < 0)
- return true; // terminating
- else if (all && (ctl & RC_MASK) > 0L)
- return false; // not idle
- else if (scans > 0)
- --scans; // checkSum not valid
- else if (checkSum == sum) {
- if (stopIfEnabled && (runState & SHUTDOWN) != 0) {
- getAndBitwiseOrRunState(STOP);
- return false; // caller cannot stop yet
- }
- return true;
- }
- checkSum = sum;
- sum = 0L;
- WorkQueue[] qs; WorkQueue q;
- int n = ((qs = queues) == null) ? 0 : qs.length;
- for (int i = 0; i < n; ++i) {
- if ((q = qs[i]) != null) {
- int x = q.access, p = q.phase, s = q.top, d = s - q.base;
- if ((i & 1) != 0) {
- if (all && (p | x) > 0)
- return false; // active worker
- }
- else if (x != 0 || d > 0 || q.access != 0)
- return false; // locked or nonempty
- else if (d != 0 || q.top != s)
- scans = 1; // inconsistent
- else // hash to associate s with q
- sum = (sum << 5) - sum + s + p;
- }
- }
- }
- }
-
/**
* Top-level runloop for workers, called by ForkJoinWorkerThread.run.
* See above for explanation.
@@ -1912,7 +1859,7 @@ private int awaitWork(WorkQueue w) {
pc, qc = ((pc - RC_UNIT) & UC_MASK) | sp)));
if ((idle = ((qc & RC_MASK) <= 0L)) && // check for stragglers
!canStop(true, true) && (w.phase >= 0 || reactivate() == w))
- return 0; // rescan or quiescent terminate
+ return 0; // rescan or terminate
WorkQueue[] qs = queues; // spin for expected #accesses in scan+signal
int spins = ((qs == null) ? 0 : ((qs.length & SMASK) << 1)) | 0xf;
while ((p = w.phase) < 0 && --spins > 0)
@@ -1921,10 +1868,8 @@ private int awaitWork(WorkQueue w) {
long deadline = idle ? keepAlive + System.currentTimeMillis() : 0L;
LockSupport.setCurrentBlocker(this);
for (;;) { // await signal or termination
- if (runState < 0) { // activate before exit
- do {} while (w.phase < 0 && reactivate() != null);
- return -1;
- }
+ if (runState < 0)
+ break;
w.access = PARKED; // enable unpark
if (w.phase < 0) {
if (idle)
@@ -1951,6 +1896,49 @@ private int awaitWork(WorkQueue w) {
return 0;
}
+ /**
+ * Internal version of isQuiescent and related functionality.
+ * Returns true if terminating or all submission queues are empty
+ * and unlocked, and all workers are idle.
+ *
+ * @param idle false if caller need not be counted as idle.
+ * @param stopIfEnabled true if should transition runState to STOP
+ * if quiescent, in which case returns false
+ */
+ private boolean canStop(boolean idle, boolean stopIfEnabled) {
+ long prevSum = 0L; // at least 2 scans
+ for (boolean stable = false, rescan = true;;) {
+ if (runState < 0)
+ return true; // terminating
+ else if (idle && (ctl & RC_MASK) > 0L)
+ return false; // active workers exist
+ else if (stable) {
+ if (stopIfEnabled && (runState & SHUTDOWN) != 0) {
+ getAndBitwiseOrRunState(STOP);
+ return false; // caller cannot stop yet
+ }
+ return true;
+ }
+ long sum = 0L;
+ WorkQueue[] qs; WorkQueue q;
+ int n = ((qs = queues) == null) ? 0 : qs.length;
+ for (int i = 0; i < n; ++i) {
+ if ((q = qs[i]) != null) {
+ int x = q.access, p = q.phase, s = q.top, d = s - q.base;
+ if (x > 0 || p > 0 || d > 0 || q.access > 0)
+ return false; // locked, active or nonempty
+ if (d != 0 || q.top != s)
+ rescan = true; // inconsistent reads
+ sum = (sum << 5) - sum + s + p; // sequence hash
+ }
+ }
+ if (!rescan && sum == prevSum)
+ stable = true;
+ prevSum = sum;
+ rescan = false;
+ }
+ }
+
/**
* Scans for and returns a polled task, if available. Used only
* for untracked polls. Begins scan at a random index to avoid
@@ -2191,12 +2179,13 @@ else if ((f = f.completer) == null)
* @return positive if quiescent, negative if interrupted, else 0
*/
private int helpQuiesce(WorkQueue w, long nanos, boolean interruptible) {
- long startTime = System.nanoTime(), parkTime = 0L;
+ long startTime = System.nanoTime();
int phase; // w.phase set negative when temporarily quiescent
if (w == null || (phase = w.phase) < 0)
return 0;
int activePhase = phase, inactivePhase = phase | INACTIVE;
int wsrc = w.source, r = 0, returnStatus = 1;
+ long initialSleep = -parallelism, parkTime = 0L;
for (boolean locals = true;;) {
WorkQueue[] qs; WorkQueue q;
if (runState < 0)
@@ -2210,7 +2199,7 @@ private int helpQuiesce(WorkQueue w, long nanos, boolean interruptible) {
for (ForkJoinTask> u; (u = w.nextLocalTask()) != null;)
u.doExec();
}
- boolean rescan = false, busy = false;
+ boolean rescan = false;
int n = ((qs = queues) == null) ? 0 : qs.length, m = n - 1;
scan: for (int i = n, j; i > 0; --i, ++r) {
if ((q = qs[j = m & r]) != null && q != w) {
@@ -2225,13 +2214,8 @@ private int helpQuiesce(WorkQueue w, long nanos, boolean interruptible) {
if (q.base != b || q.array != a || a[k] != t)
;
else if (t == null) {
- if (!rescan) {
- int p = q.phase, s = q.top;
- if (s - b > 0)
- rescan = true;
- else if (p > 0)
- busy = true;
- }
+ if (!rescan && (q.access > 0 || q.top - b > 0))
+ rescan = true;
break;
}
else if (phase < 0) // reactivate before taking
@@ -2241,36 +2225,31 @@ else if (WorkQueue.casSlotToNull(a, k, t)) {
w.source = src;
t.doExec();
w.source = wsrc;
- locals = true;
+ rescan = locals = true;
break scan;
}
}
}
}
- if (locals)
- ; // remain active
- else if (rescan || !canStop(false, false)) {
- if (phase < 0) // tentatively reactivate
- w.phase = phase = activePhase;
- }
- else if (phase >= 0) { // tentatively inactivate
- parkTime = 0L;
- w.phase = phase = inactivePhase;
- }
- else if (!busy)
- break; // inactive and quiescent
- else if (parkTime == 0L) {
- parkTime = 1L << 10; // initially about 1 usec
- Thread.yield();
- }
- else if (System.nanoTime() - startTime > nanos) {
- returnStatus = 0;
- break;
- }
- else {
- LockSupport.parkNanos(this, parkTime);
- if (parkTime < nanos >>> 8 && parkTime < 1L << 20)
- parkTime <<= 1; // max sleep approx 1 sec or 1% nanos
+ if (!rescan) {
+ if (phase >= 0) { // tentatively inactivate
+ w.phase = phase = inactivePhase;
+ parkTime = initialSleep;
+ }
+ if (canStop(false, false))
+ break; // quiescent
+ else if (System.nanoTime() - startTime > nanos) {
+ returnStatus = 0; // timed out
+ break;
+ }
+ else if (parkTime <= 0L) // spin before sleep
+ ++parkTime;
+ else { // initial sleep approx 1 usec
+ parkTime = Math.max(parkTime, 1L << 10);
+ LockSupport.parkNanos(this, parkTime);
+ if (parkTime < nanos >>> 8 && parkTime < 1L << 20)
+ parkTime <<= 1; // max sleep approx 1 sec or 1% nanos
+ }
}
}
w.phase = activePhase;
@@ -2285,23 +2264,26 @@ else if (System.nanoTime() - startTime > nanos) {
* @return positive if quiescent, negative if interrupted, else 0
*/
private int externalHelpQuiesce(long nanos, boolean interruptible) {
- for (long startTime = System.nanoTime(), parkTime = 0L;;) {
+ long startTime = System.nanoTime();
+ long initialSleep = -parallelism, parkTime = 0L;
+ for (;;) { // same structure as helpQuiesce
ForkJoinTask> t;
- if ((t = pollScan(false)) != null) {
+ if (runState < 0)
+ return 1;
+ else if (interruptible && Thread.interrupted())
+ return -1;
+ else if ((t = pollScan(false)) != null) {
+ parkTime = initialSleep;
t.doExec();
- parkTime = 0L;
}
else if (canStop(true, false))
return 1;
- else if (parkTime == 0L) {
- parkTime = 1L << 10;
- Thread.yield();
- }
- else if ((System.nanoTime() - startTime) > nanos)
+ else if (System.nanoTime() - startTime > nanos)
return 0;
- else if (interruptible && Thread.interrupted())
- return -1;
+ else if (parkTime <= 0L)
+ ++parkTime;
else {
+ parkTime = Math.max(parkTime, 1L << 10);
LockSupport.parkNanos(this, parkTime);
if (parkTime < nanos >>> 8 && parkTime < 1L << 20)
parkTime <<= 1;
@@ -2509,10 +2491,7 @@ static int getSurplusQueuedTaskCount() {
// Termination
/**
- * Possibly initiates and/or completes pool termination. If
- * terminating, helps complete termination by cancelling tasks,
- * reactivating idle workers or interrupting active workers, until
- * no more are found, then possibly triggering TERMINATED state.
+ * Possibly initiates and/or completes pool termination.
*
* @param now if true, unconditionally terminate, else only
* if no work and no active workers
@@ -2527,65 +2506,58 @@ private int tryTerminate(boolean now, boolean enable) {
else {
if (enable && (rs & SHUTDOWN) == 0)
rs = getAndBitwiseOrRunState(SHUTDOWN) | SHUTDOWN;
- if ((rs & SHUTDOWN) != 0)
- canStop(true, true); // sets STOP if quiescent
- rs = runState;
+ if ((rs & SHUTDOWN) != 0) {
+ canStop(true, true); // sets STOP if quiescent
+ rs = runState;
+ }
}
}
- if ((rs & (STOP | TERMINATED)) != STOP) // not terminating
+ if ((rs & (STOP | TERMINATED)) != STOP) // else help terminate
return rs;
- Thread current = Thread.currentThread(); // help terminate
+ reactivate(); // try to fan out
+ Thread current = Thread.currentThread();
WorkQueue w = ((current instanceof ForkJoinWorkerThread) ?
((ForkJoinWorkerThread)current).workQueue : null);
- int r = (w != null) ? w.config : 0; // stagger traversals
- boolean rescan;
- do { // repeat until cannot cancel, reactivate, or interrupt
- boolean reactivated = rescan = (reactivate() != null);
+ int r = (w != null) ? w.config : 0; // stagger traversals
+ // loop until all queues empty and all workers stopped or interrupted
+ long prevSum = 0L; // similar to canStop
+ for (boolean stable = false, rescan = true;;) {
+ if (((rs = runState) & TERMINATED) != 0)
+ return rs;
+ if (stable) {
+ ReentrantLock lock; Condition cond; // transition if no workers
+ if ((short)(ctl >>> TC_SHIFT) <= 0 &&
+ (getAndBitwiseOrRunState(TERMINATED) & TERMINATED) == 0 &&
+ (lock = registrationLock) != null) {
+ lock.lock();
+ if ((cond = termination) != null)
+ cond.signalAll();
+ lock.unlock();
+ container.close();
+ }
+ return runState;
+ }
+ long sum = 0L;
WorkQueue[] qs = queues;
int n = (qs == null) ? 0 : qs.length;
for (int i = 0; i < n; ++i) {
- WorkQueue q; Thread thread; int k, p;
- if ((q = qs[k = (r + i) & (n - 1)]) != null && q.phase != 0 &&
- q.access != STOP) {
- for (ForkJoinTask> t; (t = q.poll(null)) != null; ) {
- ForkJoinTask.cancelIgnoringExceptions(t);
- rescan = true; // cancel tasks
- }
- if ((k & 1) == 0) { // is submission queue
- if (q.getAndSetAccess(1) != 0)
- rescan = true; // locked
- else
- q.access = q.phase = 0; // mark as done
- }
- else if (!reactivated) {
- if ((p = q.phase) < 0) // reactivate next pass
- rescan = true;
- else if (p != 0 && (thread = q.owner) != null &&
- !thread.isInterrupted()) {
- rescan = true;
- try {
- thread.interrupt(); // try to unblock
- } catch (Throwable ignore) {
- }
- }
+ WorkQueue q; Thread qt;
+ if ((q = qs[(r + i) & (n - 1)]) != null) {
+ for (ForkJoinTask> f; (f = q.poll(null)) != null; )
+ ForkJoinTask.cancelIgnoringExceptions(f);
+ if ((qt = q.owner) != null && q.access != STOP &&
+ !qt.isInterrupted()) {
+ rescan = true;
+ ForkJoinTask.interruptIgnoringExceptions(qt);
}
+ sum = (sum << 5) - sum + (q.phase & ~INACTIVE);
}
}
- if (((rs = runState) & TERMINATED) != 0)
- return rs;
- } while (rescan);
-
- ReentrantLock lock; Condition cond; // transition when no workers
- if ((short)(ctl >>> TC_SHIFT) <= 0 &&
- (getAndBitwiseOrRunState(TERMINATED) & TERMINATED) == 0 &&
- (lock = registrationLock) != null) {
- lock.lock();
- if ((cond = termination) != null)
- cond.signalAll();
- lock.unlock();
- container.close();
+ if (!rescan && sum == prevSum)
+ stable = true;
+ prevSum = sum;
+ rescan = false;
}
- return runState;
}
// Exported methods
@@ -2935,7 +2907,7 @@ public ForkJoinTask submit(Callable task) {
*/
@Override
public ForkJoinTask submit(Runnable task, T result) {
- return poolSubmit(true, new ForkJoinTask.AdaptedRunnable(task, result));
+ return poolSubmit(true, new ForkJoinTask.AdaptedInterruptibleRunnable(task, result));
}
/**
@@ -2948,7 +2920,7 @@ public ForkJoinTask submit(Runnable task, T result) {
public ForkJoinTask> submit(Runnable task) {
return poolSubmit(true, (task instanceof ForkJoinTask>)
? (ForkJoinTask) task // avoid re-wrap
- : new ForkJoinTask.AdaptedRunnableAction(task));
+ : new ForkJoinTask.AdaptedInterruptibleRunnable(task, null));
}
// Added mainly for possible use in Loom
@@ -3039,17 +3011,21 @@ public List> invokeAll(Collection extends Callable> tasks) {
ArrayList> futures = new ArrayList<>(tasks.size());
try {
for (Callable t : tasks) {
- ForkJoinTask f =
+ ForkJoinTask.AdaptedInterruptibleCallable f =
new ForkJoinTask.AdaptedInterruptibleCallable(t);
futures.add(f);
poolSubmit(true, f);
}
- for (int i = futures.size() - 1; i >= 0; --i)
- ((ForkJoinTask>)futures.get(i)).quietlyJoin();
+ for (int i = futures.size() - 1; i >= 0; --i) {
+ ForkJoinTask.AdaptedInterruptibleCallable f =
+ (ForkJoinTask.AdaptedInterruptibleCallable)futures.get(i);
+ f.quietlyJoin();
+ }
return futures;
} catch (Throwable t) {
for (Future e : futures)
- ForkJoinTask.cancelIgnoringExceptions(e);
+ ForkJoinTask.cancelIgnoringExceptions(
+ (ForkJoinTask.AdaptedInterruptibleCallable)e);
throw t;
}
}
@@ -3062,7 +3038,7 @@ public List> invokeAll(Collection extends Callable> tasks,
ArrayList> futures = new ArrayList<>(tasks.size());
try {
for (Callable t : tasks) {
- ForkJoinTask f =
+ ForkJoinTask.AdaptedInterruptibleCallable f =
new ForkJoinTask.AdaptedInterruptibleCallable(t);
futures.add(f);
poolSubmit(true, f);
@@ -3070,7 +3046,8 @@ public List> invokeAll(Collection extends Callable> tasks,
long startTime = System.nanoTime(), ns = nanos;
boolean timedOut = (ns < 0L);
for (int i = futures.size() - 1; i >= 0; --i) {
- ForkJoinTask f = (ForkJoinTask)futures.get(i);
+ ForkJoinTask.AdaptedInterruptibleCallable f =
+ (ForkJoinTask.AdaptedInterruptibleCallable)futures.get(i);
if (!f.isDone()) {
if (!timedOut)
timedOut = !f.quietlyJoin(ns, TimeUnit.NANOSECONDS);
@@ -3083,18 +3060,18 @@ public List> invokeAll(Collection extends Callable> tasks,
return futures;
} catch (Throwable t) {
for (Future e : futures)
- ForkJoinTask.cancelIgnoringExceptions(e);
+ ForkJoinTask.cancelIgnoringExceptions(
+ (ForkJoinTask.AdaptedInterruptibleCallable)e);
throw t;
}
}
/**
- * Task to hold results for invokeAny, or to report exception if
- * all subtasks fail or are cancelled or the pool is terminating.
- * Note: Among other oddities, the ExecutorService.invokeAny spec
- * requires that the all-cancelled case (including cancellation of
- * the root because of pool termination) be reported as an
- * ExecutionException, not a CancellationException.
+ * Task (that is never forked) to hold results for invokeAny, or
+ * to report exception if all subtasks fail or are cancelled or
+ * the pool is terminating. ForkJoinTask tags are used to avoid
+ * multiple calls to tryComplete by the same task under async
+ * cancellation.
*/
static final class InvokeAnyRoot extends ForkJoinTask {
private static final long serialVersionUID = 2838392045355241008L;
@@ -3107,13 +3084,17 @@ static final class InvokeAnyRoot extends ForkJoinTask {
throw new IllegalArgumentException();
count = new AtomicInteger(n);
}
- final void tryComplete(E v, Throwable ex, boolean completed) {
- if (!isDone()) {
+ final void tryComplete(InvokeAnyTask f, E v, Throwable ex,
+ boolean completed) {
+ ForkJoinPool p;
+ if (!isDone() && f != null &&
+ f.compareAndSetForkJoinTaskTag((short)0, (short)1)) {
if (completed) {
result = v;
quietlyComplete();
}
- else if (count.getAndDecrement() <= 1) {
+ else if (count.getAndDecrement() <= 1 ||
+ ((p = getPool()) != null && p.runState < 0)) {
if (ex == null)
trySetCancelled();
else
@@ -3153,69 +3134,37 @@ final E invokeAny(Collection extends Callable> tasks,
}
/**
- * Variant of AdaptedInterruptibleCallable with results in
- * InvokeAnyRoot (and never independently joined). Cancellation
- * status is used to avoid multiple calls to tryComplete by the
- * same task under async cancellation.
+ * AdaptedInterruptibleCallable with results in InvokeAnyRoot (and
+ * never independently joined).
*/
- static final class InvokeAnyTask extends ForkJoinTask {
+ static final class InvokeAnyTask extends ForkJoinTask.AdaptedInterruptibleCallable {
private static final long serialVersionUID = 2838392045355241008L;
final InvokeAnyRoot root;
- @SuppressWarnings("serial") // Conditionally serializable
- final Callable callable;
- transient volatile Thread runner;
InvokeAnyTask(InvokeAnyRoot root, Callable callable) {
- if (callable == null) throw new NullPointerException();
- this.callable = callable;
+ super(callable);
this.root = root;
}
- public final boolean exec() {
- InvokeAnyRoot r; ForkJoinPool p;
- Thread t = Thread.currentThread();
+ final E compute() throws Exception {
+ InvokeAnyRoot r;
if ((r = root) != null && !r.isDone()) {
- if ((t instanceof ForkJoinWorkerThread) && // termination check
- (p = ((ForkJoinWorkerThread) t).pool) != null &&
- p.runState < 0)
- r.trySetCancelled();
- else {
- Thread.interrupted();
- runner = t;
- E v = null;
- Throwable ex = null;
- boolean completed = false;
- if (!isDone()) {
- try {
- v = callable.call();
- completed = true;
- } catch (Throwable rex) {
- ex = rex;
- }
- }
- runner = null;
- if (trySetCancelled() >= 0) // avoid race with cancel
- r.tryComplete(v, ex, completed);
- Thread.interrupted();
- }
- }
- return true;
- }
- public final boolean cancel(boolean mayInterruptIfRunning) {
- int s; Thread t; InvokeAnyRoot r;
- if ((s = trySetCancelled()) >= 0) {
- if ((r = root) != null)
- r.tryComplete(null, null, false);
- if (mayInterruptIfRunning && (t = runner) != null) {
- try {
- t.interrupt();
- } catch (Throwable ignore) {
- }
+ E v = null; Throwable ex = null; boolean completed = false;
+ try {
+ v = callable.call();
+ completed = true;
+ } catch (Throwable rex) {
+ ex = rex;
+ } finally {
+ r.tryComplete(this, v, ex, completed);
}
- return true;
}
- return ((s & (ABNORMAL | THROWN)) == ABNORMAL);
+ return null;
+ }
+ public boolean cancel(boolean mayInterruptIfRunning) {
+ InvokeAnyRoot r = root;
+ if (r != null)
+ r.tryComplete(this, null, null, false);
+ return super.cancel(mayInterruptIfRunning);
}
- public final void setRawResult(E v) {} // unused
- public final E getRawResult() { return null; }
}
@Override
@@ -3416,7 +3365,14 @@ public int getQueuedSubmissionCount() {
* @return {@code true} if there are any queued submissions
*/
public boolean hasQueuedSubmissions() {
- return !canStop(false, false);
+ WorkQueue[] qs; WorkQueue q;
+ if (runState >= 0 && (qs = queues) != null) {
+ for (int i = 0; i < qs.length; i += 2) {
+ if ((q = qs[i]) != null && q.queueSize() > 0)
+ return true;
+ }
+ }
+ return false;
}
/**
@@ -3868,7 +3824,7 @@ private static void unmanagedBlock(ManagedBlocker blocker)
@Override
protected RunnableFuture newTaskFor(Runnable runnable, T value) {
- return new ForkJoinTask.AdaptedRunnable(runnable, value);
+ return new ForkJoinTask.AdaptedInterruptibleRunnable(runnable, value);
}
@Override
diff --git a/src/java.base/share/classes/java/util/concurrent/ForkJoinTask.java b/src/java.base/share/classes/java/util/concurrent/ForkJoinTask.java
index 4aa48bf10f877..aff45d931a4e5 100644
--- a/src/java.base/share/classes/java/util/concurrent/ForkJoinTask.java
+++ b/src/java.base/share/classes/java/util/concurrent/ForkJoinTask.java
@@ -208,7 +208,36 @@ public abstract class ForkJoinTask implements Future, Serializable {
* See the internal documentation of class ForkJoinPool for a
* general implementation overview. ForkJoinTasks are mainly
* responsible for maintaining their "status" field amidst relays
- * to methods in ForkJoinWorkerThread and ForkJoinPool.
+ * to methods in ForkJoinWorkerThread and ForkJoinPool, along with
+ * recording and reporting exceptions. The status field mainly
+ * holds bits recording completion status. There is no bit
+ * representing "running", to recording whether incomplete tasks
+ * are queued vs executing (although these cases can be
+ * distinguished in InterruptibleForkJoinTasks). Cancellation is
+ * recorded in status bits (ABNORMAL but not THROWN), but reported
+ * in joining methods by throwing an exception. Other exceptions
+ * of completed (THROWN) tasks are recorded in the "aux" field,
+ * but may be reconstructed (in getThrowableException) to produce
+ * more useful stack traces when reported. Sentinels for being
+ * interrupted or timing out while waiting for completion are not
+ * recorded as status bits but are included in return values of
+ * methods in which they occur.
+ *
+ * The rules for how these are managed and issued in public
+ * methods have evolved across versions of this class. Those for
+ * tasks with results accessed via join() differ from those via
+ * Future.get(), which may also vary when invoked using pool
+ * submit methods by non-workers. In particular, internal usages
+ * of ForkJoinTasks ignore interrupt status when executing or
+ * awaiting completion. In all other cases, reporting task status
+ * (results or exceptions while executing) is always preferred to
+ * interrupts (also timeouts), restoring caller interrupt status
+ * in case of races. Similarly, completion status is preferred to
+ * reporting cancellation (ensured by atomically setting status),
+ * including cases where tasks are cancelled during termination.
+ * Cancellation is reported as an unchecked exception by join(),
+ * and by worker calls to get(), but is otherwise wrapped by a
+ * (checked) ExecutionException.
*
* The methods of this class are more-or-less layered into
* (1) basic status maintenance
@@ -216,12 +245,6 @@ public abstract class ForkJoinTask implements Future