diff --git a/src/main/java/rx/schedulers/Schedulers.java b/src/main/java/rx/schedulers/Schedulers.java index eae594ef08..269c836864 100644 --- a/src/main/java/rx/schedulers/Schedulers.java +++ b/src/main/java/rx/schedulers/Schedulers.java @@ -16,12 +16,16 @@ package rx.schedulers; import rx.Scheduler; -import rx.internal.schedulers.*; +import rx.annotations.Experimental; +import rx.internal.schedulers.ExecutorScheduler; +import rx.internal.schedulers.GenericScheduledExecutorService; +import rx.internal.schedulers.SchedulerLifecycle; import rx.internal.util.RxRingBuffer; import rx.plugins.RxJavaPlugins; import rx.plugins.RxJavaSchedulersHook; import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicReference; /** * Static factory methods for creating Schedulers. @@ -32,7 +36,22 @@ public final class Schedulers { private final Scheduler ioScheduler; private final Scheduler newThreadScheduler; - private static final Schedulers INSTANCE = new Schedulers(); + private static final AtomicReference INSTANCE = new AtomicReference(); + + private static Schedulers getInstance() { + for (;;) { + Schedulers current = INSTANCE.get(); + if (current != null) { + return current; + } + current = new Schedulers(); + if (INSTANCE.compareAndSet(null, current)) { + return current; + } else { + shutdown(); + } + } + } private Schedulers() { RxJavaSchedulersHook hook = RxJavaPlugins.getInstance().getSchedulersHook(); @@ -86,7 +105,7 @@ public static Scheduler trampoline() { * @return a {@link Scheduler} that creates new threads */ public static Scheduler newThread() { - return INSTANCE.newThreadScheduler; + return getInstance().newThreadScheduler; } /** @@ -101,7 +120,7 @@ public static Scheduler newThread() { * @return a {@link Scheduler} meant for computation-bound work */ public static Scheduler computation() { - return INSTANCE.computationScheduler; + return getInstance().computationScheduler; } /** @@ -118,7 +137,7 @@ public static Scheduler computation() { * @return a {@link Scheduler} meant for IO-bound work */ public static Scheduler io() { - return INSTANCE.ioScheduler; + return getInstance().ioScheduler; } /** @@ -141,13 +160,24 @@ public static TestScheduler test() { public static Scheduler from(Executor executor) { return new ExecutorScheduler(executor); } + + /** + * Resets the current {@link Schedulers} instance. + * This will re-init the cached schedulers on the next usage, + * which can be useful in testing. + */ + @Experimental + public static void reset() { + shutdown(); + INSTANCE.set(null); + } /** * Starts those standard Schedulers which support the SchedulerLifecycle interface. *

The operation is idempotent and threadsafe. */ /* public test only */ static void start() { - Schedulers s = INSTANCE; + Schedulers s = getInstance(); synchronized (s) { if (s.computationScheduler instanceof SchedulerLifecycle) { ((SchedulerLifecycle) s.computationScheduler).start(); @@ -170,7 +200,7 @@ public static Scheduler from(Executor executor) { *

The operation is idempotent and threadsafe. */ public static void shutdown() { - Schedulers s = INSTANCE; + Schedulers s = getInstance(); synchronized (s) { if (s.computationScheduler instanceof SchedulerLifecycle) { ((SchedulerLifecycle) s.computationScheduler).shutdown(); @@ -181,12 +211,12 @@ public static void shutdown() { if (s.newThreadScheduler instanceof SchedulerLifecycle) { ((SchedulerLifecycle) s.newThreadScheduler).shutdown(); } - + GenericScheduledExecutorService.INSTANCE.shutdown(); - + RxRingBuffer.SPSC_POOL.shutdown(); - + RxRingBuffer.SPMC_POOL.shutdown(); } } -} \ No newline at end of file +} diff --git a/src/test/java/rx/schedulers/ResetSchedulersTest.java b/src/test/java/rx/schedulers/ResetSchedulersTest.java new file mode 100644 index 0000000000..79c9f9435b --- /dev/null +++ b/src/test/java/rx/schedulers/ResetSchedulersTest.java @@ -0,0 +1,50 @@ +package rx.schedulers; + + +import org.junit.Test; +import rx.Scheduler; +import rx.internal.schedulers.*; +import rx.plugins.RxJavaPlugins; +import rx.plugins.RxJavaSchedulersHook; + +import static org.junit.Assert.assertTrue; + +public class ResetSchedulersTest { + + @Test + public void reset() { + RxJavaPlugins.getInstance().reset(); + + final TestScheduler testScheduler = new TestScheduler(); + RxJavaPlugins.getInstance().registerSchedulersHook(new RxJavaSchedulersHook() { + @Override + public Scheduler getComputationScheduler() { + return testScheduler; + } + + @Override + public Scheduler getIOScheduler() { + return testScheduler; + } + + @Override + public Scheduler getNewThreadScheduler() { + return testScheduler; + } + }); + Schedulers.reset(); + + assertTrue(Schedulers.io().equals(testScheduler)); + assertTrue(Schedulers.computation().equals(testScheduler)); + assertTrue(Schedulers.newThread().equals(testScheduler)); + + RxJavaPlugins.getInstance().reset(); + RxJavaPlugins.getInstance().registerSchedulersHook(RxJavaSchedulersHook.getDefaultInstance()); + Schedulers.reset(); + + assertTrue(Schedulers.io() instanceof CachedThreadScheduler); + assertTrue(Schedulers.computation() instanceof EventLoopsScheduler); + assertTrue(Schedulers.newThread() instanceof rx.internal.schedulers.NewThreadScheduler); + } + +}