Skip to content

Commit 04ef4e6

Browse files
akarnokdzsxwing
authored andcommitted
1.x: fix singleOrDefault() backpressure if source is empty (#3905)
Issue #3892 is a goldmine for missing backpressure problems. This PR fixes the case when singleOrDefault encounters an empty source and has to emit some default value. Fixed via setting the SingleProducer on the child on termination.
1 parent 3721666 commit 04ef4e6

File tree

2 files changed

+31
-25
lines changed

2 files changed

+31
-25
lines changed

src/main/java/rx/internal/operators/OperatorSingle.java

Lines changed: 15 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,11 @@
1616
package rx.internal.operators;
1717

1818
import java.util.NoSuchElementException;
19-
import java.util.concurrent.atomic.AtomicBoolean;
2019

2120
import rx.Observable.Operator;
22-
import rx.Producer;
2321
import rx.Subscriber;
22+
import rx.internal.producers.SingleProducer;
23+
import rx.internal.util.RxJavaPluginUtils;
2424

2525
/**
2626
* If the Observable completes after emitting a single item that matches a
@@ -65,19 +65,6 @@ public Subscriber<? super T> call(final Subscriber<? super T> child) {
6565

6666
final ParentSubscriber<T> parent = new ParentSubscriber<T>(child, hasDefaultValue,
6767
defaultValue);
68-
69-
child.setProducer(new Producer() {
70-
71-
private final AtomicBoolean requestedTwo = new AtomicBoolean(false);
72-
73-
@Override
74-
public void request(long n) {
75-
if (n > 0 && requestedTwo.compareAndSet(false, true)) {
76-
parent.requestMore(2);
77-
}
78-
}
79-
80-
});
8168
child.add(parent);
8269
return parent;
8370
}
@@ -88,23 +75,23 @@ private static final class ParentSubscriber<T> extends Subscriber<T> {
8875
private final T defaultValue;
8976

9077
private T value;
91-
private boolean isNonEmpty = false;
92-
private boolean hasTooManyElements = false;
78+
private boolean isNonEmpty;
79+
private boolean hasTooManyElements;
9380

9481

9582
ParentSubscriber(Subscriber<? super T> child, boolean hasDefaultValue,
9683
T defaultValue) {
9784
this.child = child;
9885
this.hasDefaultValue = hasDefaultValue;
9986
this.defaultValue = defaultValue;
100-
}
101-
102-
void requestMore(long n) {
103-
request(n);
87+
request(2); // could go unbounded, but test expect this
10488
}
10589

10690
@Override
10791
public void onNext(T value) {
92+
if (hasTooManyElements) {
93+
return;
94+
} else
10895
if (isNonEmpty) {
10996
hasTooManyElements = true;
11097
child.onError(new IllegalArgumentException("Sequence contains too many elements"));
@@ -121,12 +108,10 @@ public void onCompleted() {
121108
// We have already sent an onError message
122109
} else {
123110
if (isNonEmpty) {
124-
child.onNext(value);
125-
child.onCompleted();
111+
child.setProducer(new SingleProducer<T>(child, value));
126112
} else {
127113
if (hasDefaultValue) {
128-
child.onNext(defaultValue);
129-
child.onCompleted();
114+
child.setProducer(new SingleProducer<T>(child, defaultValue));
130115
} else {
131116
child.onError(new NoSuchElementException("Sequence contains no elements"));
132117
}
@@ -136,6 +121,11 @@ public void onCompleted() {
136121

137122
@Override
138123
public void onError(Throwable e) {
124+
if (hasTooManyElements) {
125+
RxJavaPluginUtils.handleException(e);
126+
return;
127+
}
128+
139129
child.onError(e);
140130
}
141131

src/test/java/rx/internal/operators/OperatorSingleTest.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import rx.functions.Action1;
3838
import rx.functions.Func1;
3939
import rx.functions.Func2;
40+
import rx.observers.TestSubscriber;
4041

4142
public class OperatorSingleTest {
4243

@@ -456,4 +457,19 @@ public Integer call(Integer i1, Integer i2) {
456457
Integer r = reduced.toBlocking().first();
457458
assertEquals(21, r.intValue());
458459
}
460+
461+
@Test
462+
public void defaultBackpressure() {
463+
TestSubscriber<Integer> ts = TestSubscriber.create(0);
464+
465+
Observable.<Integer>empty().singleOrDefault(1).subscribe(ts);
466+
467+
ts.assertNoValues();
468+
469+
ts.requestMore(1);
470+
471+
ts.assertValue(1);
472+
ts.assertCompleted();
473+
ts.assertNoErrors();
474+
}
459475
}

0 commit comments

Comments
 (0)