Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion src/main/java/rx/Scheduler.java
Original file line number Diff line number Diff line change
Expand Up @@ -155,5 +155,10 @@ public int parallelism() {
public long now() {
return System.currentTimeMillis();
}

/**
* Shuts down the threads associated with this scheduler.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we have some javadoc on this method about if this method is synchronous or asynchronous? If it's not synchronous then how will I know when the Scheduler has shutdown, that is when the running threads currently associated with the Scheduler have completed?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not detailed in the requirements, but the implementation basically calls unsubscribe() and shutdownNow() which generally do best effort to terminate outstanding work. So it is quite possible an unresponsive/ignorant work may keep a worker thread operational and make Tomcat still print out the leak notification.

Is the underlying issue still relevant (almost a month passed with no feedback)?

*/
public void shutdown() {
// do nothing by default
}
}
69 changes: 49 additions & 20 deletions src/main/java/rx/schedulers/CachedThreadScheduler.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,14 @@
*/
package rx.schedulers;

import java.util.Iterator;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

import rx.Scheduler;
import rx.Subscription;
import rx.functions.Action0;
Expand All @@ -24,27 +32,32 @@
import rx.subscriptions.CompositeSubscription;
import rx.subscriptions.Subscriptions;

import java.util.Iterator;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

/* package */final class CachedThreadScheduler extends Scheduler {
private static final String WORKER_THREAD_NAME_PREFIX = "RxCachedThreadScheduler-";
/* private */static final String WORKER_THREAD_NAME_PREFIX = "RxCachedThreadScheduler-";
private static final RxThreadFactory WORKER_THREAD_FACTORY =
new RxThreadFactory(WORKER_THREAD_NAME_PREFIX);

private static final String EVICTOR_THREAD_NAME_PREFIX = "RxCachedWorkerPoolEvictor-";
/* private */static final String EVICTOR_THREAD_NAME_PREFIX = "RxCachedWorkerPoolEvictor-";
private static final RxThreadFactory EVICTOR_THREAD_FACTORY =
new RxThreadFactory(EVICTOR_THREAD_NAME_PREFIX);

private static final class CachedWorkerPool {
volatile int done;
static final AtomicIntegerFieldUpdater<CachedThreadScheduler> DONE_UPDATER
= AtomicIntegerFieldUpdater.newUpdater(CachedThreadScheduler.class, "done");

CachedWorkerPool workerPool = new CachedWorkerPool(
60L, TimeUnit.SECONDS
);

private static final class CachedWorkerPool implements Subscription {
private final long keepAliveTime;
private final ConcurrentLinkedQueue<ThreadWorker> expiringWorkerQueue;
private final ScheduledExecutorService evictExpiredWorkerExecutor;
private final CompositeSubscription all;

CachedWorkerPool(long keepAliveTime, TimeUnit unit) {
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.expiringWorkerQueue = new ConcurrentLinkedQueue<ThreadWorker>();
this.all = new CompositeSubscription();

evictExpiredWorkerExecutor = Executors.newScheduledThreadPool(1, EVICTOR_THREAD_FACTORY);
evictExpiredWorkerExecutor.scheduleWithFixedDelay(
Expand All @@ -57,10 +70,6 @@ public void run() {
);
}

private static CachedWorkerPool INSTANCE = new CachedWorkerPool(
60L, TimeUnit.SECONDS
);

ThreadWorker get() {
while (!expiringWorkerQueue.isEmpty()) {
ThreadWorker threadWorker = expiringWorkerQueue.poll();
Expand All @@ -70,7 +79,9 @@ ThreadWorker get() {
}

// No cached worker found, so create a new one.
return new ThreadWorker(WORKER_THREAD_FACTORY);
ThreadWorker threadWorker = new ThreadWorker(WORKER_THREAD_FACTORY);
all.add(threadWorker);
return threadWorker;
}

void release(ThreadWorker threadWorker) {
Expand All @@ -89,7 +100,7 @@ void evictExpiredWorkers() {
ThreadWorker threadWorker = threadWorkerIterator.next();
if (threadWorker.getExpirationTime() <= currentTimestamp) {
threadWorkerIterator.remove();
threadWorker.unsubscribe();
all.remove(threadWorker);
} else {
// Queue is ordered with the worker that will expire first in the beginning, so when we
// find a non-expired worker we can stop evicting.
Expand All @@ -102,36 +113,47 @@ void evictExpiredWorkers() {
long now() {
return System.nanoTime();
}
@Override
public boolean isUnsubscribed() {
return all.isUnsubscribed();
}
@Override
public void unsubscribe() {
evictExpiredWorkerExecutor.shutdownNow();
all.unsubscribe();
}
}

@Override
public Worker createWorker() {
return new EventLoopWorker(CachedWorkerPool.INSTANCE.get());
return new EventLoopWorker(workerPool);
}

private static final class EventLoopWorker extends Scheduler.Worker {
private final CompositeSubscription innerSubscription = new CompositeSubscription();
private final ThreadWorker threadWorker;
private final CachedWorkerPool pool;
volatile int once;
static final AtomicIntegerFieldUpdater<EventLoopWorker> ONCE_UPDATER
= AtomicIntegerFieldUpdater.newUpdater(EventLoopWorker.class, "once");

EventLoopWorker(ThreadWorker threadWorker) {
this.threadWorker = threadWorker;
EventLoopWorker(CachedWorkerPool pool) {
this.pool = pool;
this.threadWorker = pool.get();
}

@Override
public void unsubscribe() {
if (ONCE_UPDATER.compareAndSet(this, 0, 1)) {
// unsubscribe should be idempotent, so only do this once
CachedWorkerPool.INSTANCE.release(threadWorker);
pool.release(threadWorker);
}
innerSubscription.unsubscribe();
}

@Override
public boolean isUnsubscribed() {
return innerSubscription.isUnsubscribed();
return once == 1;
}

@Override
Expand All @@ -141,7 +163,7 @@ public Subscription schedule(Action0 action) {

@Override
public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) {
if (innerSubscription.isUnsubscribed()) {
if (isUnsubscribed()) {
// don't schedule, we are unsubscribed
return Subscriptions.empty();
}
Expand Down Expand Up @@ -169,4 +191,11 @@ public void setExpirationTime(long expirationTime) {
this.expirationTime = expirationTime;
}
}

@Override
public void shutdown() {
if (DONE_UPDATER.getAndSet(this, 1) == 0) {
workerPool.unsubscribe();
}
}
}
14 changes: 10 additions & 4 deletions src/main/java/rx/schedulers/EventLoopsScheduler.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@
*/
package rx.schedulers;

import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

import rx.Scheduler;
import rx.Subscription;
import rx.functions.Action0;
Expand All @@ -24,12 +27,9 @@
import rx.subscriptions.CompositeSubscription;
import rx.subscriptions.Subscriptions;

import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

/* package */class EventLoopsScheduler extends Scheduler {
/** Manages a fixed number of workers. */
private static final String THREAD_NAME_PREFIX = "RxComputationThreadPool-";
/* private */static final String THREAD_NAME_PREFIX = "RxComputationThreadPool-";
private static final RxThreadFactory THREAD_FACTORY = new RxThreadFactory(THREAD_NAME_PREFIX);

static final class FixedSchedulerPool {
Expand Down Expand Up @@ -110,4 +110,10 @@ private static final class PoolWorker extends NewThreadWorker {
super(threadFactory);
}
}
@Override
public void shutdown() {
for (PoolWorker pw : pool.eventLoops) {
pw.unsubscribe();
}
}
}
8 changes: 8 additions & 0 deletions src/main/java/rx/schedulers/ExecutorScheduler.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,14 @@

import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

import rx.Scheduler;
import rx.Subscription;
import rx.functions.Action0;
Expand Down Expand Up @@ -192,4 +194,10 @@ public void unsubscribe() {
}

}
@Override
public void shutdown() {
if (executor instanceof ExecutorService) {
((ExecutorService) executor).shutdownNow();
}
}
}
106 changes: 80 additions & 26 deletions src/main/java/rx/schedulers/Schedulers.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,44 +15,37 @@
*/
package rx.schedulers;

import java.util.concurrent.Executor;

import rx.Scheduler;
import rx.plugins.RxJavaPlugins;

import java.util.concurrent.Executor;

/**
* Static factory methods for creating Schedulers.
*/
public final class Schedulers {

private final Scheduler computationScheduler;
private final Scheduler ioScheduler;
private final Scheduler newThreadScheduler;

private static final Schedulers INSTANCE = new Schedulers();

private Schedulers() {
Scheduler c = RxJavaPlugins.getInstance().getSchedulersHook().getComputationScheduler();
if (c != null) {
computationScheduler = c;
} else {
computationScheduler = new EventLoopsScheduler();
}

Scheduler io = RxJavaPlugins.getInstance().getSchedulersHook().getIOScheduler();
if (io != null) {
ioScheduler = io;
} else {
ioScheduler = new CachedThreadScheduler();
}

private static final Object computationGuard = new Object();
/** The computation scheduler instance, guarded by computationGuard. */
private static volatile Scheduler computationScheduler;
private static final Object ioGuard = new Object();
/** The io scheduler instance, guarded by ioGuard. */
private static volatile Scheduler ioScheduler;
/** The new thread scheduler, fixed because it doesn't need to support shutdown. */
private static final Scheduler newThreadScheduler;

static {
Scheduler nt = RxJavaPlugins.getInstance().getSchedulersHook().getNewThreadScheduler();
if (nt != null) {
newThreadScheduler = nt;
} else {
newThreadScheduler = NewThreadScheduler.instance();
}
}

private Schedulers() {
throw new IllegalStateException("No instances!");
}

/**
* Creates and returns a {@link Scheduler} that executes work immediately on the current thread.
Expand Down Expand Up @@ -81,7 +74,7 @@ public static Scheduler trampoline() {
* @return a {@link NewThreadScheduler} instance
*/
public static Scheduler newThread() {
return INSTANCE.newThreadScheduler;
return newThreadScheduler;
}

/**
Expand All @@ -96,7 +89,21 @@ public static Scheduler newThread() {
* @return a {@link Scheduler} meant for computation-bound work
*/
public static Scheduler computation() {
return INSTANCE.computationScheduler;
Scheduler s = computationScheduler;
if (s != null) {
return s;
}
synchronized (computationGuard) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't like putting a synchronized block in the line of fire on something like this that doesn't need to be.

This is a global lock and exactly the type of stuff I find in our production environments.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Of course we can do shutdown without this, but then the schedulers are dead and without app restrart, they won't come back.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not to mention, they can't be tested because they shutdown the global threads for good.

if (computationScheduler == null) {
Scheduler c = RxJavaPlugins.getInstance().getSchedulersHook().getComputationScheduler();
if (c != null) {
computationScheduler = c;
} else {
computationScheduler = new EventLoopsScheduler();
}
}
return computationScheduler;
}
}

/**
Expand All @@ -113,7 +120,21 @@ public static Scheduler computation() {
* @return a {@link Scheduler} meant for IO-bound work
*/
public static Scheduler io() {
return INSTANCE.ioScheduler;
Scheduler s = ioScheduler;
if (s != null) {
return s;
}
synchronized (ioGuard) {
if (ioScheduler == null) {
Scheduler io = RxJavaPlugins.getInstance().getSchedulersHook().getIOScheduler();
if (io != null) {
ioScheduler = io;
} else {
ioScheduler = new CachedThreadScheduler();
}
}
return ioScheduler;
}
}

/**
Expand All @@ -136,4 +157,37 @@ public static TestScheduler test() {
public static Scheduler from(Executor executor) {
return new ExecutorScheduler(executor);
}
/**
* Shuts down the threads of the Computation and IO schedulers.
* The newThread() scheduler doesn't need to be shut down as its workers shut themselves
* down once they complete.
*/
public static void shutdown() {
synchronized (computationGuard) {
if (computationScheduler != null) {
computationScheduler.shutdown();
}
computationScheduler = null;
}
synchronized (ioGuard) {
if (ioScheduler != null) {
ioScheduler.shutdown();
}
ioScheduler = null;
}
}
/**
* Test support.
* @return returns true if there is a computation scheduler instance available.
*/
static boolean hasComputationScheduler() {
return computationScheduler != null;
}
/**
* Test support.
* @return returns true if there is an io scheduler instance available.
*/
static boolean hasIOScheduler() {
return ioScheduler != null;
}
}
Loading