Skip to content

Commit 90e0423

Browse files
committed
Merge pull request #3655 from artem-zinnatullin/single-onErrorResumeNextViaSingle
1.x: Add Single.onErrorResumeNext(Single)
2 parents ab018df + 67ef32c commit 90e0423

File tree

3 files changed

+115
-3
lines changed

3 files changed

+115
-3
lines changed

src/main/java/rx/Single.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1411,6 +1411,37 @@ public final Single<T> onErrorReturn(Func1<Throwable, ? extends T> resumeFunctio
14111411
return lift(new OperatorOnErrorReturn<T>(resumeFunction));
14121412
}
14131413

1414+
/**
1415+
* Instructs a Single to pass control to another Single rather than invoking
1416+
* {@link Observer#onError(Throwable)} if it encounters an error.
1417+
* <p/>
1418+
* <img width="640" height="310" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/onErrorResumeNext.png" alt="">
1419+
* <p/>
1420+
* By default, when a Single encounters an error that prevents it from emitting the expected item to
1421+
* its {@link Observer}, the Single invokes its Observer's {@code onError} method, and then quits
1422+
* without invoking any more of its Observer's methods. The {@code onErrorResumeNext} method changes this
1423+
* behavior. If you pass another Single ({@code resumeSingleInCaseOfError}) to an Single's
1424+
* {@code onErrorResumeNext} method, if the original Single encounters an error, instead of invoking its
1425+
* Observer's {@code onError} method, it will instead relinquish control to {@code resumeSingleInCaseOfError} which
1426+
* will invoke the Observer's {@link Observer#onNext onNext} method if it is able to do so. In such a case,
1427+
* because no Single necessarily invokes {@code onError}, the Observer may never know that an error
1428+
* happened.
1429+
* <p/>
1430+
* You can use this to prevent errors from propagating or to supply fallback data should errors be
1431+
* encountered.
1432+
* <dl>
1433+
* <dt><b>Scheduler:</b></dt>
1434+
* <dd>{@code onErrorResumeNext} does not operate by default on a particular {@link Scheduler}.</dd>
1435+
* </dl>
1436+
*
1437+
* @param resumeSingleInCaseOfError a Single that will take control if source Single encounters an error.
1438+
* @return the original Single, with appropriately modified behavior.
1439+
* @see <a href="http://reactivex.io/documentation/operators/catch.html">ReactiveX operators documentation: Catch</a>
1440+
*/
1441+
public final Single<T> onErrorResumeNext(Single<? extends T> resumeSingleInCaseOfError) {
1442+
return new Single<T>(new SingleOperatorOnErrorResumeNextViaSingle<T>(this, resumeSingleInCaseOfError));
1443+
}
1444+
14141445
/**
14151446
* Subscribes to a Single but ignore its emission or notification.
14161447
* <dl>
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
package rx.internal.operators;
2+
3+
import rx.Single;
4+
import rx.SingleSubscriber;
5+
import rx.plugins.RxJavaPlugins;
6+
7+
public class SingleOperatorOnErrorResumeNextViaSingle<T> implements Single.OnSubscribe<T> {
8+
9+
private final Single<? extends T> originalSingle;
10+
private final Single<? extends T> resumeSingleInCaseOfError;
11+
12+
public SingleOperatorOnErrorResumeNextViaSingle(Single<? extends T> originalSingle, Single<? extends T> resumeSingleInCaseOfError) {
13+
if (originalSingle == null) {
14+
throw new NullPointerException("originalSingle must not be null");
15+
}
16+
17+
if (resumeSingleInCaseOfError == null) {
18+
throw new NullPointerException("resumeSingleInCaseOfError must not be null");
19+
}
20+
21+
this.originalSingle = originalSingle;
22+
this.resumeSingleInCaseOfError = resumeSingleInCaseOfError;
23+
}
24+
25+
@Override
26+
public void call(final SingleSubscriber<? super T> child) {
27+
final SingleSubscriber<? super T> parent = new SingleSubscriber<T>() {
28+
@Override
29+
public void onSuccess(T value) {
30+
child.onSuccess(value);
31+
}
32+
33+
@Override
34+
public void onError(Throwable error) {
35+
RxJavaPlugins.getInstance().getErrorHandler().handleError(error);
36+
unsubscribe();
37+
38+
resumeSingleInCaseOfError.subscribe(child);
39+
}
40+
};
41+
42+
child.add(parent);
43+
originalSingle.subscribe(parent);
44+
}
45+
}

src/test/java/rx/SingleTest.java

Lines changed: 39 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
11
/**
22
* Copyright 2015 Netflix, Inc.
3-
*
3+
*
44
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
55
* compliance with the License. You may obtain a copy of the License at
6-
*
6+
*
77
* http://www.apache.org/licenses/LICENSE-2.0
8-
*
8+
*
99
* Unless required by applicable law or agreed to in writing, software distributed under the License is
1010
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
1111
* the License for the specific language governing permissions and limitations under the License.
@@ -1182,6 +1182,42 @@ public void doAfterTerminateActionShouldNotBeInvokedUntilSubscriberSubscribes()
11821182
verifyZeroInteractions(action);
11831183
}
11841184

1185+
@Test
1186+
public void onErrorResumeNextViaSingleShouldNotInterruptSuccessfulSingle() {
1187+
TestSubscriber<String> testSubscriber = new TestSubscriber<String>();
1188+
1189+
Single
1190+
.just("success")
1191+
.onErrorResumeNext(Single.just("fail"))
1192+
.subscribe(testSubscriber);
1193+
1194+
testSubscriber.assertValue("success");
1195+
}
1196+
1197+
@Test
1198+
public void onErrorResumeNextViaSingleShouldResumeWithPassedSingleInCaseOfError() {
1199+
TestSubscriber<String> testSubscriber = new TestSubscriber<String>();
1200+
1201+
Single
1202+
.<String>error(new RuntimeException("test exception"))
1203+
.onErrorResumeNext(Single.just("fallback"))
1204+
.subscribe(testSubscriber);
1205+
1206+
testSubscriber.assertValue("fallback");
1207+
}
1208+
1209+
@Test
1210+
public void onErrorResumeNextViaSingleShouldPreventNullSingle() {
1211+
try {
1212+
Single
1213+
.just("value")
1214+
.onErrorResumeNext(null);
1215+
fail();
1216+
} catch (NullPointerException expected) {
1217+
assertEquals("resumeSingleInCaseOfError must not be null", expected.getMessage());
1218+
}
1219+
}
1220+
11851221
@Test(expected = NullPointerException.class)
11861222
public void iterableToArrayShouldThrowNullPointerExceptionIfIterableNull() {
11871223
Single.iterableToArray(null);

0 commit comments

Comments
 (0)