diff --git a/src/main/java/io/reactivex/Flowable.java b/src/main/java/io/reactivex/Flowable.java index f9d7148c8b..5835cbd068 100644 --- a/src/main/java/io/reactivex/Flowable.java +++ b/src/main/java/io/reactivex/Flowable.java @@ -25,6 +25,7 @@ import io.reactivex.internal.functions.*; import io.reactivex.internal.fuseable.*; import io.reactivex.internal.operators.flowable.*; +import io.reactivex.internal.operators.mixed.*; import io.reactivex.internal.operators.observable.ObservableFromPublisher; import io.reactivex.internal.schedulers.ImmediateThinScheduler; import io.reactivex.internal.subscribers.*; @@ -6886,6 +6887,168 @@ public final Flowable concatMap(Function(this, mapper, prefetch, ErrorMode.IMMEDIATE)); } + /** + * Maps the upstream intems into {@link CompletableSource}s and subscribes to them one after the + * other completes. + *

+ * + *

+ *
Backpressure:
+ *
The operator expects the upstream to support backpressure. If this {@code Flowable} violates the rule, the operator will + * signal a {@code MissingBackpressureException}.
+ *
Scheduler:
+ *
{@code concatMapCompletable} does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param mapper the function called with the upstream item and should return + * a {@code CompletableSource} to become the next source to + * be subscribed to + * @return a new Completable instance + * @since 2.1.11 - experimental + * @see #concatMapCompletableDelayError(Function) + */ + @CheckReturnValue + @SchedulerSupport(SchedulerSupport.NONE) + @BackpressureSupport(BackpressureKind.FULL) + @Experimental + public final Completable concatMapCompletable(Function mapper) { + return concatMapCompletable(mapper, 2); + } + + /** + * Maps the upstream intems into {@link CompletableSource}s and subscribes to them one after the + * other completes. + *

+ * + *

+ *
Backpressure:
+ *
The operator expects the upstream to support backpressure. If this {@code Flowable} violates the rule, the operator will + * signal a {@code MissingBackpressureException}.
+ *
Scheduler:
+ *
{@code concatMapCompletable} does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param mapper the function called with the upstream item and should return + * a {@code CompletableSource} to become the next source to + * be subscribed to + * @param prefetch The number of upstream items to prefetch so that fresh items are + * ready to be mapped when a previous {@code CompletableSource} terminates. + * The operator replenishes after half of the prefetch amount has been consumed + * and turned into {@code CompletableSource}s. + * @return a new Completable instance + * @since 2.1.11 - experimental + * @see #concatMapCompletableDelayError(Function, boolean, int) + */ + @CheckReturnValue + @SchedulerSupport(SchedulerSupport.NONE) + @BackpressureSupport(BackpressureKind.FULL) + @Experimental + public final Completable concatMapCompletable(Function mapper, int prefetch) { + ObjectHelper.requireNonNull(mapper, "mapper is null"); + ObjectHelper.verifyPositive(prefetch, "prefetch"); + return RxJavaPlugins.onAssembly(new FlowableConcatMapCompletable(this, mapper, ErrorMode.IMMEDIATE, prefetch)); + } + + /** + * Maps the upstream intems into {@link CompletableSource}s and subscribes to them one after the + * other terminates, delaying all errors till both this {@code Flowable} and all + * inner {@code CompletableSource}s terminate. + *

+ * + *

+ *
Backpressure:
+ *
The operator expects the upstream to support backpressure. If this {@code Flowable} violates the rule, the operator will + * signal a {@code MissingBackpressureException}.
+ *
Scheduler:
+ *
{@code concatMapCompletableDelayError} does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param mapper the function called with the upstream item and should return + * a {@code CompletableSource} to become the next source to + * be subscribed to + * @return a new Completable instance + * @since 2.1.11 - experimental + * @see #concatMapCompletable(Function, int) + */ + @CheckReturnValue + @SchedulerSupport(SchedulerSupport.NONE) + @BackpressureSupport(BackpressureKind.FULL) + @Experimental + public final Completable concatMapCompletableDelayError(Function mapper) { + return concatMapCompletableDelayError(mapper, true, 2); + } + + /** + * Maps the upstream intems into {@link CompletableSource}s and subscribes to them one after the + * other terminates, optionally delaying all errors till both this {@code Flowable} and all + * inner {@code CompletableSource}s terminate. + *

+ * + *

+ *
Backpressure:
+ *
The operator expects the upstream to support backpressure. If this {@code Flowable} violates the rule, the operator will + * signal a {@code MissingBackpressureException}.
+ *
Scheduler:
+ *
{@code concatMapCompletableDelayError} does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param mapper the function called with the upstream item and should return + * a {@code CompletableSource} to become the next source to + * be subscribed to + * @param tillTheEnd If {@code true}, errors from this {@code Flowable} or any of the + * inner {@code CompletableSource}s are delayed until all + * of them terminate. If {@code false}, an error from this + * {@code Flowable} is delayed until the current inner + * {@code CompletableSource} terminates and only then is + * it emitted to the downstream. + * @return a new Completable instance + * @since 2.1.11 - experimental + * @see #concatMapCompletable(Function) + */ + @CheckReturnValue + @SchedulerSupport(SchedulerSupport.NONE) + @BackpressureSupport(BackpressureKind.FULL) + @Experimental + public final Completable concatMapCompletableDelayError(Function mapper, boolean tillTheEnd) { + return concatMapCompletableDelayError(mapper, tillTheEnd, 2); + } + + /** + * Maps the upstream intems into {@link CompletableSource}s and subscribes to them one after the + * other terminates, optionally delaying all errors till both this {@code Flowable} and all + * inner {@code CompletableSource}s terminate. + *

+ * + *

+ *
Backpressure:
+ *
The operator expects the upstream to support backpressure. If this {@code Flowable} violates the rule, the operator will + * signal a {@code MissingBackpressureException}.
+ *
Scheduler:
+ *
{@code concatMapCompletableDelayError} does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param mapper the function called with the upstream item and should return + * a {@code CompletableSource} to become the next source to + * be subscribed to + * @param tillTheEnd If {@code true}, errors from this {@code Flowable} or any of the + * inner {@code CompletableSource}s are delayed until all + * of them terminate. If {@code false}, an error from this + * {@code Flowable} is delayed until the current inner + * {@code CompletableSource} terminates and only then is + * it emitted to the downstream. + * @param prefetch The number of upstream items to prefetch so that fresh items are + * ready to be mapped when a previous {@code CompletableSource} terminates. + * The operator replenishes after half of the prefetch amount has been consumed + * and turned into {@code CompletableSource}s. + * @return a new Completable instance + * @since 2.1.11 - experimental + * @see #concatMapCompletable(Function, int) + */ + @CheckReturnValue + @SchedulerSupport(SchedulerSupport.NONE) + @BackpressureSupport(BackpressureKind.FULL) + @Experimental + public final Completable concatMapCompletableDelayError(Function mapper, boolean tillTheEnd, int prefetch) { + ObjectHelper.requireNonNull(mapper, "mapper is null"); + ObjectHelper.verifyPositive(prefetch, "prefetch"); + return RxJavaPlugins.onAssembly(new FlowableConcatMapCompletable(this, mapper, tillTheEnd ? ErrorMode.END : ErrorMode.BOUNDARY, prefetch)); + } + /** * Maps each of the items into a Publisher, subscribes to them one after the other, * one at a time and emits their values in order diff --git a/src/main/java/io/reactivex/internal/operators/mixed/FlowableConcatMapCompletable.java b/src/main/java/io/reactivex/internal/operators/mixed/FlowableConcatMapCompletable.java new file mode 100644 index 0000000000..1482c137b4 --- /dev/null +++ b/src/main/java/io/reactivex/internal/operators/mixed/FlowableConcatMapCompletable.java @@ -0,0 +1,291 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.mixed; + +import java.util.concurrent.atomic.*; + +import org.reactivestreams.Subscription; + +import io.reactivex.*; +import io.reactivex.annotations.Experimental; +import io.reactivex.disposables.Disposable; +import io.reactivex.exceptions.*; +import io.reactivex.functions.Function; +import io.reactivex.internal.disposables.DisposableHelper; +import io.reactivex.internal.functions.ObjectHelper; +import io.reactivex.internal.fuseable.SimplePlainQueue; +import io.reactivex.internal.queue.SpscArrayQueue; +import io.reactivex.internal.subscriptions.SubscriptionHelper; +import io.reactivex.internal.util.*; +import io.reactivex.plugins.RxJavaPlugins; + +/** + * Maps the upstream intems into {@link CompletableSource}s and subscribes to them one after the + * other completes or terminates (in error-delaying mode). + * @param the upstream value type + * @since 2.1.11 - experimental + */ +@Experimental +public final class FlowableConcatMapCompletable extends Completable { + + final Flowable source; + + final Function mapper; + + final ErrorMode errorMode; + + final int prefetch; + + public FlowableConcatMapCompletable(Flowable source, + Function mapper, + ErrorMode errorMode, + int prefetch) { + this.source = source; + this.mapper = mapper; + this.errorMode = errorMode; + this.prefetch = prefetch; + } + + @Override + protected void subscribeActual(CompletableObserver s) { + source.subscribe(new ConcatMapCompletableObserver(s, mapper, errorMode, prefetch)); + } + + static final class ConcatMapCompletableObserver + extends AtomicInteger + implements FlowableSubscriber, Disposable { + + private static final long serialVersionUID = 3610901111000061034L; + + final CompletableObserver downstream; + + final Function mapper; + + final ErrorMode errorMode; + + final AtomicThrowable errors; + + final ConcatMapInnerObserver inner; + + final int prefetch; + + final SimplePlainQueue queue; + + Subscription upstream; + + volatile boolean active; + + volatile boolean done; + + volatile boolean disposed; + + int consumed; + + ConcatMapCompletableObserver(CompletableObserver downstream, + Function mapper, + ErrorMode errorMode, int prefetch) { + this.downstream = downstream; + this.mapper = mapper; + this.errorMode = errorMode; + this.prefetch = prefetch; + this.errors = new AtomicThrowable(); + this.inner = new ConcatMapInnerObserver(this); + this.queue = new SpscArrayQueue(prefetch); + } + + @Override + public void onSubscribe(Subscription s) { + if (SubscriptionHelper.validate(upstream, s)) { + this.upstream = s; + downstream.onSubscribe(this); + s.request(prefetch); + } + } + + @Override + public void onNext(T t) { + if (queue.offer(t)) { + drain(); + } else { + upstream.cancel(); + onError(new MissingBackpressureException("Queue full?!")); + } + } + + @Override + public void onError(Throwable t) { + if (errors.addThrowable(t)) { + if (errorMode == ErrorMode.IMMEDIATE) { + inner.dispose(); + t = errors.terminate(); + if (t != ExceptionHelper.TERMINATED) { + downstream.onError(t); + } + if (getAndIncrement() == 0) { + queue.clear(); + } + } else { + done = true; + drain(); + } + } else { + RxJavaPlugins.onError(t); + } + } + + @Override + public void onComplete() { + done = true; + drain(); + } + + @Override + public void dispose() { + disposed = true; + upstream.cancel(); + inner.dispose(); + if (getAndIncrement() == 0) { + queue.clear(); + } + } + + @Override + public boolean isDisposed() { + return disposed; + } + + void innerError(Throwable ex) { + if (errors.addThrowable(ex)) { + if (errorMode == ErrorMode.IMMEDIATE) { + upstream.cancel(); + ex = errors.terminate(); + if (ex != ExceptionHelper.TERMINATED) { + downstream.onError(ex); + } + if (getAndIncrement() == 0) { + queue.clear(); + } + } else { + active = false; + drain(); + } + } else { + RxJavaPlugins.onError(ex); + } + } + + void innerComplete() { + active = false; + drain(); + } + + void drain() { + if (getAndIncrement() != 0) { + return; + } + + do { + if (disposed) { + queue.clear(); + return; + } + + if (!active) { + + if (errorMode == ErrorMode.BOUNDARY) { + if (errors.get() != null) { + queue.clear(); + Throwable ex = errors.terminate(); + downstream.onError(ex); + return; + } + } + + boolean d = done; + T v = queue.poll(); + boolean empty = v == null; + + if (d && empty) { + Throwable ex = errors.terminate(); + if (ex != null) { + downstream.onError(ex); + } else { + downstream.onComplete(); + } + return; + } + + if (!empty) { + + int limit = prefetch - (prefetch >> 1); + int c = consumed + 1; + if (c == limit) { + consumed = 0; + upstream.request(limit); + } else { + consumed = c; + } + + CompletableSource cs; + + try { + cs = ObjectHelper.requireNonNull(mapper.apply(v), "The mapper returned a null CompletableSource"); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + queue.clear(); + upstream.cancel(); + errors.addThrowable(ex); + ex = errors.terminate(); + downstream.onError(ex); + return; + } + active = true; + cs.subscribe(inner); + } + } + } while (decrementAndGet() != 0); + } + + static final class ConcatMapInnerObserver extends AtomicReference + implements CompletableObserver { + + private static final long serialVersionUID = 5638352172918776687L; + + final ConcatMapCompletableObserver parent; + + ConcatMapInnerObserver(ConcatMapCompletableObserver parent) { + this.parent = parent; + } + + @Override + public void onSubscribe(Disposable d) { + DisposableHelper.replace(this, d); + } + + @Override + public void onError(Throwable e) { + parent.innerError(e); + } + + @Override + public void onComplete() { + parent.innerComplete(); + } + + void dispose() { + DisposableHelper.dispose(this); + } + } + } +} diff --git a/src/test/java/io/reactivex/internal/operators/mixed/FlowableConcatMapCompletableTest.java b/src/test/java/io/reactivex/internal/operators/mixed/FlowableConcatMapCompletableTest.java new file mode 100644 index 0000000000..ef0e291581 --- /dev/null +++ b/src/test/java/io/reactivex/internal/operators/mixed/FlowableConcatMapCompletableTest.java @@ -0,0 +1,391 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.mixed; + +import static org.junit.Assert.*; + +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; + +import org.junit.Test; +import org.reactivestreams.Subscriber; + +import io.reactivex.*; +import io.reactivex.exceptions.*; +import io.reactivex.functions.*; +import io.reactivex.internal.functions.Functions; +import io.reactivex.internal.subscriptions.BooleanSubscription; +import io.reactivex.observers.TestObserver; +import io.reactivex.plugins.RxJavaPlugins; +import io.reactivex.processors.PublishProcessor; +import io.reactivex.subjects.CompletableSubject; + +public class FlowableConcatMapCompletableTest { + + @Test + public void simple() { + Flowable.range(1, 5) + .concatMapCompletable(Functions.justFunction(Completable.complete())) + .test() + .assertResult(); + } + + @Test + public void simple2() { + final AtomicInteger counter = new AtomicInteger(); + Flowable.range(1, 5) + .concatMapCompletable(Functions.justFunction(Completable.fromAction(new Action() { + @Override + public void run() throws Exception { + counter.incrementAndGet(); + } + }))) + .test() + .assertResult(); + + assertEquals(5, counter.get()); + } + + @Test + public void simpleLongPrefetch() { + Flowable.range(1, 1024) + .concatMapCompletable(Functions.justFunction(Completable.complete()), 32) + .test() + .assertResult(); + } + + @Test + public void mainError() { + Flowable.error(new TestException()) + .concatMapCompletable(Functions.justFunction(Completable.complete())) + .test() + .assertFailure(TestException.class); + } + + @Test + public void innerError() { + Flowable.just(1) + .concatMapCompletable(Functions.justFunction(Completable.error(new TestException()))) + .test() + .assertFailure(TestException.class); + } + + @Test + public void innerErrorDelayed() { + Flowable.range(1, 5) + .concatMapCompletableDelayError( + new Function() { + @Override + public CompletableSource apply(Integer v) throws Exception { + return Completable.error(new TestException()); + } + } + ) + .test() + .assertFailure(CompositeException.class) + .assertOf(new Consumer>() { + @Override + public void accept(TestObserver to) throws Exception { + assertEquals(5, ((CompositeException)to.errors().get(0)).getExceptions().size()); + } + }); + } + + @Test + public void mapperCrash() { + Flowable.just(1) + .concatMapCompletable(new Function() { + @Override + public CompletableSource apply(Integer v) throws Exception { + throw new TestException(); + } + }) + .test() + .assertFailure(TestException.class); + } + + @Test + public void immediateError() { + PublishProcessor pp = PublishProcessor.create(); + CompletableSubject cs = CompletableSubject.create(); + + TestObserver to = pp.concatMapCompletable( + Functions.justFunction(cs)).test(); + + to.assertEmpty(); + + assertTrue(pp.hasSubscribers()); + assertFalse(cs.hasObservers()); + + pp.onNext(1); + + assertTrue(cs.hasObservers()); + + pp.onError(new TestException()); + + assertFalse(cs.hasObservers()); + + to.assertFailure(TestException.class); + } + + @Test + public void immediateError2() { + PublishProcessor pp = PublishProcessor.create(); + CompletableSubject cs = CompletableSubject.create(); + + TestObserver to = pp.concatMapCompletable( + Functions.justFunction(cs)).test(); + + to.assertEmpty(); + + assertTrue(pp.hasSubscribers()); + assertFalse(cs.hasObservers()); + + pp.onNext(1); + + assertTrue(cs.hasObservers()); + + cs.onError(new TestException()); + + assertFalse(pp.hasSubscribers()); + + to.assertFailure(TestException.class); + } + + @Test + public void boundaryError() { + PublishProcessor pp = PublishProcessor.create(); + CompletableSubject cs = CompletableSubject.create(); + + TestObserver to = pp.concatMapCompletableDelayError( + Functions.justFunction(cs), false).test(); + + to.assertEmpty(); + + assertTrue(pp.hasSubscribers()); + assertFalse(cs.hasObservers()); + + pp.onNext(1); + + assertTrue(cs.hasObservers()); + + pp.onError(new TestException()); + + assertTrue(cs.hasObservers()); + + to.assertEmpty(); + + cs.onComplete(); + + to.assertFailure(TestException.class); + } + + @Test + public void endError() { + PublishProcessor pp = PublishProcessor.create(); + final CompletableSubject cs = CompletableSubject.create(); + final CompletableSubject cs2 = CompletableSubject.create(); + + TestObserver to = pp.concatMapCompletableDelayError( + new Function() { + @Override + public CompletableSource apply(Integer v) throws Exception { + if (v == 1) { + return cs; + } + return cs2; + } + }, true, 32 + ) + .test(); + + to.assertEmpty(); + + assertTrue(pp.hasSubscribers()); + assertFalse(cs.hasObservers()); + + pp.onNext(1); + + assertTrue(cs.hasObservers()); + + cs.onError(new TestException()); + + assertTrue(pp.hasSubscribers()); + + pp.onNext(2); + + to.assertEmpty(); + + cs2.onComplete(); + + assertTrue(pp.hasSubscribers()); + + to.assertEmpty(); + + pp.onComplete(); + + to.assertFailure(TestException.class); + } + + @Test + public void doubleOnSubscribe() { + TestHelper.checkDoubleOnSubscribeFlowableToCompletable( + new Function, Completable>() { + @Override + public Completable apply(Flowable f) + throws Exception { + return f.concatMapCompletable( + Functions.justFunction(Completable.complete())); + } + } + ); + } + + @Test + public void disposed() { + TestHelper.checkDisposed( + Flowable.never() + .concatMapCompletable( + Functions.justFunction(Completable.complete())) + ); + } + + @Test + public void queueOverflow() { + List errors = TestHelper.trackPluginErrors(); + try { + new Flowable() { + @Override + protected void subscribeActual(Subscriber s) { + s.onSubscribe(new BooleanSubscription()); + s.onNext(1); + s.onNext(2); + s.onNext(3); + s.onError(new TestException()); + } + } + .concatMapCompletable( + Functions.justFunction(Completable.never()), 1 + ) + .test() + .assertFailure(MissingBackpressureException.class); + + TestHelper.assertUndeliverable(errors, 0, TestException.class); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void immediateOuterInnerErrorRace() { + final TestException ex = new TestException(); + for (int i = 0; i < TestHelper.RACE_LONG_LOOPS; i++) { + List errors = TestHelper.trackPluginErrors(); + try { + final PublishProcessor pp = PublishProcessor.create(); + final CompletableSubject cs = CompletableSubject.create(); + + TestObserver to = pp.concatMapCompletable( + Functions.justFunction(cs) + ) + .test(); + + pp.onNext(1); + + Runnable r1 = new Runnable() { + @Override + public void run() { + pp.onError(ex); + } + }; + + Runnable r2 = new Runnable() { + @Override + public void run() { + cs.onError(ex); + } + }; + + TestHelper.race(r1, r2); + + to.assertError(new Predicate() { + @Override + public boolean test(Throwable e) throws Exception { + return e instanceof TestException || e instanceof CompositeException; + } + }) + .assertNotComplete(); + + if (!errors.isEmpty()) { + TestHelper.assertUndeliverable(errors, 0, TestException.class); + } + } finally { + RxJavaPlugins.reset(); + } + } + } + + @Test + public void disposeInDrainLoop() { + for (int i = 0; i < TestHelper.RACE_LONG_LOOPS; i++) { + final PublishProcessor pp = PublishProcessor.create(); + final CompletableSubject cs = CompletableSubject.create(); + + final TestObserver to = pp.concatMapCompletable( + Functions.justFunction(cs) + ) + .test(); + + pp.onNext(1); + + Runnable r1 = new Runnable() { + @Override + public void run() { + pp.onNext(2); + } + }; + + Runnable r2 = new Runnable() { + @Override + public void run() { + cs.onComplete(); + to.cancel(); + } + }; + + TestHelper.race(r1, r2); + + to.assertEmpty(); + } + } + + @Test + public void doneButNotEmpty() { + final PublishProcessor pp = PublishProcessor.create(); + final CompletableSubject cs = CompletableSubject.create(); + + final TestObserver to = pp.concatMapCompletable( + Functions.justFunction(cs) + ) + .test(); + + pp.onNext(1); + pp.onNext(2); + pp.onComplete(); + + cs.onComplete(); + + to.assertResult(); + } +}