Skip to content

Commit ed1dcb3

Browse files
committed
Porting the Scheduler.when operator from 1.x to 2.x
1 parent ba6f392 commit ed1dcb3

File tree

3 files changed

+619
-0
lines changed

3 files changed

+619
-0
lines changed

src/main/java/io/reactivex/Scheduler.java

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,12 @@
1515

1616
import java.util.concurrent.TimeUnit;
1717

18+
import io.reactivex.annotations.Experimental;
1819
import io.reactivex.disposables.Disposable;
1920
import io.reactivex.exceptions.Exceptions;
21+
import io.reactivex.functions.Function;
2022
import io.reactivex.internal.disposables.*;
23+
import io.reactivex.internal.schedulers.SchedulerWhen;
2124
import io.reactivex.internal.util.ExceptionHelper;
2225
import io.reactivex.plugins.RxJavaPlugins;
2326

@@ -171,6 +174,85 @@ public Disposable schedulePeriodicallyDirect(Runnable run, long initialDelay, lo
171174
return periodicTask;
172175
}
173176

177+
/**
178+
* Allows the use of operators for controlling the timing around when
179+
* actions scheduled on workers are actually done. This makes it possible to
180+
* layer additional behavior on this {@link Scheduler}. The only parameter
181+
* is a function that flattens an {@link Flowable} of {@link Flowable}
182+
* of {@link Completable}s into just one {@link Completable}. There must be
183+
* a chain of operators connecting the returned value to the source
184+
* {@link Flowable} otherwise any work scheduled on the returned
185+
* {@link Scheduler} will not be executed.
186+
* <p>
187+
* When {@link Scheduler#createWorker()} is invoked a {@link Flowable} of
188+
* {@link Completable}s is onNext'd to the combinator to be flattened. If
189+
* the inner {@link Flowable} is not immediately subscribed to an calls to
190+
* {@link Worker#schedule} are buffered. Once the {@link Flowable} is
191+
* subscribed to actions are then onNext'd as {@link Completable}s.
192+
* <p>
193+
* Finally the actions scheduled on the parent {@link Scheduler} when the
194+
* inner most {@link Completable}s are subscribed to.
195+
* <p>
196+
* When the {@link Worker} is unsubscribed the {@link Completable} emits an
197+
* onComplete and triggers any behavior in the flattening operator. The
198+
* {@link Flowable} and all {@link Completable}s give to the flattening
199+
* function never onError.
200+
* <p>
201+
* Limit the amount concurrency two at a time without creating a new fix
202+
* size thread pool:
203+
*
204+
* <pre>
205+
* Scheduler limitSched = Schedulers.computation().when(workers -> {
206+
* // use merge max concurrent to limit the number of concurrent
207+
* // callbacks two at a time
208+
* return Completable.merge(Flowable.merge(workers), 2);
209+
* });
210+
* </pre>
211+
* <p>
212+
* This is a slightly different way to limit the concurrency but it has some
213+
* interesting benefits and drawbacks to the method above. It works by
214+
* limited the number of concurrent {@link Worker}s rather than individual
215+
* actions. Generally each {@link Flowable} uses its own {@link Worker}.
216+
* This means that this will essentially limit the number of concurrent
217+
* subscribes. The danger comes from using operators like
218+
* {@link Flowable#zip(Flowable, Flowable, rx.functions.Func2)} where
219+
* subscribing to the first {@link Flowable} could deadlock the
220+
* subscription to the second.
221+
*
222+
* <pre>
223+
* Scheduler limitSched = Schedulers.computation().when(workers -> {
224+
* // use merge max concurrent to limit the number of concurrent
225+
* // Flowables two at a time
226+
* return Completable.merge(Flowable.merge(workers, 2));
227+
* });
228+
* </pre>
229+
*
230+
* Slowing down the rate to no more than than 1 a second. This suffers from
231+
* the same problem as the one above I could find an {@link Flowable}
232+
* operator that limits the rate without dropping the values (aka leaky
233+
* bucket algorithm).
234+
*
235+
* <pre>
236+
* Scheduler slowSched = Schedulers.computation().when(workers -> {
237+
* // use concatenate to make each worker happen one at a time.
238+
* return Completable.concat(workers.map(actions -> {
239+
* // delay the starting of the next worker by 1 second.
240+
* return Completable.merge(actions.delaySubscription(1, TimeUnit.SECONDS));
241+
* }));
242+
* });
243+
* </pre>
244+
*
245+
* @param <S> a Scheduler and a Subscription
246+
* @param combine the function that takes a two-level nested Flowable sequence of a Completable and returns
247+
* the Completable that will be subscribed to and should trigger the execution of the scheduled Actions.
248+
* @return the Scheduler with the customized execution behavior
249+
*/
250+
@SuppressWarnings("unchecked")
251+
@Experimental
252+
public <S extends Scheduler & Disposable> S when(Function<Flowable<Flowable<Completable>>, Completable> combine) {
253+
return (S) new SchedulerWhen(combine, this);
254+
}
255+
174256
/**
175257
* Sequential Scheduler for executing actions on a single thread or event loop.
176258
* <p>

0 commit comments

Comments
 (0)