Skip to content

Commit 30afb3b

Browse files
authored
2.x: Flowable.onErrorResumeNext improvements (#6121)
1 parent c0f17ce commit 30afb3b

File tree

2 files changed

+33
-20
lines changed

2 files changed

+33
-20
lines changed

src/main/java/io/reactivex/internal/operators/flowable/FlowableOnErrorNext.java

Lines changed: 18 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import io.reactivex.*;
1919
import io.reactivex.exceptions.*;
2020
import io.reactivex.functions.Function;
21+
import io.reactivex.internal.functions.ObjectHelper;
2122
import io.reactivex.internal.subscriptions.SubscriptionArbiter;
2223
import io.reactivex.plugins.RxJavaPlugins;
2324

@@ -35,41 +36,47 @@ public FlowableOnErrorNext(Flowable<T> source,
3536
@Override
3637
protected void subscribeActual(Subscriber<? super T> s) {
3738
OnErrorNextSubscriber<T> parent = new OnErrorNextSubscriber<T>(s, nextSupplier, allowFatal);
38-
s.onSubscribe(parent.arbiter);
39+
s.onSubscribe(parent);
3940
source.subscribe(parent);
4041
}
4142

42-
static final class OnErrorNextSubscriber<T> implements FlowableSubscriber<T> {
43+
static final class OnErrorNextSubscriber<T>
44+
extends SubscriptionArbiter
45+
implements FlowableSubscriber<T> {
46+
private static final long serialVersionUID = 4063763155303814625L;
47+
4348
final Subscriber<? super T> actual;
49+
4450
final Function<? super Throwable, ? extends Publisher<? extends T>> nextSupplier;
51+
4552
final boolean allowFatal;
46-
final SubscriptionArbiter arbiter;
4753

4854
boolean once;
4955

5056
boolean done;
5157

58+
long produced;
59+
5260
OnErrorNextSubscriber(Subscriber<? super T> actual, Function<? super Throwable, ? extends Publisher<? extends T>> nextSupplier, boolean allowFatal) {
5361
this.actual = actual;
5462
this.nextSupplier = nextSupplier;
5563
this.allowFatal = allowFatal;
56-
this.arbiter = new SubscriptionArbiter();
5764
}
5865

5966
@Override
6067
public void onSubscribe(Subscription s) {
61-
arbiter.setSubscription(s);
68+
setSubscription(s);
6269
}
6370

6471
@Override
6572
public void onNext(T t) {
6673
if (done) {
6774
return;
6875
}
69-
actual.onNext(t);
7076
if (!once) {
71-
arbiter.produced(1L);
77+
produced++;
7278
}
79+
actual.onNext(t);
7380
}
7481

7582
@Override
@@ -92,18 +99,16 @@ public void onError(Throwable t) {
9299
Publisher<? extends T> p;
93100

94101
try {
95-
p = nextSupplier.apply(t);
102+
p = ObjectHelper.requireNonNull(nextSupplier.apply(t), "The nextSupplier returned a null Publisher");
96103
} catch (Throwable e) {
97104
Exceptions.throwIfFatal(e);
98105
actual.onError(new CompositeException(t, e));
99106
return;
100107
}
101108

102-
if (p == null) {
103-
NullPointerException npe = new NullPointerException("Publisher is null");
104-
npe.initCause(t);
105-
actual.onError(npe);
106-
return;
109+
long mainProduced = produced;
110+
if (mainProduced != 0L) {
111+
produced(mainProduced);
107112
}
108113

109114
p.subscribe(this);

src/test/java/io/reactivex/flowable/FlowableNullTests.java

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1648,14 +1648,22 @@ public void onErrorResumeNextFunctionNull() {
16481648
just1.onErrorResumeNext((Function<Throwable, Publisher<Integer>>)null);
16491649
}
16501650

1651-
@Test(expected = NullPointerException.class)
1651+
@Test
16521652
public void onErrorResumeNextFunctionReturnsNull() {
1653-
Flowable.error(new TestException()).onErrorResumeNext(new Function<Throwable, Publisher<Object>>() {
1654-
@Override
1655-
public Publisher<Object> apply(Throwable e) {
1656-
return null;
1657-
}
1658-
}).blockingSubscribe();
1653+
try {
1654+
Flowable.error(new TestException()).onErrorResumeNext(new Function<Throwable, Publisher<Object>>() {
1655+
@Override
1656+
public Publisher<Object> apply(Throwable e) {
1657+
return null;
1658+
}
1659+
}).blockingSubscribe();
1660+
fail("Should have thrown");
1661+
} catch (CompositeException ex) {
1662+
List<Throwable> errors = ex.getExceptions();
1663+
TestHelper.assertError(errors, 0, TestException.class);
1664+
TestHelper.assertError(errors, 1, NullPointerException.class);
1665+
assertEquals(2, errors.size());
1666+
}
16591667
}
16601668

16611669
@Test(expected = NullPointerException.class)

0 commit comments

Comments
 (0)