Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 54 additions & 21 deletions src/main/java/rx/schedulers/CachedThreadScheduler.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,14 @@
*/
package rx.schedulers;

import java.util.Iterator;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

import rx.Scheduler;
import rx.Subscription;
import rx.functions.Action0;
Expand All @@ -24,27 +32,32 @@
import rx.subscriptions.CompositeSubscription;
import rx.subscriptions.Subscriptions;

import java.util.Iterator;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

/* package */final class CachedThreadScheduler extends Scheduler {
private static final String WORKER_THREAD_NAME_PREFIX = "RxCachedThreadScheduler-";
/* package */final class CachedThreadScheduler extends Scheduler implements Subscription {
/* private */static final String WORKER_THREAD_NAME_PREFIX = "RxCachedThreadScheduler-";
private static final RxThreadFactory WORKER_THREAD_FACTORY =
new RxThreadFactory(WORKER_THREAD_NAME_PREFIX);

private static final String EVICTOR_THREAD_NAME_PREFIX = "RxCachedWorkerPoolEvictor-";
/* private */static final String EVICTOR_THREAD_NAME_PREFIX = "RxCachedWorkerPoolEvictor-";
private static final RxThreadFactory EVICTOR_THREAD_FACTORY =
new RxThreadFactory(EVICTOR_THREAD_NAME_PREFIX);

private static final class CachedWorkerPool {
volatile int done;
static final AtomicIntegerFieldUpdater<CachedThreadScheduler> DONE_UPDATER
= AtomicIntegerFieldUpdater.newUpdater(CachedThreadScheduler.class, "done");

CachedWorkerPool workerPool = new CachedWorkerPool(
60L, TimeUnit.SECONDS
);

private static final class CachedWorkerPool implements Subscription {
private final long keepAliveTime;
private final ConcurrentLinkedQueue<ThreadWorker> expiringWorkerQueue;
private final ScheduledExecutorService evictExpiredWorkerExecutor;
private final CompositeSubscription all;

CachedWorkerPool(long keepAliveTime, TimeUnit unit) {
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.expiringWorkerQueue = new ConcurrentLinkedQueue<ThreadWorker>();
this.all = new CompositeSubscription();

evictExpiredWorkerExecutor = Executors.newScheduledThreadPool(1, EVICTOR_THREAD_FACTORY);
evictExpiredWorkerExecutor.scheduleWithFixedDelay(
Expand All @@ -57,10 +70,6 @@ public void run() {
);
}

private static CachedWorkerPool INSTANCE = new CachedWorkerPool(
60L, TimeUnit.SECONDS
);

ThreadWorker get() {
while (!expiringWorkerQueue.isEmpty()) {
ThreadWorker threadWorker = expiringWorkerQueue.poll();
Expand All @@ -70,7 +79,9 @@ ThreadWorker get() {
}

// No cached worker found, so create a new one.
return new ThreadWorker(WORKER_THREAD_FACTORY);
ThreadWorker threadWorker = new ThreadWorker(WORKER_THREAD_FACTORY);
all.add(threadWorker);
return threadWorker;
}

void release(ThreadWorker threadWorker) {
Expand All @@ -89,7 +100,7 @@ void evictExpiredWorkers() {
ThreadWorker threadWorker = threadWorkerIterator.next();
if (threadWorker.getExpirationTime() <= currentTimestamp) {
threadWorkerIterator.remove();
threadWorker.unsubscribe();
all.remove(threadWorker);
} else {
// Queue is ordered with the worker that will expire first in the beginning, so when we
// find a non-expired worker we can stop evicting.
Expand All @@ -102,36 +113,47 @@ void evictExpiredWorkers() {
long now() {
return System.nanoTime();
}
@Override
public boolean isUnsubscribed() {
return all.isUnsubscribed();
}
@Override
public void unsubscribe() {
evictExpiredWorkerExecutor.shutdownNow();
all.unsubscribe();
}
}

@Override
public Worker createWorker() {
return new EventLoopWorker(CachedWorkerPool.INSTANCE.get());
return new EventLoopWorker(workerPool);
}

private static final class EventLoopWorker extends Scheduler.Worker {
private final CompositeSubscription innerSubscription = new CompositeSubscription();
private final ThreadWorker threadWorker;
private final CachedWorkerPool pool;
volatile int once;
static final AtomicIntegerFieldUpdater<EventLoopWorker> ONCE_UPDATER
= AtomicIntegerFieldUpdater.newUpdater(EventLoopWorker.class, "once");

EventLoopWorker(ThreadWorker threadWorker) {
this.threadWorker = threadWorker;
EventLoopWorker(CachedWorkerPool pool) {
this.pool = pool;
this.threadWorker = pool.get();
}

@Override
public void unsubscribe() {
if (ONCE_UPDATER.compareAndSet(this, 0, 1)) {
// unsubscribe should be idempotent, so only do this once
CachedWorkerPool.INSTANCE.release(threadWorker);
pool.release(threadWorker);
}
innerSubscription.unsubscribe();
}

@Override
public boolean isUnsubscribed() {
return innerSubscription.isUnsubscribed();
return once == 1;
}

@Override
Expand All @@ -141,7 +163,7 @@ public Subscription schedule(Action0 action) {

@Override
public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) {
if (innerSubscription.isUnsubscribed()) {
if (isUnsubscribed()) {
// don't schedule, we are unsubscribed
return Subscriptions.empty();
}
Expand Down Expand Up @@ -169,4 +191,15 @@ public void setExpirationTime(long expirationTime) {
this.expirationTime = expirationTime;
}
}

@Override
public boolean isUnsubscribed() {
return done == 1;
}
@Override
public void unsubscribe() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't like reusing the Subscription idea for this as it confuses with the lifecycle that we use Subscription for everywhere else.

This is very much an internal mechanism for shutting down or resetting, not a normal lifecycle event. No one "subscribes" to a thread pool, so unsubscribing to shut down the pool is confusing.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can either create new interface to signal a Scheduler can be shutdown, or add a shutdown method to Scheduler directly.

if (DONE_UPDATER.getAndSet(this, 1) == 0) {
workerPool.unsubscribe();
}
}
}
25 changes: 20 additions & 5 deletions src/main/java/rx/schedulers/EventLoopsScheduler.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@
*/
package rx.schedulers;

import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

import rx.Scheduler;
import rx.Subscription;
import rx.functions.Action0;
Expand All @@ -24,12 +27,9 @@
import rx.subscriptions.CompositeSubscription;
import rx.subscriptions.Subscriptions;

import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

/* package */class EventLoopsScheduler extends Scheduler {
/* package */class EventLoopsScheduler extends Scheduler implements Subscription {
/** Manages a fixed number of workers. */
private static final String THREAD_NAME_PREFIX = "RxComputationThreadPool-";
/* private */static final String THREAD_NAME_PREFIX = "RxComputationThreadPool-";
private static final RxThreadFactory THREAD_FACTORY = new RxThreadFactory(THREAD_NAME_PREFIX);

static final class FixedSchedulerPool {
Expand Down Expand Up @@ -110,4 +110,19 @@ private static final class PoolWorker extends NewThreadWorker {
super(threadFactory);
}
}
@Override
public boolean isUnsubscribed() {
for (PoolWorker pw : pool.eventLoops) {
if (pw.isUnsubscribed()) {
return true;
}
}
return false;
}
@Override
public void unsubscribe() {
for (PoolWorker pw : pool.eventLoops) {
pw.unsubscribe();
}
}
}
103 changes: 77 additions & 26 deletions src/main/java/rx/schedulers/Schedulers.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,44 +15,38 @@
*/
package rx.schedulers;

import java.util.concurrent.Executor;

import rx.Scheduler;
import rx.Subscription;
import rx.plugins.RxJavaPlugins;

import java.util.concurrent.Executor;

/**
* Static factory methods for creating Schedulers.
*/
public final class Schedulers {

private final Scheduler computationScheduler;
private final Scheduler ioScheduler;
private final Scheduler newThreadScheduler;

private static final Schedulers INSTANCE = new Schedulers();

private Schedulers() {
Scheduler c = RxJavaPlugins.getInstance().getSchedulersHook().getComputationScheduler();
if (c != null) {
computationScheduler = c;
} else {
computationScheduler = new EventLoopsScheduler();
}

Scheduler io = RxJavaPlugins.getInstance().getSchedulersHook().getIOScheduler();
if (io != null) {
ioScheduler = io;
} else {
ioScheduler = new CachedThreadScheduler();
}

private static final Object computationGuard = new Object();
/** The computation scheduler instance, guarded by computationGuard. */
private static Scheduler computationScheduler;
private static final Object ioGuard = new Object();
/** The io scheduler instance, guarded by ioGuard. */
private static Scheduler ioScheduler;
/** The new thread scheduler, fixed because it doesn't need to support shutdown. */
private static final Scheduler newThreadScheduler;

static {
Scheduler nt = RxJavaPlugins.getInstance().getSchedulersHook().getNewThreadScheduler();
if (nt != null) {
newThreadScheduler = nt;
} else {
newThreadScheduler = NewThreadScheduler.instance();
}
}

private Schedulers() {
throw new IllegalStateException("No instances!");
}

/**
* Creates and returns a {@link Scheduler} that executes work immediately on the current thread.
Expand Down Expand Up @@ -81,7 +75,7 @@ public static Scheduler trampoline() {
* @return a {@link NewThreadScheduler} instance
*/
public static Scheduler newThread() {
return INSTANCE.newThreadScheduler;
return newThreadScheduler;
}

/**
Expand All @@ -96,7 +90,17 @@ public static Scheduler newThread() {
* @return a {@link Scheduler} meant for computation-bound work
*/
public static Scheduler computation() {
return INSTANCE.computationScheduler;
synchronized (computationGuard) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't like putting a synchronized block in the line of fire on something like this that doesn't need to be.

This is a global lock and exactly the type of stuff I find in our production environments.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Of course we can do shutdown without this, but then the schedulers are dead and without app restrart, they won't come back.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not to mention, they can't be tested because they shutdown the global threads for good.

if (computationScheduler == null) {
Scheduler c = RxJavaPlugins.getInstance().getSchedulersHook().getComputationScheduler();
if (c != null) {
computationScheduler = c;
} else {
computationScheduler = new EventLoopsScheduler();
}
}
return computationScheduler;
}
}

/**
Expand All @@ -113,7 +117,17 @@ public static Scheduler computation() {
* @return a {@link Scheduler} meant for IO-bound work
*/
public static Scheduler io() {
return INSTANCE.ioScheduler;
synchronized (ioGuard) {
if (ioScheduler == null) {
Scheduler io = RxJavaPlugins.getInstance().getSchedulersHook().getIOScheduler();
if (io != null) {
ioScheduler = io;
} else {
ioScheduler = new CachedThreadScheduler();
}
}
return ioScheduler;
}
}

/**
Expand All @@ -136,4 +150,41 @@ public static TestScheduler test() {
public static Scheduler from(Executor executor) {
return new ExecutorScheduler(executor);
}
/**
* Shuts down the threads of the Computation and IO schedulers.
* The newThread() scheduler doesn't need to be shut down as its workers shut themselves
* down once they complete.
*/
public static void shutdown() {
synchronized (computationGuard) {
if (computationScheduler instanceof Subscription) {
((Subscription) computationScheduler).unsubscribe();
computationScheduler = null;
}
}
synchronized (ioGuard) {
if (ioScheduler instanceof Subscription) {
((Subscription) ioScheduler).unsubscribe();
ioScheduler = null;
}
}
}
/**
* Test support.
* @return returns true if there is a computation scheduler instance available.
*/
static boolean hasComputationScheduler() {
synchronized (computationGuard) {
return computationScheduler != null;
}
}
/**
* Test support.
* @return returns true if there is an io scheduler instance available.
*/
static boolean hasIOScheduler() {
synchronized (ioGuard) {
return ioScheduler != null;
}
}
}
Loading