diff --git a/src/main/java/rx/Scheduler.java b/src/main/java/rx/Scheduler.java
index 8437396e56..f1b8a0e28a 100644
--- a/src/main/java/rx/Scheduler.java
+++ b/src/main/java/rx/Scheduler.java
@@ -17,7 +17,10 @@
import java.util.concurrent.TimeUnit;
+import rx.annotations.Experimental;
import rx.functions.Action0;
+import rx.functions.Func1;
+import rx.internal.schedulers.SchedulerWhen;
import rx.schedulers.Schedulers;
import rx.subscriptions.MultipleAssignmentSubscription;
@@ -182,4 +185,79 @@ public long now() {
return System.currentTimeMillis();
}
+ /**
+ * Allows the use of operators for controlling the timing around when
+ * actions scheduled on workers are actually done. This makes it possible to
+ * layer additional behavior on this {@link Scheduler}. The only parameter
+ * is a function that flattens an {@link Observable} of {@link Observable}
+ * of {@link Completable}s into just one {@link Completable}. There must be
+ * a chain of operators connecting the returned value to the source
+ * {@link Observable} otherwise any work scheduled on the returned
+ * {@link Scheduler} will not be executed.
+ *
+ * When {@link Scheduler#createWorker()} is invoked a {@link Observable} of
+ * {@link Completable}s is onNext'd to the combinator to be flattened. If
+ * the inner {@link Observable} is not immediately subscribed to an calls to
+ * {@link Worker#schedule} are buffered. Once the {@link Observable} is
+ * subscribed to actions are then onNext'd as {@link Completable}s.
+ *
+ * Finally the actions scheduled on the parent {@link Scheduler} when the
+ * inner most {@link Completable}s are subscribed to.
+ *
+ * When the {@link Worker} is unsubscribed the {@link Completable} emits an
+ * onComplete and triggers any behavior in the flattening operator. The
+ * {@link Observable} and all {@link Completable}s give to the flattening
+ * function never onError.
+ *
+ * Limit the amount concurrency two at a time without creating a new fix
+ * size thread pool:
+ *
+ *
+ * Scheduler limitSched = Schedulers.computation().when(workers -> {
+ * // use merge max concurrent to limit the number of concurrent
+ * // callbacks two at a time
+ * return Completable.merge(Observable.merge(workers), 2);
+ * });
+ *
+ *
+ * This is a slightly different way to limit the concurrency but it has some
+ * interesting benefits and drawbacks to the method above. It works by
+ * limited the number of concurrent {@link Worker}s rather than individual
+ * actions. Generally each {@link Observable} uses its own {@link Worker}.
+ * This means that this will essentially limit the number of concurrent
+ * subscribes. The danger comes from using operators like
+ * {@link Observable#zip(Observable, Observable, rx.functions.Func2)} where
+ * subscribing to the first {@link Observable} could deadlock the
+ * subscription to the second.
+ *
+ *
+ * Scheduler limitSched = Schedulers.computation().when(workers -> {
+ * // use merge max concurrent to limit the number of concurrent
+ * // Observables two at a time
+ * return Completable.merge(Observable.merge(workers, 2));
+ * });
+ *
+ *
+ * Slowing down the rate to no more than than 1 a second. This suffers from
+ * the same problem as the one above I could find an {@link Observable}
+ * operator that limits the rate without dropping the values (aka leaky
+ * bucket algorithm).
+ *
+ *
+ * Scheduler slowSched = Schedulers.computation().when(workers -> {
+ * // use concatenate to make each worker happen one at a time.
+ * return Completable.concat(workers.map(actions -> {
+ * // delay the starting of the next worker by 1 second.
+ * return Completable.merge(actions.delaySubscription(1, TimeUnit.SECONDS));
+ * }));
+ * });
+ *
+ *
+ * @param combine
+ * @return
+ */
+ @Experimental
+ public S when(Func1>, Completable> combine) {
+ return (S) new SchedulerWhen(combine, this);
+ }
}
diff --git a/src/main/java/rx/internal/schedulers/SchedulerWhen.java b/src/main/java/rx/internal/schedulers/SchedulerWhen.java
new file mode 100644
index 0000000000..b1312a458b
--- /dev/null
+++ b/src/main/java/rx/internal/schedulers/SchedulerWhen.java
@@ -0,0 +1,302 @@
+/**
+ * Copyright 2016 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package rx.internal.schedulers;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import rx.Completable;
+import rx.Completable.CompletableOnSubscribe;
+import rx.Completable.CompletableSubscriber;
+import rx.Scheduler.Worker;
+import rx.Observable;
+import rx.Observer;
+import rx.Scheduler;
+import rx.Subscription;
+import rx.annotations.Experimental;
+import rx.functions.Action0;
+import rx.functions.Func1;
+import rx.internal.operators.BufferUntilSubscriber;
+import rx.observers.SerializedObserver;
+import rx.subjects.PublishSubject;
+import rx.subscriptions.Subscriptions;
+
+/**
+ * Allows the use of operators for controlling the timing around when actions
+ * scheduled on workers are actually done. This makes it possible to layer
+ * additional behavior on this {@link Scheduler}. The only parameter is a
+ * function that flattens an {@link Observable} of {@link Observable} of
+ * {@link Completable}s into just one {@link Completable}. There must be a chain
+ * of operators connecting the returned value to the source {@link Observable}
+ * otherwise any work scheduled on the returned {@link Scheduler} will not be
+ * executed.
+ *
+ * When {@link Scheduler#createWorker()} is invoked a {@link Observable} of
+ * {@link Completable}s is onNext'd to the combinator to be flattened. If the
+ * inner {@link Observable} is not immediately subscribed to an calls to
+ * {@link Worker#schedule} are buffered. Once the {@link Observable} is
+ * subscribed to actions are then onNext'd as {@link Completable}s.
+ *
+ * Finally the actions scheduled on the parent {@link Scheduler} when the inner
+ * most {@link Completable}s are subscribed to.
+ *
+ * When the {@link Worker} is unsubscribed the {@link Completable} emits an
+ * onComplete and triggers any behavior in the flattening operator. The
+ * {@link Observable} and all {@link Completable}s give to the flattening
+ * function never onError.
+ *
+ * Limit the amount concurrency two at a time without creating a new fix size
+ * thread pool:
+ *
+ *
+ * Scheduler limitSched = Schedulers.computation().when(workers -> {
+ * // use merge max concurrent to limit the number of concurrent
+ * // callbacks two at a time
+ * return Completable.merge(Observable.merge(workers), 2);
+ * });
+ *
+ *
+ * This is a slightly different way to limit the concurrency but it has some
+ * interesting benefits and drawbacks to the method above. It works by limited
+ * the number of concurrent {@link Worker}s rather than individual actions.
+ * Generally each {@link Observable} uses its own {@link Worker}. This means
+ * that this will essentially limit the number of concurrent subscribes. The
+ * danger comes from using operators like
+ * {@link Observable#zip(Observable, Observable, rx.functions.Func2)} where
+ * subscribing to the first {@link Observable} could deadlock the subscription
+ * to the second.
+ *
+ *
+ * Scheduler limitSched = Schedulers.computation().when(workers -> {
+ * // use merge max concurrent to limit the number of concurrent
+ * // Observables two at a time
+ * return Completable.merge(Observable.merge(workers, 2));
+ * });
+ *
+ *
+ * Slowing down the rate to no more than than 1 a second. This suffers from the
+ * same problem as the one above I could find an {@link Observable} operator
+ * that limits the rate without dropping the values (aka leaky bucket
+ * algorithm).
+ *
+ *
+ * Scheduler slowSched = Schedulers.computation().when(workers -> {
+ * // use concatenate to make each worker happen one at a time.
+ * return Completable.concat(workers.map(actions -> {
+ * // delay the starting of the next worker by 1 second.
+ * return Completable.merge(actions.delaySubscription(1, TimeUnit.SECONDS));
+ * }));
+ * });
+ *
+ *
+ * @param combine
+ * @return
+ */
+@Experimental
+public class SchedulerWhen extends Scheduler implements Subscription {
+ private final Scheduler actualScheduler;
+ private final Observer> workerObserver;
+ private final Subscription subscription;
+
+ public SchedulerWhen(Func1>, Completable> combine, Scheduler actualScheduler) {
+ this.actualScheduler = actualScheduler;
+ // workers are converted into completables and put in this queue.
+ PublishSubject> workerSubject = PublishSubject.create();
+ this.workerObserver = new SerializedObserver>(workerSubject);
+ // send it to a custom combinator to pick the order and rate at which
+ // workers are processed.
+ this.subscription = combine.call(workerSubject.onBackpressureBuffer()).subscribe();
+ }
+
+ @Override
+ public void unsubscribe() {
+ subscription.unsubscribe();
+ }
+
+ @Override
+ public boolean isUnsubscribed() {
+ return subscription.isUnsubscribed();
+ }
+
+ @Override
+ public Worker createWorker() {
+ final Worker actualWorker = actualScheduler.createWorker();
+ // a queue for the actions submitted while worker is waiting to get to
+ // the subscribe to off the workerQueue.
+ BufferUntilSubscriber actionSubject = BufferUntilSubscriber.create();
+ final Observer actionObserver = new SerializedObserver(actionSubject);
+ // convert the work of scheduling all the actions into a completable
+ Observable actions = actionSubject.map(new Func1() {
+ @Override
+ public Completable call(final ScheduledAction action) {
+ return Completable.create(new CompletableOnSubscribe() {
+ @Override
+ public void call(CompletableSubscriber actionCompletable) {
+ actionCompletable.onSubscribe(action);
+ action.call(actualWorker);
+ actionCompletable.onCompleted();
+ }
+ });
+ }
+ });
+
+ // a worker that queues the action to the actionQueue subject.
+ Worker worker = new Worker() {
+ private final AtomicBoolean unsubscribed = new AtomicBoolean();
+
+ @Override
+ public void unsubscribe() {
+ // complete the actionQueue when worker is unsubscribed to make
+ // room for the next worker in the workerQueue.
+ if (unsubscribed.compareAndSet(false, true)) {
+ actualWorker.unsubscribe();
+ actionObserver.onCompleted();
+ }
+ }
+
+ @Override
+ public boolean isUnsubscribed() {
+ return unsubscribed.get();
+ }
+
+ @Override
+ public Subscription schedule(final Action0 action, final long delayTime, final TimeUnit unit) {
+ // send a scheduled action to the actionQueue
+ DelayedAction delayedAction = new DelayedAction(action, delayTime, unit);
+ actionObserver.onNext(delayedAction);
+ return delayedAction;
+ }
+
+ @Override
+ public Subscription schedule(final Action0 action) {
+ // send a scheduled action to the actionQueue
+ ImmediateAction immediateAction = new ImmediateAction(action);
+ actionObserver.onNext(immediateAction);
+ return immediateAction;
+ }
+ };
+
+ // enqueue the completable that process actions put in reply subject
+ workerObserver.onNext(actions);
+
+ // return the worker that adds actions to the reply subject
+ return worker;
+ }
+
+ private static final Subscription SUBSCRIBED = new Subscription() {
+ @Override
+ public void unsubscribe() {
+ }
+
+ @Override
+ public boolean isUnsubscribed() {
+ return false;
+ }
+ };
+
+ private static final Subscription UNSUBSCRIBED = Subscriptions.unsubscribed();
+
+ @SuppressWarnings("serial")
+ private static abstract class ScheduledAction extends AtomicReference implements Subscription {
+ public ScheduledAction() {
+ super(SUBSCRIBED);
+ }
+
+ private final void call(Worker actualWorker) {
+ Subscription oldState = get();
+ // either SUBSCRIBED or UNSUBSCRIBED
+ if (oldState == UNSUBSCRIBED) {
+ // no need to schedule return
+ return;
+ }
+ if (oldState != SUBSCRIBED) {
+ // has already been scheduled return
+ // should not be able to get here but handle it anyway by not
+ // rescheduling.
+ return;
+ }
+
+ Subscription newState = callActual(actualWorker);
+
+ if (!compareAndSet(SUBSCRIBED, newState)) {
+ // set would only fail if the new current state is some other
+ // subscription from a concurrent call to this method.
+ // Unsubscribe from the action just scheduled because it lost
+ // the race.
+ newState.unsubscribe();
+ }
+ }
+
+ protected abstract Subscription callActual(Worker actualWorker);
+
+ @Override
+ public boolean isUnsubscribed() {
+ return get().isUnsubscribed();
+ }
+
+ @Override
+ public void unsubscribe() {
+ Subscription oldState;
+ // no matter what the current state is the new state is going to be
+ Subscription newState = UNSUBSCRIBED;
+ do {
+ oldState = get();
+ if (oldState == UNSUBSCRIBED) {
+ // the action has already been unsubscribed
+ return;
+ }
+ } while (!compareAndSet(oldState, newState));
+
+ if (oldState != SUBSCRIBED) {
+ // the action was scheduled. stop it.
+ oldState.unsubscribe();
+ }
+ }
+ }
+
+ @SuppressWarnings("serial")
+ private static class ImmediateAction extends ScheduledAction {
+ private final Action0 action;
+
+ public ImmediateAction(Action0 action) {
+ this.action = action;
+ }
+
+ @Override
+ protected Subscription callActual(Worker actualWorker) {
+ return actualWorker.schedule(action);
+ }
+ }
+
+ @SuppressWarnings("serial")
+ private static class DelayedAction extends ScheduledAction {
+ private final Action0 action;
+ private final long delayTime;
+ private final TimeUnit unit;
+
+ public DelayedAction(Action0 action, long delayTime, TimeUnit unit) {
+ this.action = action;
+ this.delayTime = delayTime;
+ this.unit = unit;
+ }
+
+ @Override
+ protected Subscription callActual(Worker actualWorker) {
+ return actualWorker.schedule(action, delayTime, unit);
+ }
+ }
+}
diff --git a/src/test/java/rx/schedulers/SchedulerWhenTest.java b/src/test/java/rx/schedulers/SchedulerWhenTest.java
new file mode 100644
index 0000000000..f911e0d10a
--- /dev/null
+++ b/src/test/java/rx/schedulers/SchedulerWhenTest.java
@@ -0,0 +1,206 @@
+/**
+ * Copyright 2016 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package rx.schedulers;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+
+import org.junit.Test;
+
+import rx.Completable;
+import rx.Observable;
+import rx.Scheduler;
+import rx.functions.Func0;
+import rx.functions.Func1;
+import rx.internal.schedulers.SchedulerWhen;
+import rx.observers.TestSubscriber;
+
+public class SchedulerWhenTest {
+ @Test
+ public void testAsyncMaxConcurrent() {
+ TestScheduler tSched = new TestScheduler();
+ SchedulerWhen sched = maxConcurrentScheduler(tSched);
+ TestSubscriber tSub = TestSubscriber.create();
+
+ asyncWork(sched).subscribe(tSub);
+
+ tSub.assertValueCount(0);
+
+ tSched.advanceTimeBy(0, SECONDS);
+ tSub.assertValueCount(0);
+
+ tSched.advanceTimeBy(1, SECONDS);
+ tSub.assertValueCount(2);
+
+ tSched.advanceTimeBy(1, SECONDS);
+ tSub.assertValueCount(4);
+
+ tSched.advanceTimeBy(1, SECONDS);
+ tSub.assertValueCount(5);
+ tSub.assertCompleted();
+
+ sched.unsubscribe();
+ }
+
+ @Test
+ public void testAsyncDelaySubscription() {
+ final TestScheduler tSched = new TestScheduler();
+ SchedulerWhen sched = throttleScheduler(tSched);
+ TestSubscriber tSub = TestSubscriber.create();
+
+ asyncWork(sched).subscribe(tSub);
+
+ tSub.assertValueCount(0);
+
+ tSched.advanceTimeBy(0, SECONDS);
+ tSub.assertValueCount(0);
+
+ tSched.advanceTimeBy(1, SECONDS);
+ tSub.assertValueCount(1);
+
+ tSched.advanceTimeBy(1, SECONDS);
+ tSub.assertValueCount(1);
+
+ tSched.advanceTimeBy(1, SECONDS);
+ tSub.assertValueCount(2);
+
+ tSched.advanceTimeBy(1, SECONDS);
+ tSub.assertValueCount(2);
+
+ tSched.advanceTimeBy(1, SECONDS);
+ tSub.assertValueCount(3);
+
+ tSched.advanceTimeBy(1, SECONDS);
+ tSub.assertValueCount(3);
+
+ tSched.advanceTimeBy(1, SECONDS);
+ tSub.assertValueCount(4);
+
+ tSched.advanceTimeBy(1, SECONDS);
+ tSub.assertValueCount(4);
+
+ tSched.advanceTimeBy(1, SECONDS);
+ tSub.assertValueCount(5);
+ tSub.assertCompleted();
+
+ sched.unsubscribe();
+ }
+
+ @Test
+ public void testSyncMaxConcurrent() {
+ TestScheduler tSched = new TestScheduler();
+ SchedulerWhen sched = maxConcurrentScheduler(tSched);
+ TestSubscriber tSub = TestSubscriber.create();
+
+ syncWork(sched).subscribe(tSub);
+
+ tSub.assertValueCount(0);
+ tSched.advanceTimeBy(0, SECONDS);
+
+ // since all the work is synchronous nothing is blocked and its all done
+ tSub.assertValueCount(5);
+ tSub.assertCompleted();
+
+ sched.unsubscribe();
+ }
+
+ @Test
+ public void testSyncDelaySubscription() {
+ final TestScheduler tSched = new TestScheduler();
+ SchedulerWhen sched = throttleScheduler(tSched);
+ TestSubscriber tSub = TestSubscriber.create();
+
+ syncWork(sched).subscribe(tSub);
+
+ tSub.assertValueCount(0);
+
+ tSched.advanceTimeBy(0, SECONDS);
+ tSub.assertValueCount(1);
+
+ tSched.advanceTimeBy(1, SECONDS);
+ tSub.assertValueCount(2);
+
+ tSched.advanceTimeBy(1, SECONDS);
+ tSub.assertValueCount(3);
+
+ tSched.advanceTimeBy(1, SECONDS);
+ tSub.assertValueCount(4);
+
+ tSched.advanceTimeBy(1, SECONDS);
+ tSub.assertValueCount(5);
+ tSub.assertCompleted();
+
+ sched.unsubscribe();
+ }
+
+ private Observable asyncWork(final Scheduler sched) {
+ return Observable.range(1, 5).flatMap(new Func1>() {
+ @Override
+ public Observable call(Integer t) {
+ return Observable.timer(1, SECONDS, sched);
+ }
+ });
+ }
+
+ private Observable syncWork(final Scheduler sched) {
+ return Observable.range(1, 5).flatMap(new Func1>() {
+ @Override
+ public Observable call(Integer t) {
+ return Observable.defer(new Func0>() {
+ @Override
+ public Observable call() {
+ return Observable.just(0l);
+ }
+ }).subscribeOn(sched);
+ }
+ });
+ }
+
+ private SchedulerWhen maxConcurrentScheduler(TestScheduler tSched) {
+ SchedulerWhen sched = new SchedulerWhen(new Func1>, Completable>() {
+ @Override
+ public Completable call(Observable> workerActions) {
+ Observable workers = workerActions.map(new Func1, Completable>() {
+ @Override
+ public Completable call(Observable actions) {
+ return Completable.concat(actions);
+ }
+ });
+ return Completable.merge(workers, 2);
+ }
+ }, tSched);
+ return sched;
+ }
+
+ private SchedulerWhen throttleScheduler(final TestScheduler tSched) {
+ SchedulerWhen sched = new SchedulerWhen(new Func1>, Completable>() {
+ @Override
+ public Completable call(Observable> workerActions) {
+ Observable workers = workerActions.map(new Func1, Completable>() {
+ @Override
+ public Completable call(Observable actions) {
+ return Completable.concat(actions);
+ }
+ });
+ return Completable.concat(workers.map(new Func1() {
+ public Completable call(Completable worker) {
+ return worker.delay(1, SECONDS, tSched);
+ }
+ }));
+ }
+ }, tSched);
+ return sched;
+ }
+}