Skip to content

Commit 63434c6

Browse files
committed
Merge pull request #3686 from klemzy/1.x
Added retry and retryWhen support for Single
2 parents 1f98875 + 801c707 commit 63434c6

File tree

2 files changed

+271
-82
lines changed

2 files changed

+271
-82
lines changed

src/main/java/rx/Single.java

Lines changed: 106 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -12,39 +12,26 @@
1212
*/
1313
package rx;
1414

15-
import java.util.Collection;
16-
import java.util.concurrent.Callable;
17-
import java.util.concurrent.Future;
18-
import java.util.concurrent.TimeUnit;
19-
import java.util.concurrent.TimeoutException;
20-
2115
import rx.Observable.Operator;
16+
import rx.annotations.Beta;
2217
import rx.annotations.Experimental;
2318
import rx.exceptions.Exceptions;
2419
import rx.exceptions.OnErrorNotImplementedException;
25-
import rx.functions.Action0;
26-
import rx.functions.Action1;
27-
import rx.functions.Func1;
28-
import rx.functions.Func2;
29-
import rx.functions.Func3;
30-
import rx.functions.Func4;
31-
import rx.functions.Func5;
32-
import rx.functions.Func6;
33-
import rx.functions.Func7;
34-
import rx.functions.Func8;
35-
import rx.functions.Func9;
36-
import rx.functions.FuncN;
37-
import rx.annotations.Beta;
20+
import rx.functions.*;
3821
import rx.internal.operators.*;
3922
import rx.internal.producers.SingleDelayedProducer;
4023
import rx.internal.util.ScalarSynchronousSingle;
4124
import rx.internal.util.UtilityFunctions;
42-
import rx.singles.BlockingSingle;
4325
import rx.observers.SafeSubscriber;
44-
import rx.plugins.*;
26+
import rx.plugins.RxJavaObservableExecutionHook;
27+
import rx.plugins.RxJavaPlugins;
4528
import rx.schedulers.Schedulers;
29+
import rx.singles.BlockingSingle;
4630
import rx.subscriptions.Subscriptions;
4731

32+
import java.util.Collection;
33+
import java.util.concurrent.*;
34+
4835
/**
4936
* The Single class implements the Reactive Pattern for a single value response. See {@link Observable} for the
5037
* implementation of the Reactive Pattern for a stream or vector of values.
@@ -1820,7 +1807,7 @@ public void onError(Throwable error) {
18201807
* @return an {@link Observable} that emits a single item T.
18211808
*/
18221809
public final Observable<T> toObservable() {
1823-
return asObservable(this);
1810+
return asObservable(this);
18241811
}
18251812

18261813
/**
@@ -2209,4 +2196,101 @@ static <T> Single<? extends T>[] iterableToArray(final Iterable<? extends Single
22092196

22102197
return singlesArray;
22112198
}
2199+
2200+
/**
2201+
* Returns a Single that mirrors the source Single, resubscribing to it if it calls {@code onError}
2202+
* (infinite retry count).
2203+
*
2204+
* <img width="640" height="315" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/retry.png" alt="">
2205+
*
2206+
* If the source Single calls {@link SingleSubscriber#onError}, this method will resubscribe to the source
2207+
* Single rather than propagating the {@code onError} call.
2208+
*
2209+
* <dl>
2210+
* <dt><b>Scheduler:</b></dt>
2211+
* <dd>{@code retry} operates by default on the {@code trampoline} {@link Scheduler}.</dd>
2212+
* </dl>
2213+
*
2214+
* @return the source Single modified with retry logic
2215+
* @see <a href="http://reactivex.io/documentation/operators/retry.html">ReactiveX operators documentation: Retry</a>
2216+
*/
2217+
public final Single<T> retry() {
2218+
return toObservable().retry().toSingle();
2219+
}
2220+
2221+
/**
2222+
* Returns an Single that mirrors the source Single, resubscribing to it if it calls {@code onError}
2223+
* up to a specified number of retries.
2224+
*
2225+
* <img width="640" height="315" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/retry.png" alt="">
2226+
*
2227+
* If the source Single calls {@link SingleSubscriber#onError}, this method will resubscribe to the source
2228+
* Single for a maximum of {@code count} resubscriptions rather than propagating the
2229+
* {@code onError} call.
2230+
*
2231+
* <dl>
2232+
* <dt><b>Scheduler:</b></dt>
2233+
* <dd>{@code retry} operates by default on the {@code trampoline} {@link Scheduler}.</dd>
2234+
* </dl>
2235+
*
2236+
* @param count
2237+
* number of retry attempts before failing
2238+
*
2239+
* @return the source Single modified with retry logic
2240+
* @see <a href="http://reactivex.io/documentation/operators/retry.html">ReactiveX operators documentation: Retry</a>
2241+
*/
2242+
public final Single<T> retry(final long count) {
2243+
return toObservable().retry(count).toSingle();
2244+
}
2245+
2246+
/**
2247+
* Returns an Single that mirrors the source Single, resubscribing to it if it calls {@code onError}
2248+
* and the predicate returns true for that specific exception and retry count.
2249+
*
2250+
* <img width="640" height="315" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/retry.png" alt="">
2251+
* <dl>
2252+
* <dt><b>Backpressure Support:</b></dt>
2253+
* <dd>This operator honors backpressure.</td>
2254+
* <dt><b>Scheduler:</b></dt>
2255+
* <dd>{@code retry} operates by default on the {@code trampoline} {@link Scheduler}.</dd>
2256+
* </dl>
2257+
*
2258+
* @param predicate
2259+
* the predicate that determines if a resubscription may happen in case of a specific exception
2260+
* and retry count
2261+
*
2262+
* @return the source Single modified with retry logic
2263+
* @see #retry()
2264+
* @see <a href="http://reactivex.io/documentation/operators/retry.html">ReactiveX operators documentation: Retry</a>
2265+
*/
2266+
public final Single<T> retry(Func2<Integer, Throwable, Boolean> predicate) {
2267+
return toObservable().retry(predicate).toSingle();
2268+
}
2269+
2270+
/**
2271+
* Returns a Single that emits the same values as the source Single with the exception of an
2272+
* {@code onError}. An {@code onError} notification from the source will result in the emission of a
2273+
* {@link Throwable} item to the Observable provided as an argument to the {@code notificationHandler}
2274+
* function. If that Observable calls {@code onComplete} or {@code onError} then {@code retry} will call
2275+
* {@code onCompleted} or {@code onError} on the child subscription. Otherwise, this Observable will
2276+
* resubscribe to the source Single.
2277+
*
2278+
* <img width="640" height="430" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/retryWhen.f.png" alt="">
2279+
*
2280+
* <dl>
2281+
* <dt><b>Scheduler:</b></dt>
2282+
* <dd>{@code retryWhen} operates by default on the {@code trampoline} {@link Scheduler}.</dd>
2283+
* </dl>
2284+
*
2285+
* @param notificationHandler
2286+
* receives an Observable of notifications with which a user can complete or error, aborting the
2287+
* retry
2288+
*
2289+
* @return the source Single modified with retry logic
2290+
* @see <a href="http://reactivex.io/documentation/operators/retry.html">ReactiveX operators documentation: Retry</a>
2291+
*/
2292+
public final Single<T> retryWhen(final Func1<Observable<? extends Throwable>, ? extends Observable<?>> notificationHandler) {
2293+
return toObservable().retryWhen(notificationHandler).toSingle();
2294+
}
2295+
22122296
}

0 commit comments

Comments
 (0)