diff --git a/src/main/java/io/reactivex/Flowable.java b/src/main/java/io/reactivex/Flowable.java index 5835cbd068..7bcfca4e94 100644 --- a/src/main/java/io/reactivex/Flowable.java +++ b/src/main/java/io/reactivex/Flowable.java @@ -6888,7 +6888,7 @@ public final Flowable concatMap(Function * @@ -6915,7 +6915,7 @@ public final Completable concatMapCompletable(Function * @@ -6948,7 +6948,7 @@ public final Completable concatMapCompletable(Function @@ -6976,7 +6976,7 @@ public final Completable concatMapCompletableDelayError(Function @@ -7010,7 +7010,7 @@ public final Completable concatMapCompletableDelayError(Function @@ -7313,6 +7313,362 @@ public final Flowable concatMapIterable(final Function(this, mapper, prefetch)); } + /** + * Maps the upstream items into {@link MaybeSource}s and subscribes to them one after the + * other succeeds or completes, emits their success value if available or terminates immediately if + * either this {@code Flowable} or the current inner {@code MaybeSource} fail. + *

+ * + *

+ *
Backpressure:
+ *
The operator expects the upstream to support backpressure and honors + * the backpressure from downstream. If this {@code Flowable} violates the rule, the operator will + * signal a {@code MissingBackpressureException}.
+ *
Scheduler:
+ *
{@code concatMapMaybe} does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param the result type of the inner {@code MaybeSource}s + * @param mapper the function called with the upstream item and should return + * a {@code MaybeSource} to become the next source to + * be subscribed to + * @return a new Flowable instance + * @since 2.1.11 - experimental + * @see #concatMapMaybeDelayError(Function) + * @see #concatMapMaybe(Function, int) + */ + @CheckReturnValue + @BackpressureSupport(BackpressureKind.FULL) + @SchedulerSupport(SchedulerSupport.NONE) + @Experimental + public final Flowable concatMapMaybe(Function> mapper) { + return concatMapMaybe(mapper, 2); + } + + /** + * Maps the upstream items into {@link MaybeSource}s and subscribes to them one after the + * other succeeds or completes, emits their success value if available or terminates immediately if + * either this {@code Flowable} or the current inner {@code MaybeSource} fail. + *

+ * + *

+ *
Backpressure:
+ *
The operator expects the upstream to support backpressure and honors + * the backpressure from downstream. If this {@code Flowable} violates the rule, the operator will + * signal a {@code MissingBackpressureException}.
+ *
Scheduler:
+ *
{@code concatMapMaybe} does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param the result type of the inner {@code MaybeSource}s + * @param mapper the function called with the upstream item and should return + * a {@code MaybeSource} to become the next source to + * be subscribed to + * @param prefetch The number of upstream items to prefetch so that fresh items are + * ready to be mapped when a previous {@code MaybeSource} terminates. + * The operator replenishes after half of the prefetch amount has been consumed + * and turned into {@code MaybeSource}s. + * @return a new Flowable instance + * @since 2.1.11 - experimental + * @see #concatMapMaybe(Function) + * @see #concatMapMaybeDelayError(Function, boolean, int) + */ + @CheckReturnValue + @BackpressureSupport(BackpressureKind.FULL) + @SchedulerSupport(SchedulerSupport.NONE) + @Experimental + public final Flowable concatMapMaybe(Function> mapper, int prefetch) { + ObjectHelper.requireNonNull(mapper, "mapper is null"); + ObjectHelper.verifyPositive(prefetch, "prefetch"); + return RxJavaPlugins.onAssembly(new FlowableConcatMapMaybe(this, mapper, ErrorMode.IMMEDIATE, prefetch)); + } + + /** + * Maps the upstream items into {@link MaybeSource}s and subscribes to them one after the + * other terminates, emits their success value if available and delaying all errors + * till both this {@code Flowable} and all inner {@code MaybeSource}s terminate. + *

+ * + *

+ *
Backpressure:
+ *
The operator expects the upstream to support backpressure and honors + * the backpressure from downstream. If this {@code Flowable} violates the rule, the operator will + * signal a {@code MissingBackpressureException}.
+ *
Scheduler:
+ *
{@code concatMapMaybeDelayError} does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param the result type of the inner {@code MaybeSource}s + * @param mapper the function called with the upstream item and should return + * a {@code MaybeSource} to become the next source to + * be subscribed to + * @return a new Flowable instance + * @since 2.1.11 - experimental + * @see #concatMapMaybe(Function) + * @see #concatMapMaybeDelayError(Function, boolean) + */ + @CheckReturnValue + @BackpressureSupport(BackpressureKind.FULL) + @SchedulerSupport(SchedulerSupport.NONE) + @Experimental + public final Flowable concatMapMaybeDelayError(Function> mapper) { + return concatMapMaybeDelayError(mapper, true, 2); + } + + /** + * Maps the upstream items into {@link MaybeSource}s and subscribes to them one after the + * other terminates, emits their success value if available and optionally delaying all errors + * till both this {@code Flowable} and all inner {@code MaybeSource}s terminate. + *

+ * + *

+ *
Backpressure:
+ *
The operator expects the upstream to support backpressure and honors + * the backpressure from downstream. If this {@code Flowable} violates the rule, the operator will + * signal a {@code MissingBackpressureException}.
+ *
Scheduler:
+ *
{@code concatMapMaybeDelayError} does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param the result type of the inner {@code MaybeSource}s + * @param mapper the function called with the upstream item and should return + * a {@code MaybeSource} to become the next source to + * be subscribed to + * @param tillTheEnd If {@code true}, errors from this {@code Flowable} or any of the + * inner {@code MaybeSource}s are delayed until all + * of them terminate. If {@code false}, an error from this + * {@code Flowable} is delayed until the current inner + * {@code MaybeSource} terminates and only then is + * it emitted to the downstream. + * @return a new Flowable instance + * @since 2.1.11 - experimental + * @see #concatMapMaybe(Function, int) + * @see #concatMapMaybeDelayError(Function, boolean, int) + */ + @CheckReturnValue + @BackpressureSupport(BackpressureKind.FULL) + @SchedulerSupport(SchedulerSupport.NONE) + @Experimental + public final Flowable concatMapMaybeDelayError(Function> mapper, boolean tillTheEnd) { + return concatMapMaybeDelayError(mapper, tillTheEnd, 2); + } + + /** + * Maps the upstream items into {@link MaybeSource}s and subscribes to them one after the + * other terminates, emits their success value if available and optionally delaying all errors + * till both this {@code Flowable} and all inner {@code MaybeSource}s terminate. + *

+ * + *

+ *
Backpressure:
+ *
The operator expects the upstream to support backpressure and honors + * the backpressure from downstream. If this {@code Flowable} violates the rule, the operator will + * signal a {@code MissingBackpressureException}.
+ *
Scheduler:
+ *
{@code concatMapMaybeDelayError} does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param the result type of the inner {@code MaybeSource}s + * @param mapper the function called with the upstream item and should return + * a {@code MaybeSource} to become the next source to + * be subscribed to + * @param tillTheEnd If {@code true}, errors from this {@code Flowable} or any of the + * inner {@code MaybeSource}s are delayed until all + * of them terminate. If {@code false}, an error from this + * {@code Flowable} is delayed until the current inner + * {@code MaybeSource} terminates and only then is + * it emitted to the downstream. + * @param prefetch The number of upstream items to prefetch so that fresh items are + * ready to be mapped when a previous {@code MaybeSource} terminates. + * The operator replenishes after half of the prefetch amount has been consumed + * and turned into {@code MaybeSource}s. + * @return a new Flowable instance + * @since 2.1.11 - experimental + * @see #concatMapMaybe(Function, int) + */ + @CheckReturnValue + @BackpressureSupport(BackpressureKind.FULL) + @SchedulerSupport(SchedulerSupport.NONE) + @Experimental + public final Flowable concatMapMaybeDelayError(Function> mapper, boolean tillTheEnd, int prefetch) { + ObjectHelper.requireNonNull(mapper, "mapper is null"); + ObjectHelper.verifyPositive(prefetch, "prefetch"); + return RxJavaPlugins.onAssembly(new FlowableConcatMapMaybe(this, mapper, tillTheEnd ? ErrorMode.END : ErrorMode.BOUNDARY, prefetch)); + } + + /** + * Maps the upstream items into {@link SingleSource}s and subscribes to them one after the + * other succeeds, emits their success values or terminates immediately if + * either this {@code Flowable} or the current inner {@code SingleSource} fail. + *

+ * + *

+ *
Backpressure:
+ *
The operator expects the upstream to support backpressure and honors + * the backpressure from downstream. If this {@code Flowable} violates the rule, the operator will + * signal a {@code MissingBackpressureException}.
+ *
Scheduler:
+ *
{@code concatMapSingle} does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param the result type of the inner {@code SingleSource}s + * @param mapper the function called with the upstream item and should return + * a {@code SingleSource} to become the next source to + * be subscribed to + * @return a new Flowable instance + * @since 2.1.11 - experimental + * @see #concatMapSingleDelayError(Function) + * @see #concatMapSingle(Function, int) + */ + @CheckReturnValue + @BackpressureSupport(BackpressureKind.FULL) + @SchedulerSupport(SchedulerSupport.NONE) + @Experimental + public final Flowable concatMapSingle(Function> mapper) { + return concatMapSingle(mapper, 2); + } + + /** + * Maps the upstream items into {@link SingleSource}s and subscribes to them one after the + * other succeeds, emits their success values or terminates immediately if + * either this {@code Flowable} or the current inner {@code SingleSource} fail. + *

+ * + *

+ *
Backpressure:
+ *
The operator expects the upstream to support backpressure and honors + * the backpressure from downstream. If this {@code Flowable} violates the rule, the operator will + * signal a {@code MissingBackpressureException}.
+ *
Scheduler:
+ *
{@code concatMapSingle} does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param the result type of the inner {@code SingleSource}s + * @param mapper the function called with the upstream item and should return + * a {@code SingleSource} to become the next source to + * be subscribed to + * @param prefetch The number of upstream items to prefetch so that fresh items are + * ready to be mapped when a previous {@code SingleSource} terminates. + * The operator replenishes after half of the prefetch amount has been consumed + * and turned into {@code SingleSource}s. + * @return a new Flowable instance + * @since 2.1.11 - experimental + * @see #concatMapSingle(Function) + * @see #concatMapSingleDelayError(Function, boolean, int) + */ + @CheckReturnValue + @BackpressureSupport(BackpressureKind.FULL) + @SchedulerSupport(SchedulerSupport.NONE) + @Experimental + public final Flowable concatMapSingle(Function> mapper, int prefetch) { + ObjectHelper.requireNonNull(mapper, "mapper is null"); + ObjectHelper.verifyPositive(prefetch, "prefetch"); + return RxJavaPlugins.onAssembly(new FlowableConcatMapSingle(this, mapper, ErrorMode.IMMEDIATE, prefetch)); + } + + /** + * Maps the upstream items into {@link SingleSource}s and subscribes to them one after the + * other succeeds or fails, emits their success values and delays all errors + * till both this {@code Flowable} and all inner {@code SingleSource}s terminate. + *

+ * + *

+ *
Backpressure:
+ *
The operator expects the upstream to support backpressure and honors + * the backpressure from downstream. If this {@code Flowable} violates the rule, the operator will + * signal a {@code MissingBackpressureException}.
+ *
Scheduler:
+ *
{@code concatMapSingleDelayError} does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param the result type of the inner {@code SingleSource}s + * @param mapper the function called with the upstream item and should return + * a {@code SingleSource} to become the next source to + * be subscribed to + * @return a new Flowable instance + * @since 2.1.11 - experimental + * @see #concatMapSingle(Function) + * @see #concatMapSingleDelayError(Function, boolean) + */ + @CheckReturnValue + @BackpressureSupport(BackpressureKind.FULL) + @SchedulerSupport(SchedulerSupport.NONE) + @Experimental + public final Flowable concatMapSingleDelayError(Function> mapper) { + return concatMapSingleDelayError(mapper, true, 2); + } + + /** + * Maps the upstream items into {@link SingleSource}s and subscribes to them one after the + * other succeeds or fails, emits their success values and optionally delays all errors + * till both this {@code Flowable} and all inner {@code SingleSource}s terminate. + *

+ * + *

+ *
Backpressure:
+ *
The operator expects the upstream to support backpressure and honors + * the backpressure from downstream. If this {@code Flowable} violates the rule, the operator will + * signal a {@code MissingBackpressureException}.
+ *
Scheduler:
+ *
{@code concatMapSingleDelayError} does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param the result type of the inner {@code SingleSource}s + * @param mapper the function called with the upstream item and should return + * a {@code SingleSource} to become the next source to + * be subscribed to + * @param tillTheEnd If {@code true}, errors from this {@code Flowable} or any of the + * inner {@code SingleSource}s are delayed until all + * of them terminate. If {@code false}, an error from this + * {@code Flowable} is delayed until the current inner + * {@code SingleSource} terminates and only then is + * it emitted to the downstream. + * @return a new Flowable instance + * @since 2.1.11 - experimental + * @see #concatMapSingle(Function, int) + * @see #concatMapSingleDelayError(Function, boolean, int) + */ + @CheckReturnValue + @BackpressureSupport(BackpressureKind.FULL) + @SchedulerSupport(SchedulerSupport.NONE) + @Experimental + public final Flowable concatMapSingleDelayError(Function> mapper, boolean tillTheEnd) { + return concatMapSingleDelayError(mapper, tillTheEnd, 2); + } + + /** + * Maps the upstream items into {@link SingleSource}s and subscribes to them one after the + * other succeeds or fails, emits their success values and optionally delays errors + * till both this {@code Flowable} and all inner {@code SingleSource}s terminate. + *

+ * + *

+ *
Backpressure:
+ *
The operator expects the upstream to support backpressure and honors + * the backpressure from downstream. If this {@code Flowable} violates the rule, the operator will + * signal a {@code MissingBackpressureException}.
+ *
Scheduler:
+ *
{@code concatMapSingleDelayError} does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param the result type of the inner {@code SingleSource}s + * @param mapper the function called with the upstream item and should return + * a {@code SingleSource} to become the next source to + * be subscribed to + * @param tillTheEnd If {@code true}, errors from this {@code Flowable} or any of the + * inner {@code SingleSource}s are delayed until all + * of them terminate. If {@code false}, an error from this + * {@code Flowable} is delayed until the current inner + * {@code SingleSource} terminates and only then is + * it emitted to the downstream. + * @param prefetch The number of upstream items to prefetch so that fresh items are + * ready to be mapped when a previous {@code SingleSource} terminates. + * The operator replenishes after half of the prefetch amount has been consumed + * and turned into {@code SingleSource}s. + * @return a new Flowable instance + * @since 2.1.11 - experimental + * @see #concatMapSingle(Function, int) + */ + @CheckReturnValue + @BackpressureSupport(BackpressureKind.FULL) + @SchedulerSupport(SchedulerSupport.NONE) + @Experimental + public final Flowable concatMapSingleDelayError(Function> mapper, boolean tillTheEnd, int prefetch) { + ObjectHelper.requireNonNull(mapper, "mapper is null"); + ObjectHelper.verifyPositive(prefetch, "prefetch"); + return RxJavaPlugins.onAssembly(new FlowableConcatMapSingle(this, mapper, tillTheEnd ? ErrorMode.END : ErrorMode.BOUNDARY, prefetch)); + } + /** * Returns a Flowable that emits the items emitted from the current Publisher, then the next, one after * the other, without interleaving them. diff --git a/src/main/java/io/reactivex/internal/operators/mixed/FlowableConcatMapMaybe.java b/src/main/java/io/reactivex/internal/operators/mixed/FlowableConcatMapMaybe.java new file mode 100644 index 0000000000..8910e0c10a --- /dev/null +++ b/src/main/java/io/reactivex/internal/operators/mixed/FlowableConcatMapMaybe.java @@ -0,0 +1,342 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.mixed; + +import java.util.concurrent.atomic.*; + +import org.reactivestreams.*; + +import io.reactivex.*; +import io.reactivex.annotations.Experimental; +import io.reactivex.disposables.Disposable; +import io.reactivex.exceptions.*; +import io.reactivex.functions.Function; +import io.reactivex.internal.disposables.DisposableHelper; +import io.reactivex.internal.functions.ObjectHelper; +import io.reactivex.internal.fuseable.SimplePlainQueue; +import io.reactivex.internal.queue.SpscArrayQueue; +import io.reactivex.internal.subscriptions.SubscriptionHelper; +import io.reactivex.internal.util.*; +import io.reactivex.plugins.RxJavaPlugins; + +/** + * Maps each upstream item into a {@link MaybeSource}, subscribes to them one after the other terminates + * and relays their success values, optionally delaying any errors till the main and inner sources + * terminate. + * + * @param the upstream element type + * @param the output element type + * + * @since 2.1.11 - experimental + */ +@Experimental +public final class FlowableConcatMapMaybe extends Flowable { + + final Flowable source; + + final Function> mapper; + + final ErrorMode errorMode; + + final int prefetch; + + public FlowableConcatMapMaybe(Flowable source, + Function> mapper, + ErrorMode errorMode, int prefetch) { + this.source = source; + this.mapper = mapper; + this.errorMode = errorMode; + this.prefetch = prefetch; + } + + @Override + protected void subscribeActual(Subscriber s) { + source.subscribe(new ConcatMapMaybeSubscriber(s, mapper, prefetch, errorMode)); + } + + static final class ConcatMapMaybeSubscriber + extends AtomicInteger + implements FlowableSubscriber, Subscription { + + private static final long serialVersionUID = -9140123220065488293L; + + final Subscriber downstream; + + final Function> mapper; + + final int prefetch; + + final AtomicLong requested; + + final AtomicThrowable errors; + + final ConcatMapMaybeObserver inner; + + final SimplePlainQueue queue; + + final ErrorMode errorMode; + + Subscription upstream; + + volatile boolean done; + + volatile boolean cancelled; + + long emitted; + + int consumed; + + R item; + + volatile int state; + + /** No inner MaybeSource is running. */ + static final int STATE_INACTIVE = 0; + /** An inner MaybeSource is running but there are no results yet. */ + static final int STATE_ACTIVE = 1; + /** The inner MaybeSource succeeded with a value in {@link #item}. */ + static final int STATE_RESULT_VALUE = 2; + + ConcatMapMaybeSubscriber(Subscriber downstream, + Function> mapper, + int prefetch, ErrorMode errorMode) { + this.downstream = downstream; + this.mapper = mapper; + this.prefetch = prefetch; + this.errorMode = errorMode; + this.requested = new AtomicLong(); + this.errors = new AtomicThrowable(); + this.inner = new ConcatMapMaybeObserver(this); + this.queue = new SpscArrayQueue(prefetch); + } + + @Override + public void onSubscribe(Subscription s) { + if (SubscriptionHelper.validate(upstream, s)) { + upstream = s; + downstream.onSubscribe(this); + s.request(prefetch); + } + } + + @Override + public void onNext(T t) { + if (!queue.offer(t)) { + upstream.cancel(); + onError(new MissingBackpressureException("queue full?!")); + return; + } + drain(); + } + + @Override + public void onError(Throwable t) { + if (errors.addThrowable(t)) { + if (errorMode == ErrorMode.IMMEDIATE) { + inner.dispose(); + } + done = true; + drain(); + } else { + RxJavaPlugins.onError(t); + } + } + + @Override + public void onComplete() { + done = true; + drain(); + } + + @Override + public void request(long n) { + BackpressureHelper.add(requested, n); + drain(); + } + + @Override + public void cancel() { + cancelled = true; + upstream.cancel(); + inner.dispose(); + if (getAndIncrement() != 0) { + queue.clear(); + item = null; + } + } + + void innerSuccess(R item) { + this.item = item; + this.state = STATE_RESULT_VALUE; + drain(); + } + + void innerComplete() { + this.state = STATE_INACTIVE; + drain(); + } + + void innerError(Throwable ex) { + if (errors.addThrowable(ex)) { + if (errorMode != ErrorMode.END) { + upstream.cancel(); + } + this.state = STATE_INACTIVE; + drain(); + } else { + RxJavaPlugins.onError(ex); + } + } + + void drain() { + if (getAndIncrement() != 0) { + return; + } + + int missed = 1; + Subscriber downstream = this.downstream; + ErrorMode errorMode = this.errorMode; + SimplePlainQueue queue = this.queue; + AtomicThrowable errors = this.errors; + AtomicLong requested = this.requested; + int limit = prefetch - (prefetch >> 1); + + for (;;) { + + for (;;) { + if (cancelled) { + queue.clear(); + item = null; + } + + int s = state; + + if (errors.get() != null) { + if (errorMode == ErrorMode.IMMEDIATE + || (errorMode == ErrorMode.BOUNDARY && s == STATE_INACTIVE)) { + queue.clear(); + item = null; + Throwable ex = errors.terminate(); + downstream.onError(ex); + return; + } + } + + if (s == STATE_INACTIVE) { + boolean d = done; + T v = queue.poll(); + boolean empty = v == null; + + if (d && empty) { + Throwable ex = errors.terminate(); + if (ex == null) { + downstream.onComplete(); + } else { + downstream.onError(ex); + } + return; + } + + if (empty) { + break; + } + + int c = consumed + 1; + if (c == limit) { + consumed = 0; + upstream.request(limit); + } else { + consumed = c; + } + + MaybeSource ms; + + try { + ms = ObjectHelper.requireNonNull(mapper.apply(v), "The mapper returned a null MaybeSource"); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + upstream.cancel(); + queue.clear(); + errors.addThrowable(ex); + ex = errors.terminate(); + downstream.onError(ex); + return; + } + + state = STATE_ACTIVE; + ms.subscribe(inner); + break; + } else if (s == STATE_RESULT_VALUE) { + long e = emitted; + if (e != requested.get()) { + R w = item; + item = null; + + downstream.onNext(w); + + emitted = e + 1; + state = STATE_INACTIVE; + } else { + break; + } + } else { + break; + } + } + + missed = addAndGet(-missed); + if (missed == 0) { + break; + } + } + } + + static final class ConcatMapMaybeObserver + extends AtomicReference + implements MaybeObserver { + + private static final long serialVersionUID = -3051469169682093892L; + + final ConcatMapMaybeSubscriber parent; + + ConcatMapMaybeObserver(ConcatMapMaybeSubscriber parent) { + this.parent = parent; + } + + @Override + public void onSubscribe(Disposable d) { + DisposableHelper.replace(this, d); + } + + @Override + public void onSuccess(R t) { + parent.innerSuccess(t); + } + + @Override + public void onError(Throwable e) { + parent.innerError(e); + } + + @Override + public void onComplete() { + parent.innerComplete(); + } + + void dispose() { + DisposableHelper.dispose(this); + } + } + } +} diff --git a/src/main/java/io/reactivex/internal/operators/mixed/FlowableConcatMapSingle.java b/src/main/java/io/reactivex/internal/operators/mixed/FlowableConcatMapSingle.java new file mode 100644 index 0000000000..43223ec721 --- /dev/null +++ b/src/main/java/io/reactivex/internal/operators/mixed/FlowableConcatMapSingle.java @@ -0,0 +1,332 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.mixed; + +import java.util.concurrent.atomic.*; + +import org.reactivestreams.*; + +import io.reactivex.*; +import io.reactivex.annotations.Experimental; +import io.reactivex.disposables.Disposable; +import io.reactivex.exceptions.*; +import io.reactivex.functions.Function; +import io.reactivex.internal.disposables.DisposableHelper; +import io.reactivex.internal.functions.ObjectHelper; +import io.reactivex.internal.fuseable.SimplePlainQueue; +import io.reactivex.internal.queue.SpscArrayQueue; +import io.reactivex.internal.subscriptions.SubscriptionHelper; +import io.reactivex.internal.util.*; +import io.reactivex.plugins.RxJavaPlugins; + +/** + * Maps each upstream item into a {@link SingleSource}, subscribes to them one after the other terminates + * and relays their success values, optionally delaying any errors till the main and inner sources + * terminate. + * + * @param the upstream element type + * @param the output element type + * + * @since 2.1.11 - experimental + */ +@Experimental +public final class FlowableConcatMapSingle extends Flowable { + + final Flowable source; + + final Function> mapper; + + final ErrorMode errorMode; + + final int prefetch; + + public FlowableConcatMapSingle(Flowable source, + Function> mapper, + ErrorMode errorMode, int prefetch) { + this.source = source; + this.mapper = mapper; + this.errorMode = errorMode; + this.prefetch = prefetch; + } + + @Override + protected void subscribeActual(Subscriber s) { + source.subscribe(new ConcatMapSingleSubscriber(s, mapper, prefetch, errorMode)); + } + + static final class ConcatMapSingleSubscriber + extends AtomicInteger + implements FlowableSubscriber, Subscription { + + private static final long serialVersionUID = -9140123220065488293L; + + final Subscriber downstream; + + final Function> mapper; + + final int prefetch; + + final AtomicLong requested; + + final AtomicThrowable errors; + + final ConcatMapSingleObserver inner; + + final SimplePlainQueue queue; + + final ErrorMode errorMode; + + Subscription upstream; + + volatile boolean done; + + volatile boolean cancelled; + + long emitted; + + int consumed; + + R item; + + volatile int state; + + /** No inner SingleSource is running. */ + static final int STATE_INACTIVE = 0; + /** An inner SingleSource is running but there are no results yet. */ + static final int STATE_ACTIVE = 1; + /** The inner SingleSource succeeded with a value in {@link #item}. */ + static final int STATE_RESULT_VALUE = 2; + + ConcatMapSingleSubscriber(Subscriber downstream, + Function> mapper, + int prefetch, ErrorMode errorMode) { + this.downstream = downstream; + this.mapper = mapper; + this.prefetch = prefetch; + this.errorMode = errorMode; + this.requested = new AtomicLong(); + this.errors = new AtomicThrowable(); + this.inner = new ConcatMapSingleObserver(this); + this.queue = new SpscArrayQueue(prefetch); + } + + @Override + public void onSubscribe(Subscription s) { + if (SubscriptionHelper.validate(upstream, s)) { + upstream = s; + downstream.onSubscribe(this); + s.request(prefetch); + } + } + + @Override + public void onNext(T t) { + if (!queue.offer(t)) { + upstream.cancel(); + onError(new MissingBackpressureException("queue full?!")); + return; + } + drain(); + } + + @Override + public void onError(Throwable t) { + if (errors.addThrowable(t)) { + if (errorMode == ErrorMode.IMMEDIATE) { + inner.dispose(); + } + done = true; + drain(); + } else { + RxJavaPlugins.onError(t); + } + } + + @Override + public void onComplete() { + done = true; + drain(); + } + + @Override + public void request(long n) { + BackpressureHelper.add(requested, n); + drain(); + } + + @Override + public void cancel() { + cancelled = true; + upstream.cancel(); + inner.dispose(); + if (getAndIncrement() != 0) { + queue.clear(); + item = null; + } + } + + void innerSuccess(R item) { + this.item = item; + this.state = STATE_RESULT_VALUE; + drain(); + } + + void innerError(Throwable ex) { + if (errors.addThrowable(ex)) { + if (errorMode != ErrorMode.END) { + upstream.cancel(); + } + this.state = STATE_INACTIVE; + drain(); + } else { + RxJavaPlugins.onError(ex); + } + } + + void drain() { + if (getAndIncrement() != 0) { + return; + } + + int missed = 1; + Subscriber downstream = this.downstream; + ErrorMode errorMode = this.errorMode; + SimplePlainQueue queue = this.queue; + AtomicThrowable errors = this.errors; + AtomicLong requested = this.requested; + int limit = prefetch - (prefetch >> 1); + + for (;;) { + + for (;;) { + if (cancelled) { + queue.clear(); + item = null; + } + + int s = state; + + if (errors.get() != null) { + if (errorMode == ErrorMode.IMMEDIATE + || (errorMode == ErrorMode.BOUNDARY && s == STATE_INACTIVE)) { + queue.clear(); + item = null; + Throwable ex = errors.terminate(); + downstream.onError(ex); + return; + } + } + + if (s == STATE_INACTIVE) { + boolean d = done; + T v = queue.poll(); + boolean empty = v == null; + + if (d && empty) { + Throwable ex = errors.terminate(); + if (ex == null) { + downstream.onComplete(); + } else { + downstream.onError(ex); + } + return; + } + + if (empty) { + break; + } + + int c = consumed + 1; + if (c == limit) { + consumed = 0; + upstream.request(limit); + } else { + consumed = c; + } + + SingleSource ms; + + try { + ms = ObjectHelper.requireNonNull(mapper.apply(v), "The mapper returned a null SingleSource"); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + upstream.cancel(); + queue.clear(); + errors.addThrowable(ex); + ex = errors.terminate(); + downstream.onError(ex); + return; + } + + state = STATE_ACTIVE; + ms.subscribe(inner); + break; + } else if (s == STATE_RESULT_VALUE) { + long e = emitted; + if (e != requested.get()) { + R w = item; + item = null; + + downstream.onNext(w); + + emitted = e + 1; + state = STATE_INACTIVE; + } else { + break; + } + } else { + break; + } + } + + missed = addAndGet(-missed); + if (missed == 0) { + break; + } + } + } + + static final class ConcatMapSingleObserver + extends AtomicReference + implements SingleObserver { + + private static final long serialVersionUID = -3051469169682093892L; + + final ConcatMapSingleSubscriber parent; + + ConcatMapSingleObserver(ConcatMapSingleSubscriber parent) { + this.parent = parent; + } + + @Override + public void onSubscribe(Disposable d) { + DisposableHelper.replace(this, d); + } + + @Override + public void onSuccess(R t) { + parent.innerSuccess(t); + } + + @Override + public void onError(Throwable e) { + parent.innerError(e); + } + + void dispose() { + DisposableHelper.dispose(this); + } + } + } +} diff --git a/src/test/java/io/reactivex/internal/operators/mixed/FlowableConcatMapMaybeTest.java b/src/test/java/io/reactivex/internal/operators/mixed/FlowableConcatMapMaybeTest.java new file mode 100644 index 0000000000..6c393acebb --- /dev/null +++ b/src/test/java/io/reactivex/internal/operators/mixed/FlowableConcatMapMaybeTest.java @@ -0,0 +1,371 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.mixed; + +import static org.junit.Assert.*; + +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import org.junit.Test; +import org.reactivestreams.Subscriber; + +import io.reactivex.*; +import io.reactivex.disposables.Disposables; +import io.reactivex.exceptions.*; +import io.reactivex.functions.*; +import io.reactivex.internal.functions.Functions; +import io.reactivex.internal.subscriptions.BooleanSubscription; +import io.reactivex.plugins.RxJavaPlugins; +import io.reactivex.processors.PublishProcessor; +import io.reactivex.schedulers.Schedulers; +import io.reactivex.subjects.MaybeSubject; +import io.reactivex.subscribers.TestSubscriber; + +public class FlowableConcatMapMaybeTest { + + @Test + public void simple() { + Flowable.range(1, 5) + .concatMapMaybe(new Function>() { + @Override + public MaybeSource apply(Integer v) + throws Exception { + return Maybe.just(v); + } + }) + .test() + .assertResult(1, 2, 3, 4, 5); + } + + @Test + public void simpleLong() { + Flowable.range(1, 1024) + .concatMapMaybe(new Function>() { + @Override + public MaybeSource apply(Integer v) + throws Exception { + return Maybe.just(v); + } + }, 32) + .test() + .assertValueCount(1024) + .assertNoErrors() + .assertComplete(); + } + + @Test + public void backpressure() { + TestSubscriber ts = Flowable.range(1, 1024) + .concatMapMaybe(new Function>() { + @Override + public MaybeSource apply(Integer v) + throws Exception { + return Maybe.just(v); + } + }, 32) + .test(0); + + for (int i = 1; i <= 1024; i++) { + ts.assertValueCount(i - 1) + .assertNoErrors() + .assertNotComplete() + .requestMore(1) + .assertValueCount(i) + .assertNoErrors(); + } + + ts.assertComplete(); + } + + @Test + public void empty() { + Flowable.range(1, 10) + .concatMapMaybe(new Function>() { + @Override + public MaybeSource apply(Integer v) + throws Exception { + return Maybe.empty(); + } + }) + .test() + .assertResult(); + } + + @Test + public void mixed() { + Flowable.range(1, 10) + .concatMapMaybe(new Function>() { + @Override + public MaybeSource apply(Integer v) + throws Exception { + if (v % 2 == 0) { + return Maybe.just(v); + } + return Maybe.empty(); + } + }) + .test() + .assertResult(2, 4, 6, 8, 10); + } + + @Test + public void mixedLong() { + Flowable.range(1, 1024) + .concatMapMaybe(new Function>() { + @Override + public MaybeSource apply(Integer v) + throws Exception { + if (v % 2 == 0) { + return Maybe.just(v).subscribeOn(Schedulers.computation()); + } + return Maybe.empty().subscribeOn(Schedulers.computation()); + } + }) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertValueCount(512) + .assertNoErrors() + .assertComplete() + .assertOf(new Consumer>() { + @Override + public void accept(TestSubscriber ts) throws Exception { + for (int i = 0; i < 512; i ++) { + ts.assertValueAt(i, (i + 1) * 2); + } + } + }); + } + + @Test + public void mainError() { + Flowable.error(new TestException()) + .concatMapMaybe(Functions.justFunction(Maybe.just(1))) + .test() + .assertFailure(TestException.class); + } + + @Test + public void innerError() { + Flowable.just(1) + .concatMapMaybe(Functions.justFunction(Maybe.error(new TestException()))) + .test() + .assertFailure(TestException.class); + } + + @Test + public void mainBoundaryErrorInnerSuccess() { + PublishProcessor pp = PublishProcessor.create(); + MaybeSubject ms = MaybeSubject.create(); + + TestSubscriber ts = pp.concatMapMaybeDelayError(Functions.justFunction(ms), false).test(); + + ts.assertEmpty(); + + pp.onNext(1); + + assertTrue(ms.hasObservers()); + + pp.onError(new TestException()); + + assertTrue(ms.hasObservers()); + + ts.assertEmpty(); + + ms.onSuccess(1); + + ts.assertFailure(TestException.class, 1); + } + + @Test + public void mainBoundaryErrorInnerEmpty() { + PublishProcessor pp = PublishProcessor.create(); + MaybeSubject ms = MaybeSubject.create(); + + TestSubscriber ts = pp.concatMapMaybeDelayError(Functions.justFunction(ms), false).test(); + + ts.assertEmpty(); + + pp.onNext(1); + + assertTrue(ms.hasObservers()); + + pp.onError(new TestException()); + + assertTrue(ms.hasObservers()); + + ts.assertEmpty(); + + ms.onComplete(); + + ts.assertFailure(TestException.class); + } + + @Test + public void doubleOnSubscribe() { + TestHelper.checkDoubleOnSubscribeFlowable( + new Function, Flowable>() { + @Override + public Flowable apply(Flowable f) + throws Exception { + return f.concatMapMaybeDelayError( + Functions.justFunction(Maybe.empty())); + } + } + ); + } + + @Test + public void queueOverflow() { + List errors = TestHelper.trackPluginErrors(); + try { + new Flowable() { + @Override + protected void subscribeActual(Subscriber s) { + s.onSubscribe(new BooleanSubscription()); + s.onNext(1); + s.onNext(2); + s.onNext(3); + s.onError(new TestException()); + } + } + .concatMapMaybe( + Functions.justFunction(Maybe.never()), 1 + ) + .test() + .assertFailure(MissingBackpressureException.class); + + TestHelper.assertUndeliverable(errors, 0, TestException.class); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void limit() { + Flowable.range(1, 5) + .concatMapMaybe(new Function>() { + @Override + public MaybeSource apply(Integer v) + throws Exception { + return Maybe.just(v); + } + }) + .limit(3) + .test() + .assertResult(1, 2, 3); + } + + @Test + public void cancel() { + Flowable.range(1, 5) + .concatMapMaybe(new Function>() { + @Override + public MaybeSource apply(Integer v) + throws Exception { + return Maybe.just(v); + } + }) + .test(3) + .assertValues(1, 2, 3) + .assertNoErrors() + .assertNotComplete() + .cancel(); + } + + @Test + public void innerErrorAfterMainError() { + List errors = TestHelper.trackPluginErrors(); + try { + final PublishProcessor pp = PublishProcessor.create(); + + final AtomicReference> obs = new AtomicReference>(); + + TestSubscriber ts = pp.concatMapMaybe( + new Function>() { + @Override + public MaybeSource apply(Integer v) + throws Exception { + return new Maybe() { + @Override + protected void subscribeActual( + MaybeObserver observer) { + observer.onSubscribe(Disposables.empty()); + obs.set(observer); + } + }; + } + } + ).test(); + + pp.onNext(1); + + pp.onError(new TestException("outer")); + obs.get().onError(new TestException("inner")); + + ts.assertFailureAndMessage(TestException.class, "outer"); + + TestHelper.assertUndeliverable(errors, 0, TestException.class, "inner"); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void delayAllErrors() { + Flowable.range(1, 5) + .concatMapMaybeDelayError(new Function>() { + @Override + public MaybeSource apply(Integer v) + throws Exception { + return Maybe.error(new TestException()); + } + }) + .test() + .assertFailure(CompositeException.class) + .assertOf(new Consumer>() { + @Override + public void accept(TestSubscriber ts) throws Exception { + CompositeException ce = (CompositeException)ts.errors().get(0); + assertEquals(5, ce.getExceptions().size()); + } + }); + } + + @Test + public void mapperCrash() { + final PublishProcessor pp = PublishProcessor.create(); + + TestSubscriber ts = pp + .concatMapMaybe(new Function>() { + @Override + public MaybeSource apply(Integer v) + throws Exception { + throw new TestException(); + } + }) + .test(); + + ts.assertEmpty(); + + assertTrue(pp.hasSubscribers()); + + pp.onNext(1); + + ts.assertFailure(TestException.class); + + assertFalse(pp.hasSubscribers()); + } +} diff --git a/src/test/java/io/reactivex/internal/operators/mixed/FlowableConcatMapSingleTest.java b/src/test/java/io/reactivex/internal/operators/mixed/FlowableConcatMapSingleTest.java new file mode 100644 index 0000000000..92779f725f --- /dev/null +++ b/src/test/java/io/reactivex/internal/operators/mixed/FlowableConcatMapSingleTest.java @@ -0,0 +1,286 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.mixed; + +import static org.junit.Assert.*; + +import java.util.List; +import java.util.concurrent.atomic.AtomicReference; + +import org.junit.Test; +import org.reactivestreams.Subscriber; + +import io.reactivex.*; +import io.reactivex.disposables.Disposables; +import io.reactivex.exceptions.*; +import io.reactivex.functions.*; +import io.reactivex.internal.functions.Functions; +import io.reactivex.internal.subscriptions.BooleanSubscription; +import io.reactivex.plugins.RxJavaPlugins; +import io.reactivex.processors.PublishProcessor; +import io.reactivex.subjects.SingleSubject; +import io.reactivex.subscribers.TestSubscriber; + +public class FlowableConcatMapSingleTest { + + @Test + public void simple() { + Flowable.range(1, 5) + .concatMapSingle(new Function>() { + @Override + public SingleSource apply(Integer v) + throws Exception { + return Single.just(v); + } + }) + .test() + .assertResult(1, 2, 3, 4, 5); + } + + @Test + public void simpleLong() { + Flowable.range(1, 1024) + .concatMapSingle(new Function>() { + @Override + public SingleSource apply(Integer v) + throws Exception { + return Single.just(v); + } + }, 32) + .test() + .assertValueCount(1024) + .assertNoErrors() + .assertComplete(); + } + + @Test + public void backpressure() { + TestSubscriber ts = Flowable.range(1, 1024) + .concatMapSingle(new Function>() { + @Override + public SingleSource apply(Integer v) + throws Exception { + return Single.just(v); + } + }, 32) + .test(0); + + for (int i = 1; i <= 1024; i++) { + ts.assertValueCount(i - 1) + .assertNoErrors() + .assertNotComplete() + .requestMore(1) + .assertValueCount(i) + .assertNoErrors(); + } + + ts.assertComplete(); + } + + @Test + public void mainError() { + Flowable.error(new TestException()) + .concatMapSingle(Functions.justFunction(Single.just(1))) + .test() + .assertFailure(TestException.class); + } + + @Test + public void innerError() { + Flowable.just(1) + .concatMapSingle(Functions.justFunction(Single.error(new TestException()))) + .test() + .assertFailure(TestException.class); + } + + @Test + public void mainBoundaryErrorInnerSuccess() { + PublishProcessor pp = PublishProcessor.create(); + SingleSubject ms = SingleSubject.create(); + + TestSubscriber ts = pp.concatMapSingleDelayError(Functions.justFunction(ms), false).test(); + + ts.assertEmpty(); + + pp.onNext(1); + + assertTrue(ms.hasObservers()); + + pp.onError(new TestException()); + + assertTrue(ms.hasObservers()); + + ts.assertEmpty(); + + ms.onSuccess(1); + + ts.assertFailure(TestException.class, 1); + } + + @Test + public void doubleOnSubscribe() { + TestHelper.checkDoubleOnSubscribeFlowable( + new Function, Flowable>() { + @Override + public Flowable apply(Flowable f) + throws Exception { + return f.concatMapSingleDelayError( + Functions.justFunction(Single.just((Object)1))); + } + } + ); + } + + @Test + public void queueOverflow() { + List errors = TestHelper.trackPluginErrors(); + try { + new Flowable() { + @Override + protected void subscribeActual(Subscriber s) { + s.onSubscribe(new BooleanSubscription()); + s.onNext(1); + s.onNext(2); + s.onNext(3); + s.onError(new TestException()); + } + } + .concatMapSingle( + Functions.justFunction(Single.never()), 1 + ) + .test() + .assertFailure(MissingBackpressureException.class); + + TestHelper.assertUndeliverable(errors, 0, TestException.class); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void limit() { + Flowable.range(1, 5) + .concatMapSingle(new Function>() { + @Override + public SingleSource apply(Integer v) + throws Exception { + return Single.just(v); + } + }) + .limit(3) + .test() + .assertResult(1, 2, 3); + } + + @Test + public void cancel() { + Flowable.range(1, 5) + .concatMapSingle(new Function>() { + @Override + public SingleSource apply(Integer v) + throws Exception { + return Single.just(v); + } + }) + .test(3) + .assertValues(1, 2, 3) + .assertNoErrors() + .assertNotComplete() + .cancel(); + } + + @Test + public void innerErrorAfterMainError() { + List errors = TestHelper.trackPluginErrors(); + try { + final PublishProcessor pp = PublishProcessor.create(); + + final AtomicReference> obs = new AtomicReference>(); + + TestSubscriber ts = pp.concatMapSingle( + new Function>() { + @Override + public SingleSource apply(Integer v) + throws Exception { + return new Single() { + @Override + protected void subscribeActual( + SingleObserver observer) { + observer.onSubscribe(Disposables.empty()); + obs.set(observer); + } + }; + } + } + ).test(); + + pp.onNext(1); + + pp.onError(new TestException("outer")); + obs.get().onError(new TestException("inner")); + + ts.assertFailureAndMessage(TestException.class, "outer"); + + TestHelper.assertUndeliverable(errors, 0, TestException.class, "inner"); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void delayAllErrors() { + Flowable.range(1, 5) + .concatMapSingleDelayError(new Function>() { + @Override + public SingleSource apply(Integer v) + throws Exception { + return Single.error(new TestException()); + } + }) + .test() + .assertFailure(CompositeException.class) + .assertOf(new Consumer>() { + @Override + public void accept(TestSubscriber ts) throws Exception { + CompositeException ce = (CompositeException)ts.errors().get(0); + assertEquals(5, ce.getExceptions().size()); + } + }); + } + + @Test + public void mapperCrash() { + final PublishProcessor pp = PublishProcessor.create(); + + TestSubscriber ts = pp + .concatMapSingle(new Function>() { + @Override + public SingleSource apply(Integer v) + throws Exception { + throw new TestException(); + } + }) + .test(); + + ts.assertEmpty(); + + assertTrue(pp.hasSubscribers()); + + pp.onNext(1); + + ts.assertFailure(TestException.class); + + assertFalse(pp.hasSubscribers()); + } +}