@@ -1693,7 +1693,11 @@ public final static <T> Observable<T> merge(Iterable<? extends Observable<? exte
1693
1693
* {@code source} Observable
1694
1694
* @see <a href="http://reactivex.io/documentation/operators/merge.html">ReactiveX operators documentation: Merge</a>
1695
1695
*/
1696
+ @ SuppressWarnings ({"unchecked" , "rawtypes" })
1696
1697
public final static <T > Observable <T > merge (Observable <? extends Observable <? extends T >> source ) {
1698
+ if (source .getClass () == ScalarSynchronousObservable .class ) {
1699
+ return ((ScalarSynchronousObservable <T >)source ).scalarFlatMap ((Func1 )UtilityFunctions .identity ());
1700
+ }
1697
1701
return source .lift (OperatorMerge .<T >instance (false ));
1698
1702
}
1699
1703
@@ -1721,8 +1725,13 @@ public final static <T> Observable<T> merge(Observable<? extends Observable<? ex
1721
1725
* if {@code maxConcurrent} is less than or equal to 0
1722
1726
* @see <a href="http://reactivex.io/documentation/operators/merge.html">ReactiveX operators documentation: Merge</a>
1723
1727
*/
1728
+ @ Experimental
1729
+ @ SuppressWarnings ({"unchecked" , "rawtypes" })
1724
1730
public final static <T > Observable <T > merge (Observable <? extends Observable <? extends T >> source , int maxConcurrent ) {
1725
- return source .lift (new OperatorMergeMaxConcurrent <T >(maxConcurrent ));
1731
+ if (source .getClass () == ScalarSynchronousObservable .class ) {
1732
+ return ((ScalarSynchronousObservable <T >)source ).scalarFlatMap ((Func1 )UtilityFunctions .identity ());
1733
+ }
1734
+ return source .lift (OperatorMerge .<T >instance (false , maxConcurrent ));
1726
1735
}
1727
1736
1728
1737
/**
@@ -1993,7 +2002,31 @@ public final static <T> Observable<T> merge(Observable<? extends T> t1, Observab
1993
2002
public final static <T > Observable <T > merge (Observable <? extends T >[] sequences ) {
1994
2003
return merge (from (sequences ));
1995
2004
}
1996
-
2005
+
2006
+ /**
2007
+ * Flattens an Array of Observables into one Observable, without any transformation, while limiting the
2008
+ * number of concurrent subscriptions to these Observables.
2009
+ * <p>
2010
+ * <img width="640" height="370" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/merge.io.png" alt="">
2011
+ * <p>
2012
+ * You can combine items emitted by multiple Observables so that they appear as a single Observable, by
2013
+ * using the {@code merge} method.
2014
+ * <dl>
2015
+ * <dt><b>Scheduler:</b></dt>
2016
+ * <dd>{@code merge} does not operate by default on a particular {@link Scheduler}.</dd>
2017
+ * </dl>
2018
+ *
2019
+ * @param sequences
2020
+ * the Array of Observables
2021
+ * @param maxConcurrent
2022
+ * the maximum number of Observables that may be subscribed to concurrently
2023
+ * @return an Observable that emits all of the items emitted by the Observables in the Array
2024
+ * @see <a href="http://reactivex.io/documentation/operators/merge.html">ReactiveX operators documentation: Merge</a>
2025
+ */
2026
+ @ Experimental
2027
+ public final static <T > Observable <T > merge (Observable <? extends T >[] sequences , int maxConcurrent ) {
2028
+ return merge (from (sequences ), maxConcurrent );
2029
+ }
1997
2030
/**
1998
2031
* Flattens an Observable that emits Observables into one Observable, in a way that allows an Observer to
1999
2032
* receive all successfully emitted items from all of the source Observables without being interrupted by
@@ -2021,6 +2054,37 @@ public final static <T> Observable<T> merge(Observable<? extends T>[] sequences)
2021
2054
public final static <T > Observable <T > mergeDelayError (Observable <? extends Observable <? extends T >> source ) {
2022
2055
return source .lift (OperatorMerge .<T >instance (true ));
2023
2056
}
2057
+ /**
2058
+ * Flattens an Observable that emits Observables into one Observable, in a way that allows an Observer to
2059
+ * receive all successfully emitted items from all of the source Observables without being interrupted by
2060
+ * an error notification from one of them, while limiting the
2061
+ * number of concurrent subscriptions to these Observables.
2062
+ * <p>
2063
+ * This behaves like {@link #merge(Observable)} except that if any of the merged Observables notify of an
2064
+ * error via {@link Observer#onError onError}, {@code mergeDelayError} will refrain from propagating that
2065
+ * error notification until all of the merged Observables have finished emitting items.
2066
+ * <p>
2067
+ * <img width="640" height="380" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/mergeDelayError.png" alt="">
2068
+ * <p>
2069
+ * Even if multiple merged Observables send {@code onError} notifications, {@code mergeDelayError} will only
2070
+ * invoke the {@code onError} method of its Observers once.
2071
+ * <dl>
2072
+ * <dt><b>Scheduler:</b></dt>
2073
+ * <dd>{@code mergeDelayError} does not operate by default on a particular {@link Scheduler}.</dd>
2074
+ * </dl>
2075
+ *
2076
+ * @param source
2077
+ * an Observable that emits Observables
2078
+ * @param maxConcurrent
2079
+ * the maximum number of Observables that may be subscribed to concurrently
2080
+ * @return an Observable that emits all of the items emitted by the Observables emitted by the
2081
+ * {@code source} Observable
2082
+ * @see <a href="http://reactivex.io/documentation/operators/merge.html">ReactiveX operators documentation: Merge</a>
2083
+ */
2084
+ @ Experimental
2085
+ public final static <T > Observable <T > mergeDelayError (Observable <? extends Observable <? extends T >> source , int maxConcurrent ) {
2086
+ return source .lift (OperatorMerge .<T >instance (true , maxConcurrent ));
2087
+ }
2024
2088
2025
2089
/**
2026
2090
* Flattens two Observables into one Observable, in a way that allows an Observer to receive all
@@ -4618,6 +4682,9 @@ public final Observable<T> firstOrDefault(T defaultValue, Func1<? super T, Boole
4618
4682
* @see <a href="http://reactivex.io/documentation/operators/flatmap.html">ReactiveX operators documentation: FlatMap</a>
4619
4683
*/
4620
4684
public final <R > Observable <R > flatMap (Func1 <? super T , ? extends Observable <? extends R >> func ) {
4685
+ if (getClass () == ScalarSynchronousObservable .class ) {
4686
+ return ((ScalarSynchronousObservable <T >)this ).scalarFlatMap (func );
4687
+ }
4621
4688
return merge (map (func ));
4622
4689
}
4623
4690
@@ -4646,6 +4713,9 @@ public final <R> Observable<R> flatMap(Func1<? super T, ? extends Observable<? e
4646
4713
*/
4647
4714
@ Beta
4648
4715
public final <R > Observable <R > flatMap (Func1 <? super T , ? extends Observable <? extends R >> func , int maxConcurrent ) {
4716
+ if (getClass () == ScalarSynchronousObservable .class ) {
4717
+ return ((ScalarSynchronousObservable <T >)this ).scalarFlatMap (func );
4718
+ }
4649
4719
return merge (map (func ), maxConcurrent );
4650
4720
}
4651
4721
0 commit comments