Skip to content

Added retry and retryWhen support for Single #3686

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Feb 10, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
128 changes: 106 additions & 22 deletions src/main/java/rx/Single.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,39 +12,26 @@
*/
package rx;

import java.util.Collection;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import rx.Observable.Operator;
import rx.annotations.Beta;
import rx.annotations.Experimental;
import rx.exceptions.Exceptions;
import rx.exceptions.OnErrorNotImplementedException;
import rx.functions.Action0;
import rx.functions.Action1;
import rx.functions.Func1;
import rx.functions.Func2;
import rx.functions.Func3;
import rx.functions.Func4;
import rx.functions.Func5;
import rx.functions.Func6;
import rx.functions.Func7;
import rx.functions.Func8;
import rx.functions.Func9;
import rx.functions.FuncN;
import rx.annotations.Beta;
import rx.functions.*;
import rx.internal.operators.*;
import rx.internal.producers.SingleDelayedProducer;
import rx.internal.util.ScalarSynchronousSingle;
import rx.internal.util.UtilityFunctions;
import rx.singles.BlockingSingle;
import rx.observers.SafeSubscriber;
import rx.plugins.*;
import rx.plugins.RxJavaObservableExecutionHook;
import rx.plugins.RxJavaPlugins;
import rx.schedulers.Schedulers;
import rx.singles.BlockingSingle;
import rx.subscriptions.Subscriptions;

import java.util.Collection;
import java.util.concurrent.*;

/**
* The Single class implements the Reactive Pattern for a single value response. See {@link Observable} for the
* implementation of the Reactive Pattern for a stream or vector of values.
Expand Down Expand Up @@ -1820,7 +1807,7 @@ public void onError(Throwable error) {
* @return an {@link Observable} that emits a single item T.
*/
public final Observable<T> toObservable() {
return asObservable(this);
return asObservable(this);
}

/**
Expand Down Expand Up @@ -2209,4 +2196,101 @@ static <T> Single<? extends T>[] iterableToArray(final Iterable<? extends Single

return singlesArray;
}

/**
* Returns a Single that mirrors the source Single, resubscribing to it if it calls {@code onError}
* (infinite retry count).
*
* <img width="640" height="315" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/retry.png" alt="">
*
* If the source Single calls {@link SingleSubscriber#onError}, this method will resubscribe to the source
* Single rather than propagating the {@code onError} call.
*
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code retry} operates by default on the {@code trampoline} {@link Scheduler}.</dd>
* </dl>
*
* @return the source Single modified with retry logic
* @see <a href="http://reactivex.io/documentation/operators/retry.html">ReactiveX operators documentation: Retry</a>
*/
public final Single<T> retry() {
return toObservable().retry().toSingle();
}

/**
* Returns an Single that mirrors the source Single, resubscribing to it if it calls {@code onError}
* up to a specified number of retries.
*
* <img width="640" height="315" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/retry.png" alt="">
*
* If the source Single calls {@link SingleSubscriber#onError}, this method will resubscribe to the source
* Single for a maximum of {@code count} resubscriptions rather than propagating the
* {@code onError} call.
*
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code retry} operates by default on the {@code trampoline} {@link Scheduler}.</dd>
* </dl>
*
* @param count
* number of retry attempts before failing
*
* @return the source Single modified with retry logic
* @see <a href="http://reactivex.io/documentation/operators/retry.html">ReactiveX operators documentation: Retry</a>
*/
public final Single<T> retry(final long count) {
return toObservable().retry(count).toSingle();
}

/**
* Returns an Single that mirrors the source Single, resubscribing to it if it calls {@code onError}
* and the predicate returns true for that specific exception and retry count.
*
* <img width="640" height="315" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/retry.png" alt="">
* <dl>
* <dt><b>Backpressure Support:</b></dt>
* <dd>This operator honors backpressure.</td>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code retry} operates by default on the {@code trampoline} {@link Scheduler}.</dd>
* </dl>
*
* @param predicate
* the predicate that determines if a resubscription may happen in case of a specific exception
* and retry count
*
* @return the source Single modified with retry logic
* @see #retry()
* @see <a href="http://reactivex.io/documentation/operators/retry.html">ReactiveX operators documentation: Retry</a>
*/
public final Single<T> retry(Func2<Integer, Throwable, Boolean> predicate) {
return toObservable().retry(predicate).toSingle();
}

/**
* Returns a Single that emits the same values as the source Single with the exception of an
* {@code onError}. An {@code onError} notification from the source will result in the emission of a
* {@link Throwable} item to the Observable provided as an argument to the {@code notificationHandler}
* function. If that Observable calls {@code onComplete} or {@code onError} then {@code retry} will call
* {@code onCompleted} or {@code onError} on the child subscription. Otherwise, this Observable will
* resubscribe to the source Single.
*
* <img width="640" height="430" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/retryWhen.f.png" alt="">
*
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code retryWhen} operates by default on the {@code trampoline} {@link Scheduler}.</dd>
* </dl>
*
* @param notificationHandler
* receives an Observable of notifications with which a user can complete or error, aborting the
* retry
*
* @return the source Single modified with retry logic
* @see <a href="http://reactivex.io/documentation/operators/retry.html">ReactiveX operators documentation: Retry</a>
*/
public final Single<T> retryWhen(final Func1<Observable<? extends Throwable>, ? extends Observable<?>> notificationHandler) {
return toObservable().retryWhen(notificationHandler).toSingle();
}

}
Loading