Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 104 additions & 0 deletions src/main/java/rx/Single.java
Original file line number Diff line number Diff line change
Expand Up @@ -930,6 +930,110 @@ public static <T> Observable<T> merge(Single<? extends T> t1, Single<? extends T
return Observable.merge(asObservable(t1), asObservable(t2), asObservable(t3), asObservable(t4), asObservable(t5), asObservable(t6), asObservable(t7), asObservable(t8), asObservable(t9));
}

/**
* Flattens any number of Singles into a single Observable, without any transformation.
* <p>
* <img width="640" height="380" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.merge.png" alt="">
* <p>
* You can combine items emitted by multiple Singles so that they appear as a single Observable, by using
* the {@code merge} method.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code merge} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param <T> the common value type
* @param source
* an Observable of Singles to be merged
* @return an Observable that emits all of the items emitted by the source Singles
* @see <a href="http://reactivex.io/documentation/operators/merge.html">ReactiveX operators documentation: Merge</a>
*/
public static <T> Observable<T> merge(Observable<? extends Single<T>> source) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Experimental

return source.lift(OperatorMergeSingle.<T> instance(false));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could be

return source.map(new Func1<Single<T>, Observable<T>() {
    @Override public Observable<T> call(Single<T> t) {
        return t.toObservable();
    }
}).lift(OperatorMerge.<T>instance(false));

}

/**
* Flattens any number of Singles into a single Observable, without any transformation.
* <p>
* <img width="640" height="380" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.merge.png" alt="">
* <p>
* You can combine items emitted by multiple Singles so that they appear as a single Observable, by using
* the {@code merge} method.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code merge} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param <T> the common value type
* @param source
* an Observable of Singles to be merged
* @param maxConcurrent
* the maximum number of Singles that may be subscribed to concurrently
* @return an Observable that emits all of the items emitted by the source Singles
* @see <a href="http://reactivex.io/documentation/operators/merge.html">ReactiveX operators documentation: Merge</a>
*/
public static <T> Observable<T> merge(Observable<? extends Single<T>> source, int maxConcurrent) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Experimental

return source.lift(OperatorMergeSingle.<T> instance(false, maxConcurrent));
}

/**
* Flattens any number of Singles into a single Observable, in a way that allows an Observer to
* receive all successfully emitted items from all of the source Singles without being interrupted by
* an error notification from one of them.
* <p>
* This behaves like {@link #merge(Observable)} except that if any of the merged Singles notify of an
* error via {@link SingleSubscriber#onError onError}, {@code mergeDelayError} will refrain from propagating that
* error notification until all of the merged Singles have finished emitting items.
* <p>
* <img width="640" height="380" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.merge.png" alt="">
* <p>
* You can combine items emitted by multiple Singles so that they appear as a single Observable, by using
* the {@code merge} method.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code merge} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param <T> the common value type
* @param source
* an Observable of Singles to be merged
* @return an Observable that emits all of the items emitted by the source Singles
* @see <a href="http://reactivex.io/documentation/operators/merge.html">ReactiveX operators documentation: Merge</a>
*/
public static <T> Observable<T> mergeDelayError(Observable<? extends Single<T>> source) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Experimental

return source.lift(OperatorMergeSingle.<T> instance(true));
}

/**
* Flattens any number of Singles into a single Observable, in a way that allows an Observer to
* receive all successfully emitted items from all of the source Singles without being interrupted by
* an error notification from one of them.
* <p>
* This behaves like {@link #merge(Observable)} except that if any of the merged Singles notify of an
* error via {@link SingleSubscriber#onError onError}, {@code mergeDelayError} will refrain from propagating that
* error notification until all of the merged Singles have finished emitting items.
* <p>
* <img width="640" height="380" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.merge.png" alt="">
* <p>
* You can combine items emitted by multiple Singles so that they appear as a single Observable, by using
* the {@code merge} method.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code merge} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param <T> the common value type
* @param source
* an Observable of Singles to be merged
* @param maxConcurrent
* the maximum number of Singles that may be subscribed to concurrently
* @return an Observable that emits all of the items emitted by the source Singles
* @see <a href="http://reactivex.io/documentation/operators/merge.html">ReactiveX operators documentation: Merge</a>
*/
public static <T> Observable<T> mergeDelayError(Observable<? extends Single<T>> source, int maxConcurrent) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Experimental

return source.lift(OperatorMergeSingle.<T> instance(true, maxConcurrent));
}

/**
* Returns a Single that emits the results of a specified combiner function applied to two items emitted by
* two other Singles.
Expand Down
Loading