Skip to content

3.x: Add RxJavaPlugins.createExecutorScheduler #7306

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Aug 2, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 21 additions & 1 deletion src/main/java/io/reactivex/rxjava3/plugins/RxJavaPlugins.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

import java.lang.Thread.UncaughtExceptionHandler;
import java.util.Objects;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.*;

import org.reactivestreams.Subscriber;

Expand Down Expand Up @@ -1302,6 +1302,26 @@ public static Scheduler createSingleScheduler(@NonNull ThreadFactory threadFacto
return new SingleScheduler(Objects.requireNonNull(threadFactory, "threadFactory is null"));
}

/**
* Create an instance of a {@link Scheduler} by wrapping an existing {@link Executor}.
* <p>
* This method allows creating an {@code Executor}-backed {@code Scheduler} before the {@link Schedulers} class
* would initialize the standard {@code Scheduler}s.
*
* @param executor the {@code Executor} to wrap and turn into a {@code Scheduler}.
* @param interruptibleWorker if {@code true}, the tasks submitted to the {@link io.reactivex.rxjava3.core.Scheduler.Worker Scheduler.Worker} will
* be interrupted when the task is disposed.
* @param fair if {@code true}, tasks submitted to the {@code Scheduler} or {@code Worker} will be executed by the underlying {@code Executor} one after the other, still
* in a FIFO and non-overlapping manner, but allows interleaving with other tasks submitted to the underlying {@code Executor}.
* If {@code false}, the underlying FIFO scheme will execute as many tasks as it can before giving up the underlying {@code Executor} thread.
* @return the new {@code Scheduler} wrapping the {@code Executor}
* @since 3.1.0
*/
@NonNull
public static Scheduler createExecutorScheduler(@NonNull Executor executor, boolean interruptibleWorker, boolean fair) {
return new ExecutorScheduler(executor, interruptibleWorker, fair);
}

/**
* Wraps the call to the function in try-catch and propagates thrown
* checked exceptions as RuntimeException.
Expand Down
19 changes: 16 additions & 3 deletions src/main/java/io/reactivex/rxjava3/schedulers/Schedulers.java
Original file line number Diff line number Diff line change
Expand Up @@ -378,14 +378,18 @@ public static Scheduler single() {
* execute those tasks "unexpectedly".
* <p>
* Note that this method returns a new {@code Scheduler} instance, even for the same {@code Executor} instance.
* <p>
* It is possible to wrap an {@code Executor} into a {@code Scheduler} without triggering the initialization of all the
* standard schedulers by using the {@link RxJavaPlugins#createExecutorScheduler(Executor, boolean, boolean)} method
* before the {@code Schedulers} class itself is accessed.
* @param executor
* the executor to wrap
* @return the new {@code Scheduler} wrapping the {@code Executor}
* @see #from(Executor, boolean, boolean)
*/
@NonNull
public static Scheduler from(@NonNull Executor executor) {
return new ExecutorScheduler(executor, false, false);
return from(executor, false, false);
}

/**
Expand Down Expand Up @@ -452,6 +456,10 @@ public static Scheduler from(@NonNull Executor executor) {
* execute those tasks "unexpectedly".
* <p>
* Note that this method returns a new {@code Scheduler} instance, even for the same {@code Executor} instance.
* <p>
* It is possible to wrap an {@code Executor} into a {@code Scheduler} without triggering the initialization of all the
* standard schedulers by using the {@link RxJavaPlugins#createExecutorScheduler(Executor, boolean, boolean)} method
* before the {@code Schedulers} class itself is accessed.
* <p>History: 2.2.6 - experimental
* @param executor
* the executor to wrap
Expand All @@ -463,7 +471,7 @@ public static Scheduler from(@NonNull Executor executor) {
*/
@NonNull
public static Scheduler from(@NonNull Executor executor, boolean interruptibleWorker) {
return new ExecutorScheduler(executor, interruptibleWorker, false);
return from(executor, interruptibleWorker, false);
}

/**
Expand Down Expand Up @@ -532,6 +540,11 @@ public static Scheduler from(@NonNull Executor executor, boolean interruptibleWo
* execute those tasks "unexpectedly".
* <p>
* Note that this method returns a new {@code Scheduler} instance, even for the same {@code Executor} instance.
* <p>
* It is possible to wrap an {@code Executor} into a {@code Scheduler} without triggering the initialization of all the
* standard schedulers by using the {@link RxJavaPlugins#createExecutorScheduler(Executor, boolean, boolean)} method
* before the {@code Schedulers} class itself is accessed.
*
* @param executor
* the executor to wrap
* @param interruptibleWorker if {@code true}, the tasks submitted to the {@link io.reactivex.rxjava3.core.Scheduler.Worker Scheduler.Worker} will
Expand All @@ -544,7 +557,7 @@ public static Scheduler from(@NonNull Executor executor, boolean interruptibleWo
*/
@NonNull
public static Scheduler from(@NonNull Executor executor, boolean interruptibleWorker, boolean fair) {
return new ExecutorScheduler(executor, interruptibleWorker, fair);
return RxJavaPlugins.createExecutorScheduler(executor, interruptibleWorker, fair);
}

/**
Expand Down