Skip to content

1.x: add Single.merge(Obs), Obs.flatMapSingle & flatMapCompletable #5092

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Feb 11, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
136 changes: 136 additions & 0 deletions src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -6973,6 +6973,74 @@ public final <U, R> Observable<R> flatMap(final Func1<? super T, ? extends Obser
return merge(lift(new OperatorMapPair<T, U, R>(collectionSelector, resultSelector)), maxConcurrent);
}

/**
* Maps all upstream values to Completables and runs them together until the upstream
* and all inner Completables complete normally.
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The operator consumes items from upstream in an unbounded manner and ignores downstream backpressure
* as it doesn't emit items but only terminal event.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code flatMapCompletable} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param mapper the function that receives an upstream value and turns it into a Completable
* to be merged.
* @return the new Observable instance
* @see #flatMapCompletable(Func1, boolean, int)
* @since 1.2.7 - experimental
*/
@Experimental
public final Observable<T> flatMapCompletable(Func1<? super T, ? extends Completable> mapper) {
return flatMapCompletable(mapper, false, Integer.MAX_VALUE);
}

/**
* Maps all upstream values to Completables and runs them together, optionally delaying any errors, until the upstream
* and all inner Completables terminate.
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The operator consumes items from upstream in an unbounded manner and ignores downstream backpressure
* as it doesn't emit items but only terminal event.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code flatMapCompletable} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param mapper the function that receives an upstream value and turns it into a Completable
* to be merged.
* @param delayErrors if true, errors from the upstream and from the inner Completables get delayed till
* the all of them terminate.
* @return the new Observable instance
* @since 1.2.7 - experimental
* @see #flatMapCompletable(Func1, boolean, int)
*/
@Experimental
public final Observable<T> flatMapCompletable(Func1<? super T, ? extends Completable> mapper, boolean delayErrors) {
return flatMapCompletable(mapper, delayErrors, Integer.MAX_VALUE);
}

/**
* Maps upstream values to Completables and runs up to the given number of them together at a time,
* optionally delaying any errors, until the upstream and all inner Completables terminate.
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The operator consumes at most maxConcurrent items from upstream and one-by-one after as the inner
* Completables terminate. The operator ignores downstream backpressure as it doesn't emit items but
* only the terminal event.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code flatMapCompletable} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param mapper the function that receives an upstream value and turns it into a Completable
* to be merged.
* @param delayErrors if true, errors from the upstream and from the inner Completables get delayed till
* the all of them terminate.
* @param maxConcurrency the maximum number of inner Completables to run at a time
* @return the new Observable instance
* @since 1.2.7 - experimental
*/
@Experimental
public final Observable<T> flatMapCompletable(Func1<? super T, ? extends Completable> mapper, boolean delayErrors, int maxConcurrency) {
return unsafeCreate(new OnSubscribeFlatMapCompletable<T>(this, mapper, delayErrors, maxConcurrency));
}

/**
* Returns an Observable that merges each item emitted by the source Observable with the values in an
* Iterable corresponding to that item that is generated by a selector.
Expand Down Expand Up @@ -7106,6 +7174,74 @@ public final <U, R> Observable<R> flatMapIterable(Func1<? super T, ? extends Ite
return (Observable<R>)flatMap(OperatorMapPair.convertSelector(collectionSelector), resultSelector, maxConcurrent);
}

/**
* Maps all upstream values to Singles and runs them together until the upstream
* and all inner Singles complete normally.
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The operator consumes items from upstream in an unbounded manner and honors downstream backpressure.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code flatMapSingle} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param <R> the value type of the inner Singles and the resulting Observable
* @param mapper the function that receives an upstream value and turns it into a Single
* to be merged.
* @return the new Observable instance
* @see #flatMapSingle(Func1, boolean, int)
* @since 1.2.7 - experimental
*/
@Experimental
public final <R> Observable<R> flatMapSingle(Func1<? super T, ? extends Single<? extends R>> mapper) {
return flatMapSingle(mapper, false, Integer.MAX_VALUE);
}

/**
* Maps all upstream values to Singles and runs them together, optionally delaying any errors, until the upstream
* and all inner Singles terminate.
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The operator consumes items from upstream in an unbounded manner and honors downstream backpressure.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code flatMapSingle} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param <R> the value type of the inner Singles and the resulting Observable
* @param mapper the function that receives an upstream value and turns it into a Single
* to be merged.
* @param delayErrors if true, errors from the upstream and from the inner Singles get delayed till
* the all of them terminate.
* @return the new Observable instance
* @since 1.2.7 - experimental
* @see #flatMapSingle(Func1, boolean, int)
*/
@Experimental
public final <R> Observable<R> flatMapSingle(Func1<? super T, ? extends Single<? extends R>> mapper, boolean delayErrors) {
return flatMapSingle(mapper, delayErrors, Integer.MAX_VALUE);
}

/**
* Maps upstream values to Singles and runs up to the given number of them together at a time,
* optionally delaying any errors, until the upstream and all inner Singles terminate.
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The operator consumes at most maxConcurrent items from upstream and one-by-one after as the inner
* Singles terminate. The operator honors downstream backpressure.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code flatMapSingle} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param <R> the value type of the inner Singles and the resulting Observable
* @param mapper the function that receives an upstream value and turns it into a Single
* to be merged.
* @param delayErrors if true, errors from the upstream and from the inner Singles get delayed till
* the all of them terminate.
* @param maxConcurrency the maximum number of inner Singles to run at a time
* @return the new Observable instance
* @since 1.2.7 - experimental
*/
@Experimental
public final <R> Observable<R> flatMapSingle(Func1<? super T, ? extends Single<? extends R>> mapper, boolean delayErrors, int maxConcurrency) {
return unsafeCreate(new OnSubscribeFlatMapSingle<T, R>(this, mapper, delayErrors, maxConcurrency));
}

/**
* Subscribes to the {@link Observable} and receives notifications for each element.
* <p>
Expand Down
91 changes: 91 additions & 0 deletions src/main/java/rx/Single.java
Original file line number Diff line number Diff line change
Expand Up @@ -930,6 +930,97 @@ public static <T> Observable<T> merge(Single<? extends T> t1, Single<? extends T
return Observable.merge(asObservable(t1), asObservable(t2), asObservable(t3), asObservable(t4), asObservable(t5), asObservable(t6), asObservable(t7), asObservable(t8), asObservable(t9));
}

/**
* Merges all Singles emitted by the Observable and runs them together until the source
* Observable and all inner Singles complete normally.
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The operator consumes items from the Observable in an unbounded manner and honors downstream backpressure.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code merge} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param <T> the value type of the inner Singles and the resulting Observable
* @param sources the Observable that emits Singles to be merged
* @return the new Observable instance
* @see #merge(Observable, int)
* @see #mergeDelayError(Observable)
* @see #mergeDelayError(Observable, int)
* @since 1.2.7 - experimental
*/
@Experimental
public static <T> Observable<T> merge(Observable<? extends Single<? extends T>> sources) {
return merge(sources, Integer.MAX_VALUE);
}

/**
* Merges the Singles emitted by the Observable and runs up to the given number of them together at a time,
* until the Observable and all inner Singles terminate.
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The operator consumes at most maxConcurrent items from the Observable and one-by-one after as the inner
* Singles terminate. The operator ignores downstream backpressure as it doesn't emit items but
* only the terminal event.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code flatMapSingle} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param <T> the value type of the inner Singles and the resulting Observable
* @param sources the Observable that emits Singles to be merged
* @param maxConcurrency the maximum number of inner Singles to run at a time
* @return the new Observable instance
* @since 1.2.7 - experimental
*/
@SuppressWarnings({ "unchecked", "rawtypes" })
@Experimental
public static <T> Observable<T> merge(Observable<? extends Single<? extends T>> sources, int maxConcurrency) {
return sources.flatMapSingle((Func1)UtilityFunctions.identity(), false, maxConcurrency);
}

/**
* Merges all Singles emitted by the Observable and runs them together,
* delaying errors from them and the Observable, until the source
* Observable and all inner Singles complete normally.
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The operator consumes items from the Observable in an unbounded manner and honors downstream backpressure.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code merge} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param <T> the value type of the inner Singles and the resulting Observable
* @param sources the Observable that emits Singles to be merged
* @return the new Observable instance
* @see #mergeDelayError(Observable, int)
* @see #merge(Observable)
* @see #merge(Observable, int)
* @since 1.2.7 - experimental
*/
@Experimental
public static <T> Observable<T> mergeDelayError(Observable<? extends Single<? extends T>> sources) {
return merge(sources, Integer.MAX_VALUE);
}

/**
* Merges the Singles emitted by the Observable and runs up to the given number of them together at a time,
* delaying errors from them and the Observable, until the Observable and all inner Singles terminate.
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The operator consumes at most maxConcurrent items from the Observable and one-by-one after as the inner
* Singles terminate. The operator ignores downstream backpressure as it doesn't emit items but
* only the terminal event.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code flatMapSingle} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param <T> the value type of the inner Singles and the resulting Observable
* @param sources the Observable that emits Singles to be merged
* @param maxConcurrency the maximum number of inner Singles to run at a time
* @return the new Observable instance
* @since 1.2.7 - experimental
*/
@SuppressWarnings({ "unchecked", "rawtypes" })
@Experimental
public static <T> Observable<T> mergeDelayError(Observable<? extends Single<? extends T>> sources, int maxConcurrency) {
return sources.flatMapSingle((Func1)UtilityFunctions.identity(), true, maxConcurrency);
}

/**
* Returns a Single that emits the results of a specified combiner function applied to two items emitted by
* two other Singles.
Expand Down
Loading