diff --git a/src/main/java/rx/Observable.java b/src/main/java/rx/Observable.java index 89e1aea07c..bdc59e33ce 100644 --- a/src/main/java/rx/Observable.java +++ b/src/main/java/rx/Observable.java @@ -10333,11 +10333,15 @@ static Subscription subscribe(Subscriber subscriber, Observable + * If there is a {@link #create(Action1, rx.Emitter.BackpressureMode)} type source up in the + * chain, it is recommended to use {@code subscribeOn(scheduler, false)} instead + * to avoid same-pool deadlock because requests pile up behind a eager/blocking emitter. + *

* *

*
Backpressure:
- *
The operator doesn't interfere with backpressure which is determined by the source {@code Observable}'s backpressure - * behavior.
+ *
The operator doesn't interfere with backpressure amount which is determined by the source {@code Observable}'s backpressure + * behavior. However, the upstream is requested from the given scheduler thread.
*
Scheduler:
*
you specify which {@link Scheduler} this operator will use
*
@@ -10349,12 +10353,48 @@ static Subscription subscribe(Subscriber subscriber, ObservableReactiveX operators documentation: SubscribeOn * @see RxJava Threading Examples * @see #observeOn + * @see #subscribeOn(Scheduler, boolean) */ public final Observable subscribeOn(Scheduler scheduler) { + return subscribeOn(scheduler, !(this.onSubscribe instanceof OnSubscribeCreate)); + } + + /** + * Asynchronously subscribes Observers to this Observable on the specified {@link Scheduler} and + * optionally reroutes requests from other threads to the same {@link Scheduler} thread. + *

+ * If there is a {@link #create(Action1, rx.Emitter.BackpressureMode)} type source up in the + * chain, it is recommended to have {@code requestOn} false to avoid same-pool deadlock + * because requests pile up behind a eager/blocking emitter. + *

+ * + *

+ *
Backpressure:
+ *
The operator doesn't interfere with backpressure amount which is determined by the source {@code Observable}'s backpressure + * behavior. However, the upstream is requested from the given scheduler if requestOn is true.
+ *
Scheduler:
+ *
you specify which {@link Scheduler} this operator will use
+ *
+ * + * @param scheduler + * the {@link Scheduler} to perform subscription actions on + * @param requestOn if true, requests are rerouted to the given Scheduler as well (strong pipelining) + * if false, requests coming from any thread are simply forwarded to + * the upstream on the same thread (weak pipelining) + * @return the source Observable modified so that its subscriptions happen on the + * specified {@link Scheduler} + * @see ReactiveX operators documentation: SubscribeOn + * @see RxJava Threading Examples + * @see #observeOn + * @see #subscribeOn(Scheduler) + * @since 1.2.7 - experimental + */ + @Experimental + public final Observable subscribeOn(Scheduler scheduler, boolean requestOn) { if (this instanceof ScalarSynchronousObservable) { return ((ScalarSynchronousObservable)this).scalarScheduleOn(scheduler); } - return unsafeCreate(new OperatorSubscribeOn(this, scheduler)); + return unsafeCreate(new OperatorSubscribeOn(this, scheduler, requestOn)); } /** diff --git a/src/main/java/rx/internal/operators/OperatorSubscribeOn.java b/src/main/java/rx/internal/operators/OperatorSubscribeOn.java index 9b90cbc26a..0be1b01829 100644 --- a/src/main/java/rx/internal/operators/OperatorSubscribeOn.java +++ b/src/main/java/rx/internal/operators/OperatorSubscribeOn.java @@ -31,68 +31,92 @@ public final class OperatorSubscribeOn implements OnSubscribe { final Scheduler scheduler; final Observable source; + final boolean requestOn; - public OperatorSubscribeOn(Observable source, Scheduler scheduler) { + public OperatorSubscribeOn(Observable source, Scheduler scheduler, boolean requestOn) { this.scheduler = scheduler; this.source = source; + this.requestOn = requestOn; } @Override public void call(final Subscriber subscriber) { final Worker inner = scheduler.createWorker(); + + SubscribeOnSubscriber parent = new SubscribeOnSubscriber(subscriber, requestOn, inner, source); + subscriber.add(parent); subscriber.add(inner); - inner.schedule(new Action0() { - @Override - public void call() { - final Thread t = Thread.currentThread(); + inner.schedule(parent); + } - Subscriber s = new Subscriber(subscriber) { - @Override - public void onNext(T t) { - subscriber.onNext(t); - } + static final class SubscribeOnSubscriber extends Subscriber implements Action0 { - @Override - public void onError(Throwable e) { - try { - subscriber.onError(e); - } finally { - inner.unsubscribe(); - } - } + final Subscriber actual; - @Override - public void onCompleted() { - try { - subscriber.onCompleted(); - } finally { - inner.unsubscribe(); - } - } + final boolean requestOn; + + final Worker worker; + + Observable source; + + Thread t; + + SubscribeOnSubscriber(Subscriber actual, boolean requestOn, Worker worker, Observable source) { + this.actual = actual; + this.requestOn = requestOn; + this.worker = worker; + this.source = source; + } - @Override - public void setProducer(final Producer p) { - subscriber.setProducer(new Producer() { + @Override + public void onNext(T t) { + actual.onNext(t); + } + + @Override + public void onError(Throwable e) { + try { + actual.onError(e); + } finally { + worker.unsubscribe(); + } + } + + @Override + public void onCompleted() { + try { + actual.onCompleted(); + } finally { + worker.unsubscribe(); + } + } + + @Override + public void call() { + Observable src = source; + source = null; + t = Thread.currentThread(); + src.unsafeSubscribe(this); + } + + @Override + public void setProducer(final Producer p) { + actual.setProducer(new Producer() { + @Override + public void request(final long n) { + if (t == Thread.currentThread() || !requestOn) { + p.request(n); + } else { + worker.schedule(new Action0() { @Override - public void request(final long n) { - if (t == Thread.currentThread()) { - p.request(n); - } else { - inner.schedule(new Action0() { - @Override - public void call() { - p.request(n); - } - }); - } + public void call() { + p.request(n); } }); } - }; - - source.unsafeSubscribe(s); - } - }); + } + }); + } } } \ No newline at end of file diff --git a/src/test/java/rx/internal/operators/OperatorGroupByTest.java b/src/test/java/rx/internal/operators/OperatorGroupByTest.java index 5d035bcd82..7fd6ab0a2e 100644 --- a/src/test/java/rx/internal/operators/OperatorGroupByTest.java +++ b/src/test/java/rx/internal/operators/OperatorGroupByTest.java @@ -2017,11 +2017,11 @@ public Map call(Action1 t) { throw exception; }}; } - + @Test public void outerConsumedInABoundedManner() { final int[] counter = { 0 }; - + Observable.range(1, 10000) .doOnRequest(new Action1() { @Override diff --git a/src/test/java/rx/internal/operators/OperatorSubscribeOnTest.java b/src/test/java/rx/internal/operators/OperatorSubscribeOnTest.java index 6eb4abd8e8..b28b3fc27b 100644 --- a/src/test/java/rx/internal/operators/OperatorSubscribeOnTest.java +++ b/src/test/java/rx/internal/operators/OperatorSubscribeOnTest.java @@ -26,16 +26,12 @@ import org.junit.Test; -import rx.Observable; +import rx.*; import rx.Observable.OnSubscribe; import rx.Observable.Operator; -import rx.Observer; -import rx.Producer; -import rx.Scheduler; -import rx.Subscriber; -import rx.Subscription; -import rx.functions.Action0; -import rx.observers.TestSubscriber; +import rx.functions.*; +import rx.internal.util.*; +import rx.observers.*; import rx.schedulers.Schedulers; public class OperatorSubscribeOnTest { @@ -267,4 +263,85 @@ public void onNext(Integer t) { ts.assertNoErrors(); } + @Test + public void noSamepoolDeadlock() { + final int n = 4 * RxRingBuffer.SIZE; + + Observable.create(new Action1>() { + @Override + public void call(Emitter e) { + for (int i = 0; i < n; i++) { + e.onNext(i); + try { + Thread.sleep(1); + } catch (InterruptedException e1) { + e1.printStackTrace(); + } + } + e.onCompleted(); + } + }, Emitter.BackpressureMode.DROP) + .map(UtilityFunctions.identity()) + .subscribeOn(Schedulers.io(), false) + .observeOn(Schedulers.computation()) + .test() + .awaitTerminalEvent(5, TimeUnit.SECONDS) + .assertValueCount(n) + .assertNoErrors() + .assertCompleted(); + } + + @Test + public void noSamepoolDeadlockRequestOn() { + final int n = 4 * RxRingBuffer.SIZE; + + Observable.create(new Action1>() { + @Override + public void call(Emitter e) { + for (int i = 0; i < n; i++) { + e.onNext(i); + try { + Thread.sleep(1); + } catch (InterruptedException e1) { + e1.printStackTrace(); + } + } + e.onCompleted(); + } + }, Emitter.BackpressureMode.DROP) + .subscribeOn(Schedulers.io()) + .observeOn(Schedulers.computation()) + .test() + .awaitTerminalEvent(5, TimeUnit.SECONDS) + .assertValueCount(n) + .assertNoErrors() + .assertCompleted(); + } + + @Test + public void noSamepoolDeadlockRequestOn2() { + final int n = 4 * RxRingBuffer.SIZE; + + Observable.create(new Action1>() { + @Override + public void call(Emitter e) { + for (int i = 0; i < n; i++) { + e.onNext(i); + try { + Thread.sleep(1); + } catch (InterruptedException e1) { + e1.printStackTrace(); + } + } + e.onCompleted(); + } + }, Emitter.BackpressureMode.DROP) + .subscribeOn(Schedulers.io(), true) + .observeOn(Schedulers.computation()) + .test() + .awaitTerminalEvent(5, TimeUnit.SECONDS) + .assertValueCount(RxRingBuffer.SIZE) + .assertNoErrors() + .assertCompleted(); + } }