Skip to content

Commit bfc259d

Browse files
committed
Add Schedulers.reset() for better testing
Resolves #3985
1 parent 6004156 commit bfc259d

File tree

2 files changed

+94
-11
lines changed

2 files changed

+94
-11
lines changed

src/main/java/rx/schedulers/Schedulers.java

Lines changed: 44 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,16 @@
1616
package rx.schedulers;
1717

1818
import rx.Scheduler;
19-
import rx.internal.schedulers.*;
19+
import rx.annotations.Experimental;
20+
import rx.internal.schedulers.ExecutorScheduler;
21+
import rx.internal.schedulers.GenericScheduledExecutorService;
22+
import rx.internal.schedulers.SchedulerLifecycle;
2023
import rx.internal.util.RxRingBuffer;
2124
import rx.plugins.RxJavaPlugins;
2225
import rx.plugins.RxJavaSchedulersHook;
2326

2427
import java.util.concurrent.Executor;
28+
import java.util.concurrent.atomic.AtomicReference;
2529

2630
/**
2731
* Static factory methods for creating Schedulers.
@@ -32,7 +36,22 @@ public final class Schedulers {
3236
private final Scheduler ioScheduler;
3337
private final Scheduler newThreadScheduler;
3438

35-
private static final Schedulers INSTANCE = new Schedulers();
39+
private static final AtomicReference<Schedulers> INSTANCE = new AtomicReference<Schedulers>();
40+
41+
private static Schedulers getInstance() {
42+
for (;;) {
43+
Schedulers current = INSTANCE.get();
44+
if (current != null) {
45+
return current;
46+
}
47+
current = new Schedulers();
48+
if (INSTANCE.compareAndSet(null, current)) {
49+
return current;
50+
} else {
51+
shutdown();
52+
}
53+
}
54+
}
3655

3756
private Schedulers() {
3857
RxJavaSchedulersHook hook = RxJavaPlugins.getInstance().getSchedulersHook();
@@ -86,7 +105,7 @@ public static Scheduler trampoline() {
86105
* @return a {@link Scheduler} that creates new threads
87106
*/
88107
public static Scheduler newThread() {
89-
return INSTANCE.newThreadScheduler;
108+
return getInstance().newThreadScheduler;
90109
}
91110

92111
/**
@@ -101,7 +120,7 @@ public static Scheduler newThread() {
101120
* @return a {@link Scheduler} meant for computation-bound work
102121
*/
103122
public static Scheduler computation() {
104-
return INSTANCE.computationScheduler;
123+
return getInstance().computationScheduler;
105124
}
106125

107126
/**
@@ -118,7 +137,7 @@ public static Scheduler computation() {
118137
* @return a {@link Scheduler} meant for IO-bound work
119138
*/
120139
public static Scheduler io() {
121-
return INSTANCE.ioScheduler;
140+
return getInstance().ioScheduler;
122141
}
123142

124143
/**
@@ -141,13 +160,27 @@ public static TestScheduler test() {
141160
public static Scheduler from(Executor executor) {
142161
return new ExecutorScheduler(executor);
143162
}
163+
164+
/**
165+
* Resets the current {@link Schedulers} instance.
166+
* <p>
167+
* This API is experimental. Resetting the schedulers is dangerous
168+
* during application runtime and also bad code could invoke it in
169+
* the middle of an application life-cycle and really break applications
170+
* if not used cautiously.
171+
*/
172+
@Experimental
173+
public static void reset() {
174+
shutdown();
175+
INSTANCE.set(null);
176+
}
144177

145178
/**
146179
* Starts those standard Schedulers which support the SchedulerLifecycle interface.
147180
* <p>The operation is idempotent and threadsafe.
148181
*/
149182
/* public test only */ static void start() {
150-
Schedulers s = INSTANCE;
183+
Schedulers s = getInstance();
151184
synchronized (s) {
152185
if (s.computationScheduler instanceof SchedulerLifecycle) {
153186
((SchedulerLifecycle) s.computationScheduler).start();
@@ -170,7 +203,7 @@ public static Scheduler from(Executor executor) {
170203
* <p>The operation is idempotent and threadsafe.
171204
*/
172205
public static void shutdown() {
173-
Schedulers s = INSTANCE;
206+
Schedulers s = getInstance();
174207
synchronized (s) {
175208
if (s.computationScheduler instanceof SchedulerLifecycle) {
176209
((SchedulerLifecycle) s.computationScheduler).shutdown();
@@ -181,12 +214,12 @@ public static void shutdown() {
181214
if (s.newThreadScheduler instanceof SchedulerLifecycle) {
182215
((SchedulerLifecycle) s.newThreadScheduler).shutdown();
183216
}
184-
217+
185218
GenericScheduledExecutorService.INSTANCE.shutdown();
186-
219+
187220
RxRingBuffer.SPSC_POOL.shutdown();
188-
221+
189222
RxRingBuffer.SPMC_POOL.shutdown();
190223
}
191224
}
192-
}
225+
}
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
package rx.schedulers;
2+
3+
4+
import org.junit.Test;
5+
import rx.Scheduler;
6+
import rx.internal.schedulers.*;
7+
import rx.plugins.RxJavaPlugins;
8+
import rx.plugins.RxJavaSchedulersHook;
9+
10+
import static org.junit.Assert.assertTrue;
11+
12+
public class ResetSchedulersTest {
13+
14+
@Test
15+
public void reset() {
16+
RxJavaPlugins.getInstance().reset();
17+
18+
final TestScheduler testScheduler = new TestScheduler();
19+
RxJavaPlugins.getInstance().registerSchedulersHook(new RxJavaSchedulersHook() {
20+
@Override
21+
public Scheduler getComputationScheduler() {
22+
return testScheduler;
23+
}
24+
25+
@Override
26+
public Scheduler getIOScheduler() {
27+
return testScheduler;
28+
}
29+
30+
@Override
31+
public Scheduler getNewThreadScheduler() {
32+
return testScheduler;
33+
}
34+
});
35+
Schedulers.reset();
36+
37+
assertTrue(Schedulers.io().equals(testScheduler));
38+
assertTrue(Schedulers.computation().equals(testScheduler));
39+
assertTrue(Schedulers.newThread().equals(testScheduler));
40+
41+
RxJavaPlugins.getInstance().reset();
42+
RxJavaPlugins.getInstance().registerSchedulersHook(RxJavaSchedulersHook.getDefaultInstance());
43+
Schedulers.reset();
44+
45+
assertTrue(Schedulers.io() instanceof CachedThreadScheduler);
46+
assertTrue(Schedulers.computation() instanceof EventLoopsScheduler);
47+
assertTrue(Schedulers.newThread() instanceof rx.internal.schedulers.NewThreadScheduler);
48+
}
49+
50+
}

0 commit comments

Comments
 (0)