diff --git a/src/main/java/rx/internal/schedulers/GenericScheduledExecutorService.java b/src/main/java/rx/internal/schedulers/GenericScheduledExecutorService.java index 3bc60f076b..9a22373d4d 100644 --- a/src/main/java/rx/internal/schedulers/GenericScheduledExecutorService.java +++ b/src/main/java/rx/internal/schedulers/GenericScheduledExecutorService.java @@ -19,7 +19,6 @@ import java.util.concurrent.atomic.AtomicReference; import rx.Scheduler; -import rx.internal.util.RxThreadFactory; /** * A default {@link ScheduledExecutorService} that can be used for scheduling actions when a {@link Scheduler} implementation doesn't have that ability. @@ -32,9 +31,6 @@ */ public final class GenericScheduledExecutorService implements SchedulerLifecycle { - private static final String THREAD_NAME_PREFIX = "RxScheduledExecutorPool-"; - private static final RxThreadFactory THREAD_FACTORY = new RxThreadFactory(THREAD_NAME_PREFIX); - private static final ScheduledExecutorService[] NONE = new ScheduledExecutorService[0]; private static final ScheduledExecutorService SHUTDOWN; @@ -72,7 +68,7 @@ public void start() { ScheduledExecutorService[] execs = new ScheduledExecutorService[count]; for (int i = 0; i < count; i++) { - execs[i] = Executors.newScheduledThreadPool(1, THREAD_FACTORY); + execs[i] = GenericScheduledExecutorServiceFactory.create(); } if (executor.compareAndSet(NONE, execs)) { for (ScheduledExecutorService exec : execs) { diff --git a/src/main/java/rx/internal/schedulers/GenericScheduledExecutorServiceFactory.java b/src/main/java/rx/internal/schedulers/GenericScheduledExecutorServiceFactory.java new file mode 100644 index 0000000000..c0ea18061b --- /dev/null +++ b/src/main/java/rx/internal/schedulers/GenericScheduledExecutorServiceFactory.java @@ -0,0 +1,39 @@ +package rx.internal.schedulers; + +import java.util.concurrent.*; + +import rx.functions.Func0; +import rx.internal.util.RxThreadFactory; +import rx.plugins.RxJavaHooks; + +/** + * Utility class to create the individual ScheduledExecutorService instances for + * the GenericScheduledExecutorService class. + */ +enum GenericScheduledExecutorServiceFactory { + ; + + static final String THREAD_NAME_PREFIX = "RxScheduledExecutorPool-"; + static final RxThreadFactory THREAD_FACTORY = new RxThreadFactory(THREAD_NAME_PREFIX); + + static ThreadFactory threadFactory() { + return THREAD_FACTORY; + } + + /** + * Creates a ScheduledExecutorService (either the default or given by a hook). + * @return the SchuduledExecutorService created. + */ + public static ScheduledExecutorService create() { + Func0 f = RxJavaHooks.getOnGenericScheduledExecutorService(); + if (f == null) { + return createDefault(); + } + return f.call(); + } + + + static ScheduledExecutorService createDefault() { + return Executors.newScheduledThreadPool(1, threadFactory()); + } +} diff --git a/src/main/java/rx/plugins/RxJavaHooks.java b/src/main/java/rx/plugins/RxJavaHooks.java index b68e01c287..70268c4c14 100644 --- a/src/main/java/rx/plugins/RxJavaHooks.java +++ b/src/main/java/rx/plugins/RxJavaHooks.java @@ -16,6 +16,7 @@ package rx.plugins; import java.lang.Thread.UncaughtExceptionHandler; +import java.util.concurrent.ScheduledExecutorService; import rx.*; import rx.Completable.*; @@ -67,6 +68,8 @@ public final class RxJavaHooks { static volatile Func1 onObservableReturn; static volatile Func1 onSingleReturn; + + static volatile Func0 onGenericScheduledExecutorService; static volatile Func1 onObservableSubscribeError; @@ -230,6 +233,7 @@ public static void reset() { onComputationScheduler = null; onIOScheduler = null; onNewThreadScheduler = null; + onGenericScheduledExecutorService = null; } /** @@ -265,8 +269,9 @@ public static void clear() { onComputationScheduler = null; onIOScheduler = null; onNewThreadScheduler = null; - + onScheduleAction = null; + onGenericScheduledExecutorService = null; } /** @@ -1195,4 +1200,34 @@ public CompletableOnSubscribe call(CompletableOnSubscribe f) { }; } + /** + * Sets the hook function for returning a ScheduledExecutorService used + * by the GenericScheduledExecutorService for background tasks. + *

+ * This operation is threadsafe. + *

+ * Calling with a {@code null} parameter restores the default behavior: + * create the default with {@link java.util.concurrent.Executors#newScheduledThreadPool(int, java.util.concurrent.ThreadFactory)}. + *

+ * For the changes to take effect, the Schedulers has to be restarted. + * @param factory the supplier that is called when the GenericScheduledExecutorService + * is (re)started + */ + public static void setOnGenericScheduledExecutorService(Func0 factory) { + if (lockdown) { + return; + } + onGenericScheduledExecutorService = factory; + } + + /** + * Returns the current factory for creating ScheduledExecutorServices in + * GenericScheduledExecutorService utility. + *

+ * This operation is threadsafe. + * @return the current factory function + */ + public static Func0 getOnGenericScheduledExecutorService() { + return onGenericScheduledExecutorService; + } } diff --git a/src/main/java/rx/schedulers/Schedulers.java b/src/main/java/rx/schedulers/Schedulers.java index a24504f94d..8df2dacf99 100644 --- a/src/main/java/rx/schedulers/Schedulers.java +++ b/src/main/java/rx/schedulers/Schedulers.java @@ -178,7 +178,7 @@ public static void reset() { * Starts those standard Schedulers which support the SchedulerLifecycle interface. *

The operation is idempotent and threadsafe. */ - /* public test only */ static void start() { + public static void start() { Schedulers s = getInstance(); s.startInstance(); diff --git a/src/test/java/rx/plugins/RxJavaHooksTest.java b/src/test/java/rx/plugins/RxJavaHooksTest.java index f13c8514b7..9209228124 100644 --- a/src/test/java/rx/plugins/RxJavaHooksTest.java +++ b/src/test/java/rx/plugins/RxJavaHooksTest.java @@ -173,6 +173,12 @@ public void lockdown() throws Exception { try { assertTrue(RxJavaHooks.isLockdown()); Action1 a1 = Actions.empty(); + Func0 f0 = new Func0() { + @Override + public Object call() { + return null; + } + }; Func1 f1 = UtilityFunctions.identity(); Func2 f2 = new Func2() { @Override @@ -188,6 +194,9 @@ public Object call(Object t1, Object t2) { Object before = getter.invoke(null); + if (m.getParameterTypes()[0].isAssignableFrom(Func0.class)) { + m.invoke(null, f0); + } else if (m.getParameterTypes()[0].isAssignableFrom(Func1.class)) { m.invoke(null, f1); } else @@ -640,7 +649,9 @@ public void clear() throws Exception { } for (Method m : RxJavaHooks.class.getMethods()) { - if (m.getName().startsWith("getOn") && !m.getName().endsWith("Scheduler")) { + if (m.getName().startsWith("getOn") + && !m.getName().endsWith("Scheduler") + && !m.getName().contains("GenericScheduledExecutorService")) { assertNotNull(m.toString(), m.invoke(null)); } } diff --git a/src/test/java/rx/schedulers/GenericScheduledExecutorServiceTest.java b/src/test/java/rx/schedulers/GenericScheduledExecutorServiceTest.java new file mode 100644 index 0000000000..6a92227bf5 --- /dev/null +++ b/src/test/java/rx/schedulers/GenericScheduledExecutorServiceTest.java @@ -0,0 +1,47 @@ +package rx.schedulers; + +import java.util.concurrent.*; + +import org.junit.*; + +import rx.functions.Func0; +import rx.internal.schedulers.GenericScheduledExecutorService; +import rx.plugins.RxJavaHooks; + +public class GenericScheduledExecutorServiceTest { + + @Test + public void genericScheduledExecutorServiceHook() { + // make sure the class is initialized + Assert.assertNotNull(GenericScheduledExecutorService.class); + + final ScheduledExecutorService exec = Executors.newSingleThreadScheduledExecutor(); + try { + + RxJavaHooks.setOnGenericScheduledExecutorService(new Func0() { + @Override + public ScheduledExecutorService call() { + return exec; + } + }); + + Schedulers.shutdown(); + Schedulers.start(); + + Assert.assertSame(exec, GenericScheduledExecutorService.getInstance()); + + RxJavaHooks.setOnGenericScheduledExecutorService(null); + + Schedulers.shutdown(); + // start() is package private so had to move this test here + Schedulers.start(); + + Assert.assertNotSame(exec, GenericScheduledExecutorService.getInstance()); + + } finally { + RxJavaHooks.reset(); + exec.shutdownNow(); + } + + } +}