diff --git a/src/main/java/io/reactivex/Observable.java b/src/main/java/io/reactivex/Observable.java index 169073a5b5..d51bac4d81 100644 --- a/src/main/java/io/reactivex/Observable.java +++ b/src/main/java/io/reactivex/Observable.java @@ -10865,6 +10865,67 @@ public final Observable switchMap(Function(this, mapper, bufferSize, false)); } + /** + * Returns a new ObservableSource by applying a function that you supply to each item emitted by the source + * ObservableSource that returns a SingleSource, and then emitting the item emitted by the most recently emitted + * of these SingleSources. + *

+ * The resulting ObservableSource completes if both the upstream ObservableSource and the last inner SingleSource, if any, complete. + * If the upstream ObservableSource signals an onError, the inner SingleSource is unsubscribed and the error delivered in-sequence. + *

+ * + *

+ *
Scheduler:
+ *
{@code switchMapSingle} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param the element type of the inner SingleSources and the output + * @param mapper + * a function that, when applied to an item emitted by the source ObservableSource, returns a + * SingleSource + * @return an Observable that emits the item emitted by the SingleSource returned from applying {@code func} to the most recently emitted item emitted by the source ObservableSource + * @see ReactiveX operators documentation: FlatMap + * @since 2.0.8 + */ + @Experimental + @CheckReturnValue + @SchedulerSupport(SchedulerSupport.NONE) + @NonNull + public final Observable switchMapSingle(@NonNull Function> mapper) { + return ObservableInternalHelper.switchMapSingle(this, mapper); + } + + /** + * Returns a new ObservableSource by applying a function that you supply to each item emitted by the source + * ObservableSource that returns a SingleSource, and then emitting the item emitted by the most recently emitted + * of these SingleSources and delays any error until all SingleSources terminate. + *

+ * The resulting ObservableSource completes if both the upstream ObservableSource and the last inner SingleSource, if any, complete. + * If the upstream ObservableSource signals an onError, the termination of the last inner SingleSource will emit that error as is + * or wrapped into a CompositeException along with the other possible errors the former inner SingleSources signalled. + *

+ * + *

+ *
Scheduler:
+ *
{@code switchMapSingleDelayError} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param the element type of the inner SingleSources and the output + * @param mapper + * a function that, when applied to an item emitted by the source ObservableSource, returns a + * SingleSource + * @return an Observable that emits the item emitted by the SingleSource returned from applying {@code func} to the most recently emitted item emitted by the source ObservableSource + * @see ReactiveX operators documentation: FlatMap + * @since 2.0.8 + */ + @Experimental + @CheckReturnValue + @SchedulerSupport(SchedulerSupport.NONE) + @NonNull + public final Observable switchMapSingleDelayError(@NonNull Function> mapper) { + return ObservableInternalHelper.switchMapSingleDelayError(this, mapper); + } + /** * Returns a new ObservableSource by applying a function that you supply to each item emitted by the source * ObservableSource that returns an ObservableSource, and then emitting the items emitted by the most recently emitted diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableInternalHelper.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableInternalHelper.java index 17a8df604f..a092256501 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableInternalHelper.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableInternalHelper.java @@ -18,7 +18,10 @@ import io.reactivex.*; import io.reactivex.functions.*; import io.reactivex.internal.functions.Functions; +import io.reactivex.internal.functions.ObjectHelper; +import io.reactivex.internal.operators.single.SingleToObservable; import io.reactivex.observables.ConnectableObservable; +import io.reactivex.plugins.RxJavaPlugins; /** * Helper utility class to support Observable with inner classes. @@ -315,4 +318,35 @@ public static Function>, ObservableSou return new ZipIterableFunction(zipper); } + public static Observable switchMapSingle(Observable source, final Function> mapper) { + return source.switchMap(convertSingleMapperToObservableMapper(mapper), 1); + } + + public static Observable switchMapSingleDelayError(Observable source, + Function> mapper) { + return source.switchMapDelayError(convertSingleMapperToObservableMapper(mapper), 1); + } + + private static Function> convertSingleMapperToObservableMapper( + final Function> mapper) { + ObjectHelper.requireNonNull(mapper, "mapper is null"); + return new ObservableMapper(mapper); + } + + static final class ObservableMapper implements Function> { + + final Function> mapper; + + ObservableMapper(Function> mapper) { + this.mapper = mapper; + } + + @Override + public Observable apply(T t) throws Exception { + return RxJavaPlugins.onAssembly(new SingleToObservable( + ObjectHelper.requireNonNull(mapper.apply(t), "The mapper returned a null value"))); + } + + } + } diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableSwitchTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableSwitchTest.java index c725767201..94c36df7c9 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableSwitchTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableSwitchTest.java @@ -26,6 +26,7 @@ import io.reactivex.*; import io.reactivex.disposables.*; import io.reactivex.exceptions.*; +import io.reactivex.functions.Consumer; import io.reactivex.functions.Function; import io.reactivex.internal.functions.Functions; import io.reactivex.internal.util.ExceptionHelper; @@ -579,7 +580,6 @@ public ObservableSource apply(Object v) throws Exception { }, 16) .test() .assertResult(1); - } @Test @@ -622,7 +622,83 @@ public void switchMapInnerCancelled() { assertFalse(pp.hasObservers()); } + + @Test + public void switchMapSingleJustSource() { + Observable.just(0) + .switchMapSingle(new Function>() { + @Override + public SingleSource apply(Object v) throws Exception { + return Single.just(1); + } + }) + .test() + .assertResult(1); + } + + @Test + public void switchMapSingleMapperReturnsNull() { + Observable.just(0) + .switchMapSingle(new Function>() { + @Override + public SingleSource apply(Object v) throws Exception { + return null; + } + }) + .test() + .assertError(NullPointerException.class); + } + + @Test(expected=NullPointerException.class) + public void switchMapSingleMapperIsNull() { + Observable.just(0) + .switchMapSingle(null); + } + + @Test + public void switchMapSingleFunctionDoesntReturnSingle() { + Observable.just(0) + .switchMapSingle(new Function>() { + @Override + public SingleSource apply(Object v) throws Exception { + return new SingleSource() { + @Override + public void subscribe(SingleObserver s) { + s.onSubscribe(Disposables.empty()); + s.onSuccess(1); + } + }; + } + }) + .test() + .assertResult(1); + } + @Test + public void switchMapSingleDelayErrorJustSource() { + final AtomicBoolean completed = new AtomicBoolean(); + Observable.just(0, 1) + .switchMapSingleDelayError(new Function>() { + @Override + public SingleSource apply(Integer v) throws Exception { + if (v == 0) { + return Single.error(new RuntimeException()); + } else { + return Single.just(1).doOnSuccess(new Consumer() { + + @Override + public void accept(Integer n) throws Exception { + completed.set(true); + }}); + } + } + }) + .test() + .assertValue(1) + .assertError(RuntimeException.class); + assertTrue(completed.get()); + } + @Test public void scalarMap() { Observable.switchOnNext(Observable.just(Observable.just(1)))