Skip to content

fix Amb backpressure bug #2961

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
May 26, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 73 additions & 29 deletions src/main/java/rx/internal/operators/OnSubscribeAmb.java
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,7 @@ private static final class AmbSubscriber<T> extends Subscriber<T> {

private final Subscriber<? super T> subscriber;
private final Selection<T> selection;
private boolean chosen;

private AmbSubscriber(long requested, Subscriber<? super T> subscriber, Selection<T> selection) {
this.subscriber = subscriber;
Expand All @@ -282,11 +283,11 @@ private final void requestMore(long n) {
}

@Override
public void onNext(T args) {
public void onNext(T t) {
if (!isSelected()) {
return;
}
subscriber.onNext(args);
subscriber.onNext(t);
}

@Override
Expand All @@ -306,12 +307,17 @@ public void onError(Throwable e) {
}

private boolean isSelected() {
if (chosen) {
return true;
}
if (selection.choice.get() == this) {
// fast-path
chosen = true;
return true;
} else {
if (selection.choice.compareAndSet(null, this)) {
selection.unsubscribeOthers(this);
chosen = true;
return true;
} else {
// we lost so unsubscribe ... and force cleanup again due to possible race conditions
Expand Down Expand Up @@ -343,59 +349,97 @@ public void unsubscribeOthers(AmbSubscriber<T> notThis) {
}

}

private final Iterable<? extends Observable<? extends T>> sources;
private final Selection<T> selection = new Selection<T>();


//give default access instead of private as a micro-optimization
//for access from anonymous classes below
final Iterable<? extends Observable<? extends T>> sources;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cool. Just learnt that accessing a private field of the outer class uses invokestatic.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And generates an accessor method in that outer class, the count of which (methods) is a big deal on Android. So 👍.

final Selection<T> selection = new Selection<T>();
final AtomicReference<AmbSubscriber<T>> choice = selection.choice;

private OnSubscribeAmb(Iterable<? extends Observable<? extends T>> sources) {
this.sources = sources;
}

@Override
public void call(final Subscriber<? super T> subscriber) {

//setup unsubscription of all the subscribers to the sources
subscriber.add(Subscriptions.create(new Action0() {

@Override
public void call() {
if (selection.choice.get() != null) {
AmbSubscriber<T> c;
if ((c = choice.get()) != null) {
// there is a single winner so we unsubscribe it
selection.choice.get().unsubscribe();
c.unsubscribe();
}
// if we are racing with others still existing, we'll also unsubscribe them
if(!selection.ambSubscribers.isEmpty()) {
for (AmbSubscriber<T> other : selection.ambSubscribers) {
other.unsubscribe();
}
selection.ambSubscribers.clear();
}
// if subscriptions are occurring as this is happening then this call may not
// unsubscribe everything. We protect ourselves though by doing another unsubscribe check
// after the subscription loop below
unsubscribeAmbSubscribers(selection.ambSubscribers);
}

}));

//need to subscribe to all the sources
for (Observable<? extends T> source : sources) {
if (subscriber.isUnsubscribed()) {
break;
}
AmbSubscriber<T> ambSubscriber = new AmbSubscriber<T>(0, subscriber, selection);
selection.ambSubscribers.add(ambSubscriber);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Again, selection.ambSubscribers could be pulled before the loop.

// check again if choice has been made so can stop subscribing
// if all sources were backpressure aware then this check
// would be pointless given that 0 was requested above from each ambSubscriber
AmbSubscriber<T> c;
if ((c = choice.get()) != null) {
// Already chose one, the rest can be skipped and we can clean up
selection.unsubscribeOthers(c);
return;
}
source.unsafeSubscribe(ambSubscriber);
}
// while subscribing unsubscription may have occurred so we clean up after
if (subscriber.isUnsubscribed()) {
unsubscribeAmbSubscribers(selection.ambSubscribers);
}

subscriber.setProducer(new Producer() {

@Override
public void request(long n) {
if (selection.choice.get() != null) {
final AmbSubscriber<T> c;
if ((c = choice.get()) != null) {
// propagate the request to that single Subscriber that won
selection.choice.get().requestMore(n);
c.requestMore(n);
} else {
for (Observable<? extends T> source : sources) {
if (subscriber.isUnsubscribed()) {
break;
}
AmbSubscriber<T> ambSubscriber = new AmbSubscriber<T>(n, subscriber, selection);
selection.ambSubscribers.add(ambSubscriber);
// possible race condition in previous lines ... a choice may have been made so double check (instead of synchronizing)
if (selection.choice.get() != null) {
// Already chose one, the rest can be skipped and we can clean up
selection.unsubscribeOthers(selection.choice.get());
break;
//propagate the request to all the amb subscribers
for (AmbSubscriber<T> ambSubscriber: selection.ambSubscribers) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This may still run concurrently with the subscription loop and some subscribers won't get the newer requests during that time. The simplest fix is to subscribe all AmbSubscriber in the call() of the operator with 0 initial requests so all subsequent request() can target all or the winner without any additional race. The other way is to get into some complicated serialization logic.

if (!ambSubscriber.isUnsubscribed()) {
// make a best endeavours check to not waste requests
// if first emission has already occurred
if (choice.get() == ambSubscriber) {
ambSubscriber.requestMore(n);
// don't need to request from other subscribers because choice has been made
// and request has gone to choice
return;
} else {
ambSubscriber.requestMore(n);
}
}
source.unsafeSubscribe(ambSubscriber);
}
}
}
});
}

private static <T> void unsubscribeAmbSubscribers(Collection<AmbSubscriber<T>> ambSubscribers) {
if(!ambSubscribers.isEmpty()) {
for (AmbSubscriber<T> other : ambSubscribers) {
other.unsubscribe();
}
ambSubscribers.clear();
}
}
}
70 changes: 70 additions & 0 deletions src/test/java/rx/internal/operators/OnSubscribeAmbTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import static rx.internal.operators.OnSubscribeAmb.amb;

import java.io.IOException;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

Expand All @@ -36,6 +37,7 @@
import rx.Scheduler;
import rx.Subscriber;
import rx.functions.Action0;
import rx.functions.Action1;
import rx.internal.util.RxRingBuffer;
import rx.observers.TestSubscriber;
import rx.schedulers.Schedulers;
Expand Down Expand Up @@ -219,4 +221,72 @@ public void testBackpressure() {
ts.assertNoErrors();
assertEquals(RxRingBuffer.SIZE * 2, ts.getOnNextEvents().size());
}


@Test
public void testSubscriptionOnlyHappensOnce() throws InterruptedException {
final AtomicLong count = new AtomicLong();
Action0 incrementer = new Action0() {
@Override
public void call() {
count.incrementAndGet();
}
};
//this aync stream should emit first
Observable<Integer> o1 = Observable.just(1).doOnSubscribe(incrementer)
.delay(100, TimeUnit.MILLISECONDS).subscribeOn(Schedulers.computation());
//this stream emits second
Observable<Integer> o2 = Observable.just(1).doOnSubscribe(incrementer)
.delay(100, TimeUnit.MILLISECONDS).subscribeOn(Schedulers.computation());
TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
Observable.amb(o1, o2).subscribe(ts);
ts.requestMore(1);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be forbidden in the real case. Right? request should be called in Subscriber.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So testSubscriptionOnlyHappensOnce is a test case that requesting twice before first emission. Right? The other request is in subscribe.

ts.awaitTerminalEvent(5, TimeUnit.SECONDS);
ts.assertNoErrors();
assertEquals(2, count.get());
}

@Test
public void testSecondaryRequestsPropagatedToChildren() throws InterruptedException {
//this aync stream should emit first
Observable<Integer> o1 = Observable.from(Arrays.asList(1, 2, 3))
.delay(100, TimeUnit.MILLISECONDS).subscribeOn(Schedulers.computation());
//this stream emits second
Observable<Integer> o2 = Observable.from(Arrays.asList(4, 5, 6))
.delay(200, TimeUnit.MILLISECONDS).subscribeOn(Schedulers.computation());
TestSubscriber<Integer> ts = new TestSubscriber<Integer>() {
@Override
public void onStart() {
request(1);
}};
Observable.amb(o1, o2).subscribe(ts);
// before first emission request 20 more
// this request should suffice to emit all
ts.requestMore(20);
//ensure stream does not hang
ts.awaitTerminalEvent(5, TimeUnit.SECONDS);
ts.assertNoErrors();
}

@Test
public void testSynchronousSources() {
// under async subscription the second observable would complete before
// the first but because this is a synchronous subscription to sources
// then second observable does not get subscribed to before first
// subscription completes hence first observable emits result through
// amb
int result = Observable.just(1).doOnNext(new Action1<Object>() {

@Override
public void call(Object t) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
//
}
}
}).ambWith(Observable.just(2)).toBlocking().single();
assertEquals(1, result);
}

}