Skip to content

1.x: allow customizing GenericScheduledExecutorService via RxJavaHooks #4173

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Jul 10, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import java.util.concurrent.atomic.AtomicReference;

import rx.Scheduler;
import rx.internal.util.RxThreadFactory;

/**
* A default {@link ScheduledExecutorService} that can be used for scheduling actions when a {@link Scheduler} implementation doesn't have that ability.
Expand All @@ -32,9 +31,6 @@
*/
public final class GenericScheduledExecutorService implements SchedulerLifecycle {

private static final String THREAD_NAME_PREFIX = "RxScheduledExecutorPool-";
private static final RxThreadFactory THREAD_FACTORY = new RxThreadFactory(THREAD_NAME_PREFIX);

private static final ScheduledExecutorService[] NONE = new ScheduledExecutorService[0];

private static final ScheduledExecutorService SHUTDOWN;
Expand Down Expand Up @@ -72,7 +68,7 @@ public void start() {

ScheduledExecutorService[] execs = new ScheduledExecutorService[count];
for (int i = 0; i < count; i++) {
execs[i] = Executors.newScheduledThreadPool(1, THREAD_FACTORY);
execs[i] = GenericScheduledExecutorServiceFactory.create();
}
if (executor.compareAndSet(NONE, execs)) {
for (ScheduledExecutorService exec : execs) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package rx.internal.schedulers;

import java.util.concurrent.*;

import rx.functions.Func0;
import rx.internal.util.RxThreadFactory;
import rx.plugins.RxJavaHooks;

/**
* Utility class to create the individual ScheduledExecutorService instances for
* the GenericScheduledExecutorService class.
*/
enum GenericScheduledExecutorServiceFactory {
;

static final String THREAD_NAME_PREFIX = "RxScheduledExecutorPool-";
static final RxThreadFactory THREAD_FACTORY = new RxThreadFactory(THREAD_NAME_PREFIX);

static ThreadFactory threadFactory() {
return THREAD_FACTORY;
}

/**
* Creates a ScheduledExecutorService (either the default or given by a hook).
* @return the SchuduledExecutorService created.
*/
public static ScheduledExecutorService create() {
Func0<? extends ScheduledExecutorService> f = RxJavaHooks.getOnGenericScheduledExecutorService();
if (f == null) {
return createDefault();
}
return f.call();
}


static ScheduledExecutorService createDefault() {
return Executors.newScheduledThreadPool(1, threadFactory());
}
}
37 changes: 36 additions & 1 deletion src/main/java/rx/plugins/RxJavaHooks.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package rx.plugins;

import java.lang.Thread.UncaughtExceptionHandler;
import java.util.concurrent.ScheduledExecutorService;

import rx.*;
import rx.Completable.*;
Expand Down Expand Up @@ -67,6 +68,8 @@ public final class RxJavaHooks {
static volatile Func1<Subscription, Subscription> onObservableReturn;

static volatile Func1<Subscription, Subscription> onSingleReturn;

static volatile Func0<? extends ScheduledExecutorService> onGenericScheduledExecutorService;

static volatile Func1<Throwable, Throwable> onObservableSubscribeError;

Expand Down Expand Up @@ -230,6 +233,7 @@ public static void reset() {
onComputationScheduler = null;
onIOScheduler = null;
onNewThreadScheduler = null;
onGenericScheduledExecutorService = null;
}

/**
Expand Down Expand Up @@ -265,8 +269,9 @@ public static void clear() {
onComputationScheduler = null;
onIOScheduler = null;
onNewThreadScheduler = null;

onScheduleAction = null;
onGenericScheduledExecutorService = null;
}

/**
Expand Down Expand Up @@ -1195,4 +1200,34 @@ public CompletableOnSubscribe call(CompletableOnSubscribe f) {
};

}
/**
* Sets the hook function for returning a ScheduledExecutorService used
* by the GenericScheduledExecutorService for background tasks.
* <p>
* This operation is threadsafe.
* <p>
* Calling with a {@code null} parameter restores the default behavior:
* create the default with {@link java.util.concurrent.Executors#newScheduledThreadPool(int, java.util.concurrent.ThreadFactory)}.
* <p>
* For the changes to take effect, the Schedulers has to be restarted.
* @param factory the supplier that is called when the GenericScheduledExecutorService
* is (re)started
*/
public static void setOnGenericScheduledExecutorService(Func0<? extends ScheduledExecutorService> factory) {
if (lockdown) {
return;
}
onGenericScheduledExecutorService = factory;
}

/**
* Returns the current factory for creating ScheduledExecutorServices in
* GenericScheduledExecutorService utility.
* <p>
* This operation is threadsafe.
* @return the current factory function
*/
public static Func0<? extends ScheduledExecutorService> getOnGenericScheduledExecutorService() {
return onGenericScheduledExecutorService;
}
}
2 changes: 1 addition & 1 deletion src/main/java/rx/schedulers/Schedulers.java
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ public static void reset() {
* Starts those standard Schedulers which support the SchedulerLifecycle interface.
* <p>The operation is idempotent and threadsafe.
*/
/* public test only */ static void start() {
public static void start() {
Schedulers s = getInstance();

s.startInstance();
Expand Down
13 changes: 12 additions & 1 deletion src/test/java/rx/plugins/RxJavaHooksTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,12 @@ public void lockdown() throws Exception {
try {
assertTrue(RxJavaHooks.isLockdown());
Action1 a1 = Actions.empty();
Func0 f0 = new Func0() {
@Override
public Object call() {
return null;
}
};
Func1 f1 = UtilityFunctions.identity();
Func2 f2 = new Func2() {
@Override
Expand All @@ -188,6 +194,9 @@ public Object call(Object t1, Object t2) {

Object before = getter.invoke(null);

if (m.getParameterTypes()[0].isAssignableFrom(Func0.class)) {
m.invoke(null, f0);
} else
if (m.getParameterTypes()[0].isAssignableFrom(Func1.class)) {
m.invoke(null, f1);
} else
Expand Down Expand Up @@ -640,7 +649,9 @@ public void clear() throws Exception {
}

for (Method m : RxJavaHooks.class.getMethods()) {
if (m.getName().startsWith("getOn") && !m.getName().endsWith("Scheduler")) {
if (m.getName().startsWith("getOn")
&& !m.getName().endsWith("Scheduler")
&& !m.getName().contains("GenericScheduledExecutorService")) {
assertNotNull(m.toString(), m.invoke(null));
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package rx.schedulers;

import java.util.concurrent.*;

import org.junit.*;

import rx.functions.Func0;
import rx.internal.schedulers.GenericScheduledExecutorService;
import rx.plugins.RxJavaHooks;

public class GenericScheduledExecutorServiceTest {

@Test
public void genericScheduledExecutorServiceHook() {
// make sure the class is initialized
Assert.assertNotNull(GenericScheduledExecutorService.class);

final ScheduledExecutorService exec = Executors.newSingleThreadScheduledExecutor();
try {

RxJavaHooks.setOnGenericScheduledExecutorService(new Func0<ScheduledExecutorService>() {
@Override
public ScheduledExecutorService call() {
return exec;
}
});

Schedulers.shutdown();
Schedulers.start();

Assert.assertSame(exec, GenericScheduledExecutorService.getInstance());

RxJavaHooks.setOnGenericScheduledExecutorService(null);

Schedulers.shutdown();
// start() is package private so had to move this test here
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not valid

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, will clean it in the next coverage test PR.

Schedulers.start();

Assert.assertNotSame(exec, GenericScheduledExecutorService.getInstance());

} finally {
RxJavaHooks.reset();
exec.shutdownNow();
}

}
}