Skip to content

Add Schedulers.reset() for better testing #3986

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jun 6, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 41 additions & 11 deletions src/main/java/rx/schedulers/Schedulers.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,16 @@
package rx.schedulers;

import rx.Scheduler;
import rx.internal.schedulers.*;
import rx.annotations.Experimental;
import rx.internal.schedulers.ExecutorScheduler;
import rx.internal.schedulers.GenericScheduledExecutorService;
import rx.internal.schedulers.SchedulerLifecycle;
import rx.internal.util.RxRingBuffer;
import rx.plugins.RxJavaPlugins;
import rx.plugins.RxJavaSchedulersHook;

import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReference;

/**
* Static factory methods for creating Schedulers.
Expand All @@ -32,7 +36,22 @@ public final class Schedulers {
private final Scheduler ioScheduler;
private final Scheduler newThreadScheduler;

private static final Schedulers INSTANCE = new Schedulers();
private static final AtomicReference<Schedulers> INSTANCE = new AtomicReference<Schedulers>();

private static Schedulers getInstance() {
for (;;) {
Schedulers current = INSTANCE.get();
if (current != null) {
return current;
}
current = new Schedulers();
if (INSTANCE.compareAndSet(null, current)) {
return current;
} else {
shutdown();
}
}
}
Copy link
Member

@akarnokd akarnokd Jun 3, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please use atomics instead:

private static final AtomicReference<Schedulers> INSTANCE = new AtomicReference<Schedulers>();

private static Schedulers getInstance() {
    for (;;) {
         Schedulers current = INSTANCE.get();
         if (current != null) {
            return current;
         }
         current = new Schedulers();
         if (INSTANCE.compareAndSet(null, current)) {
             return current;
         } else {
             current.shutdown();
         }
    }
}

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just curious what's the advantage of this?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Synchronized holds the potential of getting blocked if somebody else is already in it. If the Schedulers is not constantly and concurrently reset all the time, the atomic approach is a single non-blocking volatile read of a non-null instance value.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shutdown() is currently a static method. Should current.shutdown() be just shutdown() or should shutdown() be changed to an instance method?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Two problems:

  • shutdown() is public API and that would be an incompatible change
  • you'd have to expose Schedulers as an instance somehow call an instance method on it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you propose?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Leave the shutdown() as it is now.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Alright. Do you think there's any risk of deadlock since we'd be calling it from getInstance() and it calls getInstance() internally?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated in fe2157c


private Schedulers() {
RxJavaSchedulersHook hook = RxJavaPlugins.getInstance().getSchedulersHook();
Expand Down Expand Up @@ -86,7 +105,7 @@ public static Scheduler trampoline() {
* @return a {@link Scheduler} that creates new threads
*/
public static Scheduler newThread() {
return INSTANCE.newThreadScheduler;
return getInstance().newThreadScheduler;
}

/**
Expand All @@ -101,7 +120,7 @@ public static Scheduler newThread() {
* @return a {@link Scheduler} meant for computation-bound work
*/
public static Scheduler computation() {
return INSTANCE.computationScheduler;
return getInstance().computationScheduler;
}

/**
Expand All @@ -118,7 +137,7 @@ public static Scheduler computation() {
* @return a {@link Scheduler} meant for IO-bound work
*/
public static Scheduler io() {
return INSTANCE.ioScheduler;
return getInstance().ioScheduler;
}

/**
Expand All @@ -141,13 +160,24 @@ public static TestScheduler test() {
public static Scheduler from(Executor executor) {
return new ExecutorScheduler(executor);
}

/**
* Resets the current {@link Schedulers} instance.
* This will re-init the cached schedulers on the next usage,
* which can be useful in testing.
*/
@Experimental
public static void reset() {
shutdown();
INSTANCE.set(null);
}

/**
* Starts those standard Schedulers which support the SchedulerLifecycle interface.
* <p>The operation is idempotent and threadsafe.
*/
/* public test only */ static void start() {
Schedulers s = INSTANCE;
Schedulers s = getInstance();
synchronized (s) {
if (s.computationScheduler instanceof SchedulerLifecycle) {
((SchedulerLifecycle) s.computationScheduler).start();
Expand All @@ -170,7 +200,7 @@ public static Scheduler from(Executor executor) {
* <p>The operation is idempotent and threadsafe.
*/
public static void shutdown() {
Schedulers s = INSTANCE;
Schedulers s = getInstance();
synchronized (s) {
if (s.computationScheduler instanceof SchedulerLifecycle) {
((SchedulerLifecycle) s.computationScheduler).shutdown();
Expand All @@ -181,12 +211,12 @@ public static void shutdown() {
if (s.newThreadScheduler instanceof SchedulerLifecycle) {
((SchedulerLifecycle) s.newThreadScheduler).shutdown();
}

GenericScheduledExecutorService.INSTANCE.shutdown();

RxRingBuffer.SPSC_POOL.shutdown();

RxRingBuffer.SPMC_POOL.shutdown();
}
}
}
}
50 changes: 50 additions & 0 deletions src/test/java/rx/schedulers/ResetSchedulersTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package rx.schedulers;


import org.junit.Test;
import rx.Scheduler;
import rx.internal.schedulers.*;
import rx.plugins.RxJavaPlugins;
import rx.plugins.RxJavaSchedulersHook;

import static org.junit.Assert.assertTrue;

public class ResetSchedulersTest {

@Test
public void reset() {
RxJavaPlugins.getInstance().reset();

final TestScheduler testScheduler = new TestScheduler();
RxJavaPlugins.getInstance().registerSchedulersHook(new RxJavaSchedulersHook() {
@Override
public Scheduler getComputationScheduler() {
return testScheduler;
}

@Override
public Scheduler getIOScheduler() {
return testScheduler;
}

@Override
public Scheduler getNewThreadScheduler() {
return testScheduler;
}
});
Schedulers.reset();

assertTrue(Schedulers.io().equals(testScheduler));
assertTrue(Schedulers.computation().equals(testScheduler));
assertTrue(Schedulers.newThread().equals(testScheduler));

RxJavaPlugins.getInstance().reset();
RxJavaPlugins.getInstance().registerSchedulersHook(RxJavaSchedulersHook.getDefaultInstance());
Schedulers.reset();

assertTrue(Schedulers.io() instanceof CachedThreadScheduler);
assertTrue(Schedulers.computation() instanceof EventLoopsScheduler);
assertTrue(Schedulers.newThread() instanceof rx.internal.schedulers.NewThreadScheduler);
}

}