Skip to content

1.x: fix singleOrDefault() backpressure if source is empty #3905

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
May 2, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 15 additions & 25 deletions src/main/java/rx/internal/operators/OperatorSingle.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@
package rx.internal.operators;

import java.util.NoSuchElementException;
import java.util.concurrent.atomic.AtomicBoolean;

import rx.Observable.Operator;
import rx.Producer;
import rx.Subscriber;
import rx.internal.producers.SingleProducer;
import rx.internal.util.RxJavaPluginUtils;

/**
* If the Observable completes after emitting a single item that matches a
Expand Down Expand Up @@ -65,19 +65,6 @@ public Subscriber<? super T> call(final Subscriber<? super T> child) {

final ParentSubscriber<T> parent = new ParentSubscriber<T>(child, hasDefaultValue,
defaultValue);

child.setProducer(new Producer() {

private final AtomicBoolean requestedTwo = new AtomicBoolean(false);

@Override
public void request(long n) {
if (n > 0 && requestedTwo.compareAndSet(false, true)) {
parent.requestMore(2);
}
}

});
child.add(parent);
return parent;
}
Expand All @@ -88,23 +75,23 @@ private static final class ParentSubscriber<T> extends Subscriber<T> {
private final T defaultValue;

private T value;
private boolean isNonEmpty = false;
private boolean hasTooManyElements = false;
private boolean isNonEmpty;
private boolean hasTooManyElements;


ParentSubscriber(Subscriber<? super T> child, boolean hasDefaultValue,
T defaultValue) {
this.child = child;
this.hasDefaultValue = hasDefaultValue;
this.defaultValue = defaultValue;
}

void requestMore(long n) {
request(n);
request(2); // could go unbounded, but test expect this
}

@Override
public void onNext(T value) {
if (hasTooManyElements) {
return;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Report error to plugin?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It gets here in case upstream ignored the unsubscription or is simply ignoring backpressure. There is no error here unless you want to new up and then sink some error.

} else
if (isNonEmpty) {
hasTooManyElements = true;
child.onError(new IllegalArgumentException("Sequence contains too many elements"));
Expand All @@ -121,12 +108,10 @@ public void onCompleted() {
// We have already sent an onError message
} else {
if (isNonEmpty) {
child.onNext(value);
child.onCompleted();
child.setProducer(new SingleProducer<T>(child, value));
} else {
if (hasDefaultValue) {
child.onNext(defaultValue);
child.onCompleted();
child.setProducer(new SingleProducer<T>(child, defaultValue));
} else {
child.onError(new NoSuchElementException("Sequence contains no elements"));
}
Expand All @@ -136,6 +121,11 @@ public void onCompleted() {

@Override
public void onError(Throwable e) {
if (hasTooManyElements) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Report to plugin?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I left it out for you specifically.

RxJavaPluginUtils.handleException(e);
return;
}

child.onError(e);
}

Expand Down
16 changes: 16 additions & 0 deletions src/test/java/rx/internal/operators/OperatorSingleTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import rx.functions.Action1;
import rx.functions.Func1;
import rx.functions.Func2;
import rx.observers.TestSubscriber;

public class OperatorSingleTest {

Expand Down Expand Up @@ -456,4 +457,19 @@ public Integer call(Integer i1, Integer i2) {
Integer r = reduced.toBlocking().first();
assertEquals(21, r.intValue());
}

@Test
public void defaultBackpressure() {
TestSubscriber<Integer> ts = TestSubscriber.create(0);

Observable.<Integer>empty().singleOrDefault(1).subscribe(ts);

ts.assertNoValues();

ts.requestMore(1);

ts.assertValue(1);
ts.assertCompleted();
ts.assertNoErrors();
}
}