Skip to content

Commit 91eeea0

Browse files
authored
1.x: allow customizing GenericScheduledExecutorService via RxJavaHooks (#4173)
* 1.x: allow customizing GenericScheduledExecutorService via RxJavaHooks * Had to make Schedulers.start public otherwise this wouldn't work * onGenericScheduledExecutorService is null by default
1 parent 6b47b11 commit 91eeea0

File tree

6 files changed

+136
-8
lines changed

6 files changed

+136
-8
lines changed

src/main/java/rx/internal/schedulers/GenericScheduledExecutorService.java

+1-5
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
import java.util.concurrent.atomic.AtomicReference;
2020

2121
import rx.Scheduler;
22-
import rx.internal.util.RxThreadFactory;
2322

2423
/**
2524
* A default {@link ScheduledExecutorService} that can be used for scheduling actions when a {@link Scheduler} implementation doesn't have that ability.
@@ -32,9 +31,6 @@
3231
*/
3332
public final class GenericScheduledExecutorService implements SchedulerLifecycle {
3433

35-
private static final String THREAD_NAME_PREFIX = "RxScheduledExecutorPool-";
36-
private static final RxThreadFactory THREAD_FACTORY = new RxThreadFactory(THREAD_NAME_PREFIX);
37-
3834
private static final ScheduledExecutorService[] NONE = new ScheduledExecutorService[0];
3935

4036
private static final ScheduledExecutorService SHUTDOWN;
@@ -72,7 +68,7 @@ public void start() {
7268

7369
ScheduledExecutorService[] execs = new ScheduledExecutorService[count];
7470
for (int i = 0; i < count; i++) {
75-
execs[i] = Executors.newScheduledThreadPool(1, THREAD_FACTORY);
71+
execs[i] = GenericScheduledExecutorServiceFactory.create();
7672
}
7773
if (executor.compareAndSet(NONE, execs)) {
7874
for (ScheduledExecutorService exec : execs) {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
package rx.internal.schedulers;
2+
3+
import java.util.concurrent.*;
4+
5+
import rx.functions.Func0;
6+
import rx.internal.util.RxThreadFactory;
7+
import rx.plugins.RxJavaHooks;
8+
9+
/**
10+
* Utility class to create the individual ScheduledExecutorService instances for
11+
* the GenericScheduledExecutorService class.
12+
*/
13+
enum GenericScheduledExecutorServiceFactory {
14+
;
15+
16+
static final String THREAD_NAME_PREFIX = "RxScheduledExecutorPool-";
17+
static final RxThreadFactory THREAD_FACTORY = new RxThreadFactory(THREAD_NAME_PREFIX);
18+
19+
static ThreadFactory threadFactory() {
20+
return THREAD_FACTORY;
21+
}
22+
23+
/**
24+
* Creates a ScheduledExecutorService (either the default or given by a hook).
25+
* @return the SchuduledExecutorService created.
26+
*/
27+
public static ScheduledExecutorService create() {
28+
Func0<? extends ScheduledExecutorService> f = RxJavaHooks.getOnGenericScheduledExecutorService();
29+
if (f == null) {
30+
return createDefault();
31+
}
32+
return f.call();
33+
}
34+
35+
36+
static ScheduledExecutorService createDefault() {
37+
return Executors.newScheduledThreadPool(1, threadFactory());
38+
}
39+
}

src/main/java/rx/plugins/RxJavaHooks.java

+36-1
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
package rx.plugins;
1717

1818
import java.lang.Thread.UncaughtExceptionHandler;
19+
import java.util.concurrent.ScheduledExecutorService;
1920

2021
import rx.*;
2122
import rx.Completable.*;
@@ -67,6 +68,8 @@ public final class RxJavaHooks {
6768
static volatile Func1<Subscription, Subscription> onObservableReturn;
6869

6970
static volatile Func1<Subscription, Subscription> onSingleReturn;
71+
72+
static volatile Func0<? extends ScheduledExecutorService> onGenericScheduledExecutorService;
7073

7174
static volatile Func1<Throwable, Throwable> onObservableSubscribeError;
7275

@@ -230,6 +233,7 @@ public static void reset() {
230233
onComputationScheduler = null;
231234
onIOScheduler = null;
232235
onNewThreadScheduler = null;
236+
onGenericScheduledExecutorService = null;
233237
}
234238

235239
/**
@@ -265,8 +269,9 @@ public static void clear() {
265269
onComputationScheduler = null;
266270
onIOScheduler = null;
267271
onNewThreadScheduler = null;
268-
272+
269273
onScheduleAction = null;
274+
onGenericScheduledExecutorService = null;
270275
}
271276

272277
/**
@@ -1195,4 +1200,34 @@ public CompletableOnSubscribe call(CompletableOnSubscribe f) {
11951200
};
11961201

11971202
}
1203+
/**
1204+
* Sets the hook function for returning a ScheduledExecutorService used
1205+
* by the GenericScheduledExecutorService for background tasks.
1206+
* <p>
1207+
* This operation is threadsafe.
1208+
* <p>
1209+
* Calling with a {@code null} parameter restores the default behavior:
1210+
* create the default with {@link java.util.concurrent.Executors#newScheduledThreadPool(int, java.util.concurrent.ThreadFactory)}.
1211+
* <p>
1212+
* For the changes to take effect, the Schedulers has to be restarted.
1213+
* @param factory the supplier that is called when the GenericScheduledExecutorService
1214+
* is (re)started
1215+
*/
1216+
public static void setOnGenericScheduledExecutorService(Func0<? extends ScheduledExecutorService> factory) {
1217+
if (lockdown) {
1218+
return;
1219+
}
1220+
onGenericScheduledExecutorService = factory;
1221+
}
1222+
1223+
/**
1224+
* Returns the current factory for creating ScheduledExecutorServices in
1225+
* GenericScheduledExecutorService utility.
1226+
* <p>
1227+
* This operation is threadsafe.
1228+
* @return the current factory function
1229+
*/
1230+
public static Func0<? extends ScheduledExecutorService> getOnGenericScheduledExecutorService() {
1231+
return onGenericScheduledExecutorService;
1232+
}
11981233
}

src/main/java/rx/schedulers/Schedulers.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -178,7 +178,7 @@ public static void reset() {
178178
* Starts those standard Schedulers which support the SchedulerLifecycle interface.
179179
* <p>The operation is idempotent and threadsafe.
180180
*/
181-
/* public test only */ static void start() {
181+
public static void start() {
182182
Schedulers s = getInstance();
183183

184184
s.startInstance();

src/test/java/rx/plugins/RxJavaHooksTest.java

+12-1
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,12 @@ public void lockdown() throws Exception {
173173
try {
174174
assertTrue(RxJavaHooks.isLockdown());
175175
Action1 a1 = Actions.empty();
176+
Func0 f0 = new Func0() {
177+
@Override
178+
public Object call() {
179+
return null;
180+
}
181+
};
176182
Func1 f1 = UtilityFunctions.identity();
177183
Func2 f2 = new Func2() {
178184
@Override
@@ -188,6 +194,9 @@ public Object call(Object t1, Object t2) {
188194

189195
Object before = getter.invoke(null);
190196

197+
if (m.getParameterTypes()[0].isAssignableFrom(Func0.class)) {
198+
m.invoke(null, f0);
199+
} else
191200
if (m.getParameterTypes()[0].isAssignableFrom(Func1.class)) {
192201
m.invoke(null, f1);
193202
} else
@@ -640,7 +649,9 @@ public void clear() throws Exception {
640649
}
641650

642651
for (Method m : RxJavaHooks.class.getMethods()) {
643-
if (m.getName().startsWith("getOn") && !m.getName().endsWith("Scheduler")) {
652+
if (m.getName().startsWith("getOn")
653+
&& !m.getName().endsWith("Scheduler")
654+
&& !m.getName().contains("GenericScheduledExecutorService")) {
644655
assertNotNull(m.toString(), m.invoke(null));
645656
}
646657
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
package rx.schedulers;
2+
3+
import java.util.concurrent.*;
4+
5+
import org.junit.*;
6+
7+
import rx.functions.Func0;
8+
import rx.internal.schedulers.GenericScheduledExecutorService;
9+
import rx.plugins.RxJavaHooks;
10+
11+
public class GenericScheduledExecutorServiceTest {
12+
13+
@Test
14+
public void genericScheduledExecutorServiceHook() {
15+
// make sure the class is initialized
16+
Assert.assertNotNull(GenericScheduledExecutorService.class);
17+
18+
final ScheduledExecutorService exec = Executors.newSingleThreadScheduledExecutor();
19+
try {
20+
21+
RxJavaHooks.setOnGenericScheduledExecutorService(new Func0<ScheduledExecutorService>() {
22+
@Override
23+
public ScheduledExecutorService call() {
24+
return exec;
25+
}
26+
});
27+
28+
Schedulers.shutdown();
29+
Schedulers.start();
30+
31+
Assert.assertSame(exec, GenericScheduledExecutorService.getInstance());
32+
33+
RxJavaHooks.setOnGenericScheduledExecutorService(null);
34+
35+
Schedulers.shutdown();
36+
// start() is package private so had to move this test here
37+
Schedulers.start();
38+
39+
Assert.assertNotSame(exec, GenericScheduledExecutorService.getInstance());
40+
41+
} finally {
42+
RxJavaHooks.reset();
43+
exec.shutdownNow();
44+
}
45+
46+
}
47+
}

0 commit comments

Comments
 (0)