Skip to content

1.x: create+subscribeOn avoid same-pool deadlock #5091

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Feb 11, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 43 additions & 3 deletions src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -10333,11 +10333,15 @@ static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T
/**
* Asynchronously subscribes Observers to this Observable on the specified {@link Scheduler}.
* <p>
* If there is a {@link #create(Action1, rx.Emitter.BackpressureMode)} type source up in the
* chain, it is recommended to use {@code subscribeOn(scheduler, false)} instead
* to avoid same-pool deadlock because requests pile up behind a eager/blocking emitter.
* <p>
* <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/subscribeOn.png" alt="">
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The operator doesn't interfere with backpressure which is determined by the source {@code Observable}'s backpressure
* behavior.</dd>
* <dd>The operator doesn't interfere with backpressure amount which is determined by the source {@code Observable}'s backpressure
* behavior. However, the upstream is requested from the given scheduler thread.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>you specify which {@link Scheduler} this operator will use</dd>
* </dl>
Expand All @@ -10349,12 +10353,48 @@ static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T
* @see <a href="http://reactivex.io/documentation/operators/subscribeon.html">ReactiveX operators documentation: SubscribeOn</a>
* @see <a href="http://www.grahamlea.com/2014/07/rxjava-threading-examples/">RxJava Threading Examples</a>
* @see #observeOn
* @see #subscribeOn(Scheduler, boolean)
*/
public final Observable<T> subscribeOn(Scheduler scheduler) {
return subscribeOn(scheduler, !(this.onSubscribe instanceof OnSubscribeCreate));
}

/**
* Asynchronously subscribes Observers to this Observable on the specified {@link Scheduler} and
* optionally reroutes requests from other threads to the same {@link Scheduler} thread.
* <p>
* If there is a {@link #create(Action1, rx.Emitter.BackpressureMode)} type source up in the
* chain, it is recommended to have {@code requestOn} false to avoid same-pool deadlock
* because requests pile up behind a eager/blocking emitter.
* <p>
* <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/subscribeOn.png" alt="">
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The operator doesn't interfere with backpressure amount which is determined by the source {@code Observable}'s backpressure
* behavior. However, the upstream is requested from the given scheduler if requestOn is true.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>you specify which {@link Scheduler} this operator will use</dd>
* </dl>
*
* @param scheduler
* the {@link Scheduler} to perform subscription actions on
* @param requestOn if true, requests are rerouted to the given Scheduler as well (strong pipelining)
* if false, requests coming from any thread are simply forwarded to
* the upstream on the same thread (weak pipelining)
* @return the source Observable modified so that its subscriptions happen on the
* specified {@link Scheduler}
* @see <a href="http://reactivex.io/documentation/operators/subscribeon.html">ReactiveX operators documentation: SubscribeOn</a>
* @see <a href="http://www.grahamlea.com/2014/07/rxjava-threading-examples/">RxJava Threading Examples</a>
* @see #observeOn
* @see #subscribeOn(Scheduler)
* @since 1.2.7 - experimental
*/
@Experimental
public final Observable<T> subscribeOn(Scheduler scheduler, boolean requestOn) {
if (this instanceof ScalarSynchronousObservable) {
return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
}
return unsafeCreate(new OperatorSubscribeOn<T>(this, scheduler));
return unsafeCreate(new OperatorSubscribeOn<T>(this, scheduler, requestOn));
}

/**
Expand Down
114 changes: 69 additions & 45 deletions src/main/java/rx/internal/operators/OperatorSubscribeOn.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,68 +31,92 @@ public final class OperatorSubscribeOn<T> implements OnSubscribe<T> {

final Scheduler scheduler;
final Observable<T> source;
final boolean requestOn;

public OperatorSubscribeOn(Observable<T> source, Scheduler scheduler) {
public OperatorSubscribeOn(Observable<T> source, Scheduler scheduler, boolean requestOn) {
this.scheduler = scheduler;
this.source = source;
this.requestOn = requestOn;
}

@Override
public void call(final Subscriber<? super T> subscriber) {
final Worker inner = scheduler.createWorker();

SubscribeOnSubscriber<T> parent = new SubscribeOnSubscriber<T>(subscriber, requestOn, inner, source);
subscriber.add(parent);
subscriber.add(inner);

inner.schedule(new Action0() {
@Override
public void call() {
final Thread t = Thread.currentThread();
inner.schedule(parent);
}

Subscriber<T> s = new Subscriber<T>(subscriber) {
@Override
public void onNext(T t) {
subscriber.onNext(t);
}
static final class SubscribeOnSubscriber<T> extends Subscriber<T> implements Action0 {

@Override
public void onError(Throwable e) {
try {
subscriber.onError(e);
} finally {
inner.unsubscribe();
}
}
final Subscriber<? super T> actual;

@Override
public void onCompleted() {
try {
subscriber.onCompleted();
} finally {
inner.unsubscribe();
}
}
final boolean requestOn;

final Worker worker;

Observable<T> source;

Thread t;

SubscribeOnSubscriber(Subscriber<? super T> actual, boolean requestOn, Worker worker, Observable<T> source) {
this.actual = actual;
this.requestOn = requestOn;
this.worker = worker;
this.source = source;
}

@Override
public void setProducer(final Producer p) {
subscriber.setProducer(new Producer() {
@Override
public void onNext(T t) {
actual.onNext(t);
}

@Override
public void onError(Throwable e) {
try {
actual.onError(e);
} finally {
worker.unsubscribe();
}
}

@Override
public void onCompleted() {
try {
actual.onCompleted();
} finally {
worker.unsubscribe();
}
}

@Override
public void call() {
Observable<T> src = source;
source = null;
t = Thread.currentThread();
src.unsafeSubscribe(this);
}

@Override
public void setProducer(final Producer p) {
actual.setProducer(new Producer() {
@Override
public void request(final long n) {
if (t == Thread.currentThread() || !requestOn) {
p.request(n);
} else {
worker.schedule(new Action0() {
@Override
public void request(final long n) {
if (t == Thread.currentThread()) {
p.request(n);
} else {
inner.schedule(new Action0() {
@Override
public void call() {
p.request(n);
}
});
}
public void call() {
p.request(n);
}
});
}
};

source.unsafeSubscribe(s);
}
});
}
});
}
}
}
4 changes: 2 additions & 2 deletions src/test/java/rx/internal/operators/OperatorGroupByTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -2017,11 +2017,11 @@ public Map<Integer, Object> call(Action1<Integer> t) {
throw exception;
}};
}

@Test
public void outerConsumedInABoundedManner() {
final int[] counter = { 0 };

Observable.range(1, 10000)
.doOnRequest(new Action1<Long>() {
@Override
Expand Down
93 changes: 85 additions & 8 deletions src/test/java/rx/internal/operators/OperatorSubscribeOnTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,12 @@

import org.junit.Test;

import rx.Observable;
import rx.*;
import rx.Observable.OnSubscribe;
import rx.Observable.Operator;
import rx.Observer;
import rx.Producer;
import rx.Scheduler;
import rx.Subscriber;
import rx.Subscription;
import rx.functions.Action0;
import rx.observers.TestSubscriber;
import rx.functions.*;
import rx.internal.util.*;
import rx.observers.*;
import rx.schedulers.Schedulers;

public class OperatorSubscribeOnTest {
Expand Down Expand Up @@ -267,4 +263,85 @@ public void onNext(Integer t) {
ts.assertNoErrors();
}

@Test
public void noSamepoolDeadlock() {
final int n = 4 * RxRingBuffer.SIZE;

Observable.create(new Action1<Emitter<Object>>() {
@Override
public void call(Emitter<Object> e) {
for (int i = 0; i < n; i++) {
e.onNext(i);
try {
Thread.sleep(1);
} catch (InterruptedException e1) {
e1.printStackTrace();
}
}
e.onCompleted();
}
}, Emitter.BackpressureMode.DROP)
.map(UtilityFunctions.identity())
.subscribeOn(Schedulers.io(), false)
.observeOn(Schedulers.computation())
.test()
.awaitTerminalEvent(5, TimeUnit.SECONDS)
.assertValueCount(n)
.assertNoErrors()
.assertCompleted();
}

@Test
public void noSamepoolDeadlockRequestOn() {
final int n = 4 * RxRingBuffer.SIZE;

Observable.create(new Action1<Emitter<Object>>() {
@Override
public void call(Emitter<Object> e) {
for (int i = 0; i < n; i++) {
e.onNext(i);
try {
Thread.sleep(1);
} catch (InterruptedException e1) {
e1.printStackTrace();
}
}
e.onCompleted();
}
}, Emitter.BackpressureMode.DROP)
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.computation())
.test()
.awaitTerminalEvent(5, TimeUnit.SECONDS)
.assertValueCount(n)
.assertNoErrors()
.assertCompleted();
}

@Test
public void noSamepoolDeadlockRequestOn2() {
final int n = 4 * RxRingBuffer.SIZE;

Observable.create(new Action1<Emitter<Object>>() {
@Override
public void call(Emitter<Object> e) {
for (int i = 0; i < n; i++) {
e.onNext(i);
try {
Thread.sleep(1);
} catch (InterruptedException e1) {
e1.printStackTrace();
}
}
e.onCompleted();
}
}, Emitter.BackpressureMode.DROP)
.subscribeOn(Schedulers.io(), true)
.observeOn(Schedulers.computation())
.test()
.awaitTerminalEvent(5, TimeUnit.SECONDS)
.assertValueCount(RxRingBuffer.SIZE)
.assertNoErrors()
.assertCompleted();
}
}