diff --git a/src/main/java/io/reactivex/Flowable.java b/src/main/java/io/reactivex/Flowable.java index 5835cbd068..f177750a7b 100644 --- a/src/main/java/io/reactivex/Flowable.java +++ b/src/main/java/io/reactivex/Flowable.java @@ -14114,6 +14114,98 @@ public final Flowable switchMap(Function + * + *

+ * Since a {@code CompletableSource} doesn't produce any items, the resulting reactive type of + * this operator is a {@link Completable} that can only indicate successful completion or + * a failure in any of the inner {@code CompletableSource}s or the failure of the current + * {@link Flowable}. + *

+ *
Backpressure:
+ *
The operator consumes the current {@link Flowable} in an unbounded manner and otherwise + * does not have backpressure in its return type because no items are ever produced.
+ *
Scheduler:
+ *
{@code switchMapCompletable} does not operate by default on a particular {@link Scheduler}.
+ *
Error handling:
+ *
If either this {@code Flowable} or the active {@code CompletableSource} signals an {@code onError}, + * the resulting {@code Completable} is terminated immediately with that {@code Throwable}. + * Use the {@link #switchMapCompletableDelayError(Function)} to delay such inner failures until + * every inner {@code CompletableSource}s and the main {@code Flowable} terminates in some fashion. + * If they fail concurrently, the operator may combine the {@code Throwable}s into a + * {@link io.reactivex.exceptions.CompositeException CompositeException} + * and signal it to the downstream instead. If any inactivated (switched out) {@code CompletableSource} + * signals an {@code onError} late, the {@code Throwable}s will be signalled to the global error handler via + * {@link RxJavaPlugins#onError(Throwable)} method as {@code UndeliverableException} errors. + *
+ *
+ * @param mapper the function called with each upstream item and should return a + * {@link CompletableSource} to be subscribed to and awaited for + * (non blockingly) for its terminal event + * @return the new Completable instance + * @since 2.1.11 - experimental + * @see #switchMapCompletableDelayError(Function) + */ + @CheckReturnValue + @BackpressureSupport(BackpressureKind.UNBOUNDED_IN) + @SchedulerSupport(SchedulerSupport.NONE) + @Experimental + public final Completable switchMapCompletable(@NonNull Function mapper) { + ObjectHelper.requireNonNull(mapper, "mapper is null"); + return RxJavaPlugins.onAssembly(new FlowableSwitchMapCompletable(this, mapper, false)); + } + + /** + * Maps the upstream values into {@link CompletableSource}s, subscribes to the newer one while + * disposing the subscription to the previous {@code CompletableSource}, thus keeping at most one + * active {@code CompletableSource} running and delaying any main or inner errors until all + * of them terminate. + *

+ * + *

+ * Since a {@code CompletableSource} doesn't produce any items, the resulting reactive type of + * this operator is a {@link Completable} that can only indicate successful completion or + * a failure in any of the inner {@code CompletableSource}s or the failure of the current + * {@link Flowable}. + *

+ *
Backpressure:
+ *
The operator consumes the current {@link Flowable} in an unbounded manner and otherwise + * does not have backpressure in its return type because no items are ever produced.
+ *
Scheduler:
+ *
{@code switchMapCompletableDelayError} does not operate by default on a particular {@link Scheduler}.
+ *
Error handling:
+ *
Errors of this {@code Flowable} and all the {@code CompletableSource}s, who had the chance + * to run to their completion, are delayed until + * all of them terminate in some fashion. At this point, if there was only one failure, the respective + * {@code Throwable} is emitted to the dowstream. It there were more than one failures, the + * operator combines all {@code Throwable}s into a {@link io.reactivex.exceptions.CompositeException CompositeException} + * and signals that to the downstream. + * If any inactivated (switched out) {@code CompletableSource} + * signals an {@code onError} late, the {@code Throwable}s will be signalled to the global error handler via + * {@link RxJavaPlugins#onError(Throwable)} method as {@code UndeliverableException} errors. + *
+ *
+ * @param mapper the function called with each upstream item and should return a + * {@link CompletableSource} to be subscribed to and awaited for + * (non blockingly) for its terminal event + * @return the new Completable instance + * @since 2.1.11 - experimental + * @see #switchMapCompletableDelayError(Function) + */ + @CheckReturnValue + @BackpressureSupport(BackpressureKind.UNBOUNDED_IN) + @SchedulerSupport(SchedulerSupport.NONE) + @Experimental + public final Completable switchMapCompletableDelayError(@NonNull Function mapper) { + ObjectHelper.requireNonNull(mapper, "mapper is null"); + return RxJavaPlugins.onAssembly(new FlowableSwitchMapCompletable(this, mapper, true)); + } + /** * Returns a new Publisher by applying a function that you supply to each item emitted by the source * Publisher that returns a Publisher, and then emitting the items emitted by the most recently emitted diff --git a/src/main/java/io/reactivex/internal/operators/mixed/FlowableSwitchMapCompletable.java b/src/main/java/io/reactivex/internal/operators/mixed/FlowableSwitchMapCompletable.java new file mode 100644 index 0000000000..2d0f753fc8 --- /dev/null +++ b/src/main/java/io/reactivex/internal/operators/mixed/FlowableSwitchMapCompletable.java @@ -0,0 +1,239 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.mixed; + +import java.util.concurrent.atomic.AtomicReference; + +import org.reactivestreams.Subscription; + +import io.reactivex.*; +import io.reactivex.annotations.Experimental; +import io.reactivex.disposables.Disposable; +import io.reactivex.exceptions.Exceptions; +import io.reactivex.functions.Function; +import io.reactivex.internal.disposables.DisposableHelper; +import io.reactivex.internal.functions.ObjectHelper; +import io.reactivex.internal.subscriptions.SubscriptionHelper; +import io.reactivex.internal.util.*; +import io.reactivex.plugins.RxJavaPlugins; + +/** + * Maps the upstream values into {@link CompletableSource}s, subscribes to the newer one while + * disposing the subscription to the previous {@code CompletableSource}, thus keeping at most one + * active {@code CompletableSource} running. + * + * @param the upstream value type + * @since 2.1.11 - experimental + */ +@Experimental +public final class FlowableSwitchMapCompletable extends Completable { + + final Flowable source; + + final Function mapper; + + final boolean delayErrors; + + public FlowableSwitchMapCompletable(Flowable source, + Function mapper, boolean delayErrors) { + this.source = source; + this.mapper = mapper; + this.delayErrors = delayErrors; + } + + @Override + protected void subscribeActual(CompletableObserver s) { + source.subscribe(new SwitchMapCompletableObserver(s, mapper, delayErrors)); + } + + static final class SwitchMapCompletableObserver implements FlowableSubscriber, Disposable { + + final CompletableObserver downstream; + + final Function mapper; + + final boolean delayErrors; + + final AtomicThrowable errors; + + final AtomicReference inner; + + static final SwitchMapInnerObserver INNER_DISPOSED = new SwitchMapInnerObserver(null); + + volatile boolean done; + + Subscription upstream; + + SwitchMapCompletableObserver(CompletableObserver downstream, + Function mapper, boolean delayErrors) { + this.downstream = downstream; + this.mapper = mapper; + this.delayErrors = delayErrors; + this.errors = new AtomicThrowable(); + this.inner = new AtomicReference(); + } + + @Override + public void onSubscribe(Subscription s) { + if (SubscriptionHelper.validate(upstream, s)) { + this.upstream = s; + downstream.onSubscribe(this); + s.request(Long.MAX_VALUE); + } + } + + @Override + public void onNext(T t) { + CompletableSource c; + + try { + c = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper returned a null CompletableSource"); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + upstream.cancel(); + onError(ex); + return; + } + + SwitchMapInnerObserver o = new SwitchMapInnerObserver(this); + + for (;;) { + SwitchMapInnerObserver current = inner.get(); + if (current == INNER_DISPOSED) { + break; + } + if (inner.compareAndSet(current, o)) { + if (current != null) { + current.dispose(); + } + c.subscribe(o); + break; + } + } + } + + @Override + public void onError(Throwable t) { + if (errors.addThrowable(t)) { + if (delayErrors) { + onComplete(); + } else { + disposeInner(); + Throwable ex = errors.terminate(); + if (ex != ExceptionHelper.TERMINATED) { + downstream.onError(ex); + } + } + } else { + RxJavaPlugins.onError(t); + } + } + + @Override + public void onComplete() { + done = true; + if (inner.get() == null) { + Throwable ex = errors.terminate(); + if (ex == null) { + downstream.onComplete(); + } else { + downstream.onError(ex); + } + } + } + + void disposeInner() { + SwitchMapInnerObserver o = inner.getAndSet(INNER_DISPOSED); + if (o != null && o != INNER_DISPOSED) { + o.dispose(); + } + } + + @Override + public void dispose() { + upstream.cancel(); + disposeInner(); + } + + @Override + public boolean isDisposed() { + return inner.get() == INNER_DISPOSED; + } + + void innerError(SwitchMapInnerObserver sender, Throwable error) { + if (inner.compareAndSet(sender, null)) { + if (errors.addThrowable(error)) { + if (delayErrors) { + if (done) { + Throwable ex = errors.terminate(); + downstream.onError(ex); + } + } else { + dispose(); + Throwable ex = errors.terminate(); + if (ex != ExceptionHelper.TERMINATED) { + downstream.onError(ex); + } + } + return; + } + } + RxJavaPlugins.onError(error); + } + + void innerComplete(SwitchMapInnerObserver sender) { + if (inner.compareAndSet(sender, null)) { + if (done) { + Throwable ex = errors.terminate(); + if (ex == null) { + downstream.onComplete(); + } else { + downstream.onError(ex); + } + } + } + } + + static final class SwitchMapInnerObserver extends AtomicReference + implements CompletableObserver { + + private static final long serialVersionUID = -8003404460084760287L; + + final SwitchMapCompletableObserver parent; + + SwitchMapInnerObserver(SwitchMapCompletableObserver parent) { + this.parent = parent; + } + + @Override + public void onSubscribe(Disposable d) { + DisposableHelper.setOnce(this, d); + } + + @Override + public void onError(Throwable e) { + parent.innerError(this, e); + } + + @Override + public void onComplete() { + parent.innerComplete(this); + } + + void dispose() { + DisposableHelper.dispose(this); + } + } + } +} diff --git a/src/test/java/io/reactivex/internal/operators/mixed/FlowableSwitchMapCompletableTest.java b/src/test/java/io/reactivex/internal/operators/mixed/FlowableSwitchMapCompletableTest.java new file mode 100644 index 0000000000..5bd6196fad --- /dev/null +++ b/src/test/java/io/reactivex/internal/operators/mixed/FlowableSwitchMapCompletableTest.java @@ -0,0 +1,389 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.mixed; + +import static org.junit.Assert.*; + +import java.util.List; + +import org.junit.Test; +import org.reactivestreams.Subscriber; + +import io.reactivex.*; +import io.reactivex.exceptions.*; +import io.reactivex.functions.*; +import io.reactivex.internal.functions.Functions; +import io.reactivex.internal.subscriptions.BooleanSubscription; +import io.reactivex.observers.TestObserver; +import io.reactivex.plugins.RxJavaPlugins; +import io.reactivex.processors.PublishProcessor; +import io.reactivex.subjects.CompletableSubject; + +public class FlowableSwitchMapCompletableTest { + + @Test + public void normal() { + Flowable.range(1, 10) + .switchMapCompletable(new Function() { + @Override + public CompletableSource apply(Integer v) throws Exception { + return Completable.complete(); + } + }) + .test() + .assertResult(); + } + + @Test + public void mainError() { + Flowable.error(new TestException()) + .switchMapCompletable(new Function() { + @Override + public CompletableSource apply(Integer v) throws Exception { + return Completable.complete(); + } + }) + .test() + .assertFailure(TestException.class); + } + + @Test + public void innerError() { + PublishProcessor pp = PublishProcessor.create(); + CompletableSubject cs = CompletableSubject.create(); + + TestObserver to = pp.switchMapCompletable(Functions.justFunction(cs)) + .test(); + + assertTrue(pp.hasSubscribers()); + assertFalse(cs.hasObservers()); + + pp.onNext(1); + + assertTrue(cs.hasObservers()); + + to.assertEmpty(); + + cs.onError(new TestException()); + + to.assertFailure(TestException.class); + + assertFalse(pp.hasSubscribers()); + assertFalse(cs.hasObservers()); + } + + @Test + public void switchOver() { + final CompletableSubject[] css = { + CompletableSubject.create(), + CompletableSubject.create() + }; + + PublishProcessor pp = PublishProcessor.create(); + + TestObserver to = pp.switchMapCompletable(new Function() { + @Override + public CompletableSource apply(Integer v) throws Exception { + return css[v]; + } + }) + .test(); + + to.assertEmpty(); + + pp.onNext(0); + + assertTrue(css[0].hasObservers()); + + pp.onNext(1); + + assertFalse(css[0].hasObservers()); + assertTrue(css[1].hasObservers()); + + pp.onComplete(); + + to.assertEmpty(); + + assertTrue(css[1].hasObservers()); + + css[1].onComplete(); + + to.assertResult(); + } + + @Test + public void dispose() { + PublishProcessor pp = PublishProcessor.create(); + CompletableSubject cs = CompletableSubject.create(); + + TestObserver to = pp.switchMapCompletable(Functions.justFunction(cs)) + .test(); + + pp.onNext(1); + + assertTrue(pp.hasSubscribers()); + assertTrue(cs.hasObservers()); + + to.dispose(); + + assertFalse(pp.hasSubscribers()); + assertFalse(cs.hasObservers()); + } + + @Test + public void checkDisposed() { + PublishProcessor pp = PublishProcessor.create(); + CompletableSubject cs = CompletableSubject.create(); + + TestHelper.checkDisposed(pp.switchMapCompletable(Functions.justFunction(cs))); + } + + @Test + public void checkBadSource() { + TestHelper.checkDoubleOnSubscribeFlowableToCompletable(new Function, Completable>() { + @Override + public Completable apply(Flowable f) throws Exception { + return f.switchMapCompletable(Functions.justFunction(Completable.never())); + } + }); + } + + @Test + public void mapperCrash() { + Flowable.range(1, 5).switchMapCompletable(new Function() { + @Override + public CompletableSource apply(Integer f) throws Exception { + throw new TestException(); + } + }) + .test() + .assertFailure(TestException.class); + } + + @Test + public void mapperCancels() { + final TestObserver to = new TestObserver(); + + Flowable.range(1, 5).switchMapCompletable(new Function() { + @Override + public CompletableSource apply(Integer f) throws Exception { + to.cancel(); + return Completable.complete(); + } + }) + .subscribe(to); + + to.assertEmpty(); + } + + @Test + public void onNextInnerCompleteRace() { + for (int i = 0; i < TestHelper.RACE_LONG_LOOPS; i++) { + final PublishProcessor pp = PublishProcessor.create(); + final CompletableSubject cs = CompletableSubject.create(); + + TestObserver to = pp.switchMapCompletable(Functions.justFunction(cs)).test(); + + pp.onNext(1); + + Runnable r1 = new Runnable() { + @Override + public void run() { + pp.onNext(2); + } + }; + + Runnable r2 = new Runnable() { + @Override + public void run() { + cs.onComplete(); + } + }; + + TestHelper.race(r1, r2); + + to.assertEmpty(); + } + } + + @Test + public void onNextInnerErrorRace() { + final TestException ex = new TestException(); + for (int i = 0; i < TestHelper.RACE_LONG_LOOPS; i++) { + List errors = TestHelper.trackPluginErrors(); + try { + final PublishProcessor pp = PublishProcessor.create(); + final CompletableSubject cs = CompletableSubject.create(); + + TestObserver to = pp.switchMapCompletable(Functions.justFunction(cs)).test(); + + pp.onNext(1); + + Runnable r1 = new Runnable() { + @Override + public void run() { + pp.onNext(2); + } + }; + + Runnable r2 = new Runnable() { + @Override + public void run() { + cs.onError(ex); + } + }; + + TestHelper.race(r1, r2); + + to.assertError(new Predicate() { + @Override + public boolean test(Throwable e) throws Exception { + return e instanceof TestException || e instanceof CompositeException; + } + }); + + if (!errors.isEmpty()) { + TestHelper.assertUndeliverable(errors, 0, TestException.class); + } + } finally { + RxJavaPlugins.reset(); + } + } + } + + @Test + public void onErrorInnerErrorRace() { + final TestException ex0 = new TestException(); + final TestException ex = new TestException(); + for (int i = 0; i < TestHelper.RACE_LONG_LOOPS; i++) { + List errors = TestHelper.trackPluginErrors(); + try { + final PublishProcessor pp = PublishProcessor.create(); + final CompletableSubject cs = CompletableSubject.create(); + + TestObserver to = pp.switchMapCompletable(Functions.justFunction(cs)).test(); + + pp.onNext(1); + + Runnable r1 = new Runnable() { + @Override + public void run() { + pp.onError(ex0); + } + }; + + Runnable r2 = new Runnable() { + @Override + public void run() { + cs.onError(ex); + } + }; + + TestHelper.race(r1, r2); + + to.assertError(new Predicate() { + @Override + public boolean test(Throwable e) throws Exception { + return e instanceof TestException || e instanceof CompositeException; + } + }); + + if (!errors.isEmpty()) { + TestHelper.assertUndeliverable(errors, 0, TestException.class); + } + } finally { + RxJavaPlugins.reset(); + } + } + } + + @Test + public void innerErrorThenMainError() { + List errors = TestHelper.trackPluginErrors(); + try { + new Flowable() { + @Override + protected void subscribeActual(Subscriber s) { + s.onSubscribe(new BooleanSubscription()); + s.onNext(1); + s.onError(new TestException("main")); + } + } + .switchMapCompletable(Functions.justFunction(Completable.error(new TestException("inner")))) + .test() + .assertFailureAndMessage(TestException.class, "inner"); + + TestHelper.assertUndeliverable(errors, 0, TestException.class, "main"); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void innerErrorDelayed() { + final PublishProcessor pp = PublishProcessor.create(); + final CompletableSubject cs = CompletableSubject.create(); + + TestObserver to = pp.switchMapCompletableDelayError(Functions.justFunction(cs)).test(); + + pp.onNext(1); + + cs.onError(new TestException()); + + to.assertEmpty(); + + assertTrue(pp.hasSubscribers()); + + pp.onComplete(); + + to.assertFailure(TestException.class); + } + + @Test + public void mainCompletesinnerErrorDelayed() { + final PublishProcessor pp = PublishProcessor.create(); + final CompletableSubject cs = CompletableSubject.create(); + + TestObserver to = pp.switchMapCompletableDelayError(Functions.justFunction(cs)).test(); + + pp.onNext(1); + pp.onComplete(); + + to.assertEmpty(); + + cs.onError(new TestException()); + + to.assertFailure(TestException.class); + } + + @Test + public void mainErrorDelayed() { + final PublishProcessor pp = PublishProcessor.create(); + final CompletableSubject cs = CompletableSubject.create(); + + TestObserver to = pp.switchMapCompletableDelayError(Functions.justFunction(cs)).test(); + + pp.onNext(1); + + pp.onError(new TestException()); + + to.assertEmpty(); + + assertTrue(cs.hasObservers()); + + cs.onComplete(); + + to.assertFailure(TestException.class); + } +}