Skip to content

Commit 0c44ad2

Browse files
Add Single.onErrorResumeNext(Single)
1 parent 503d369 commit 0c44ad2

File tree

3 files changed

+125
-3
lines changed

3 files changed

+125
-3
lines changed

src/main/java/rx/Single.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1411,6 +1411,37 @@ public final Single<T> onErrorReturn(Func1<Throwable, ? extends T> resumeFunctio
14111411
return lift(new OperatorOnErrorReturn<T>(resumeFunction));
14121412
}
14131413

1414+
/**
1415+
* Instructs a Single to pass control to another Single rather than invoking
1416+
* {@link Observer#onError(Throwable)} if it encounters an error.
1417+
* <p/>
1418+
* <img width="640" height="310" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/onErrorResumeNext.png" alt="">
1419+
* <p/>
1420+
* By default, when a Single encounters an error that prevents it from emitting the expected item to
1421+
* its {@link Observer}, the Single invokes its Observer's {@code onError} method, and then quits
1422+
* without invoking any more of its Observer's methods. The {@code onErrorResumeNext} method changes this
1423+
* behavior. If you pass another Single ({@code resumeSingle}) to an Single's
1424+
* {@code onErrorResumeNext} method, if the original Single encounters an error, instead of invoking its
1425+
* Observer's {@code onError} method, it will instead relinquish control to {@code resumeSingle} which
1426+
* will invoke the Observer's {@link Observer#onNext onNext} method if it is able to do so. In such a case,
1427+
* because no Single necessarily invokes {@code onError}, the Observer may never know that an error
1428+
* happened.
1429+
* <p/>
1430+
* You can use this to prevent errors from propagating or to supply fallback data should errors be
1431+
* encountered.
1432+
* <dl>
1433+
* <dt><b>Scheduler:</b></dt>
1434+
* <dd>{@code onErrorResumeNext} does not operate by default on a particular {@link Scheduler}.</dd>
1435+
* </dl>
1436+
*
1437+
* @param resumeSingle a Single that will take control if source Single encounters an error.
1438+
* @return the original Single, with appropriately modified behavior.
1439+
* @see <a href="http://reactivex.io/documentation/operators/catch.html">ReactiveX operators documentation: Catch</a>
1440+
*/
1441+
public final Single<T> onErrorResumeNext(Single<? extends T> resumeSingle) {
1442+
return lift(new SingleOperatorOnErrorResumeNextViaSingle<T>(resumeSingle));
1443+
}
1444+
14141445
/**
14151446
* Subscribes to a Single but ignore its emission or notification.
14161447
* <dl>
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
package rx.internal.operators;
2+
3+
import rx.Observable.Operator;
4+
import rx.Producer;
5+
import rx.Single;
6+
import rx.Subscriber;
7+
import rx.exceptions.Exceptions;
8+
import rx.plugins.RxJavaPlugins;
9+
10+
public class SingleOperatorOnErrorResumeNextViaSingle<T> implements Operator<T, T> {
11+
12+
final Single<? extends T> resumeSingle;
13+
14+
public SingleOperatorOnErrorResumeNextViaSingle(Single<? extends T> resumeSingle) {
15+
this.resumeSingle = resumeSingle;
16+
}
17+
18+
@Override
19+
public Subscriber<? super T> call(final Subscriber<? super T> child) {
20+
final Subscriber<T> operatorSubscriber = new Subscriber<T>() {
21+
22+
private boolean done;
23+
24+
@Override
25+
public void onError(Throwable e) {
26+
if (done) {
27+
Exceptions.throwIfFatal(e);
28+
} else {
29+
done = true;
30+
31+
RxJavaPlugins.getInstance().getErrorHandler().handleError(e);
32+
unsubscribe();
33+
34+
resumeSingle.unsafeSubscribe(child);
35+
}
36+
}
37+
38+
@Override
39+
public void onNext(T t) {
40+
if (!done) {
41+
child.onNext(t);
42+
}
43+
}
44+
45+
@Override
46+
public void onCompleted() {
47+
if (!done) {
48+
done = true;
49+
child.onCompleted();
50+
}
51+
}
52+
53+
@Override
54+
public void setProducer(final Producer producer) {
55+
child.setProducer(new Producer() {
56+
@Override
57+
public void request(long n) {
58+
producer.request(n);
59+
}
60+
});
61+
}
62+
};
63+
64+
child.add(operatorSubscriber);
65+
return operatorSubscriber;
66+
}
67+
}

src/test/java/rx/SingleTest.java

Lines changed: 27 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
11
/**
22
* Copyright 2015 Netflix, Inc.
3-
*
3+
*
44
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
55
* compliance with the License. You may obtain a copy of the License at
6-
*
6+
*
77
* http://www.apache.org/licenses/LICENSE-2.0
8-
*
8+
*
99
* Unless required by applicable law or agreed to in writing, software distributed under the License is
1010
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
1111
* the License for the specific language governing permissions and limitations under the License.
@@ -1182,6 +1182,30 @@ public void doAfterTerminateActionShouldNotBeInvokedUntilSubscriberSubscribes()
11821182
verifyZeroInteractions(action);
11831183
}
11841184

1185+
@Test
1186+
public void onErrorResumeNextViaSingleShouldNotInterruptSuccessfulSingle() {
1187+
TestSubscriber<String> testSubscriber = new TestSubscriber<String>();
1188+
1189+
Single
1190+
.just("success")
1191+
.onErrorResumeNext(Single.just("fail"))
1192+
.subscribe(testSubscriber);
1193+
1194+
testSubscriber.assertValue("success");
1195+
}
1196+
1197+
@Test
1198+
public void onErrorResumeNextViaSingleShouldResumeWithPassedSingleInCaseOfError() {
1199+
TestSubscriber<String> testSubscriber = new TestSubscriber<String>();
1200+
1201+
Single
1202+
.<String>error(new RuntimeException("test exception"))
1203+
.onErrorResumeNext(Single.just("fallback"))
1204+
.subscribe(testSubscriber);
1205+
1206+
testSubscriber.assertValue("fallback");
1207+
}
1208+
11851209
@Test(expected = NullPointerException.class)
11861210
public void iterableToArrayShouldThrowNullPointerExceptionIfIterableNull() {
11871211
Single.iterableToArray(null);

0 commit comments

Comments
 (0)