diff --git a/src/main/java/io/reactivex/Maybe.java b/src/main/java/io/reactivex/Maybe.java index 4f5ced8de3..23c787a5c7 100644 --- a/src/main/java/io/reactivex/Maybe.java +++ b/src/main/java/io/reactivex/Maybe.java @@ -49,7 +49,7 @@ public abstract class Maybe implements MaybeSource { /** - * Runs multiple Maybe sources and signals the events of the first one that signals (cancelling + * Runs multiple MaybeSources and signals the events of the first one that signals (cancelling * the rest). *
*
Scheduler:
@@ -68,7 +68,7 @@ public static Maybe amb(final Iterable } /** - * Runs multiple Maybe sources and signals the events of the first one that signals (cancelling + * Runs multiple MaybeSources and signals the events of the first one that signals (cancelling * the rest). *
*
Scheduler:
@@ -412,7 +412,7 @@ public static Flowable concatEager(Iterable * Eager concatenation means that once a subscriber subscribes, this operator subscribes to all of the * emitted source Publishers as they are observed. The operator buffers the values emitted by these diff --git a/src/main/java/io/reactivex/Single.java b/src/main/java/io/reactivex/Single.java index d18170b221..fd9ad9870b 100644 --- a/src/main/java/io/reactivex/Single.java +++ b/src/main/java/io/reactivex/Single.java @@ -60,7 +60,7 @@ public abstract class Single implements SingleSource { /** - * Runs multiple Single sources and signals the events of the first one that signals (cancelling + * Runs multiple SingleSources and signals the events of the first one that signals (cancelling * the rest). *
*
Scheduler:
@@ -80,7 +80,7 @@ public static Single amb(final Iterable *
Scheduler:
@@ -106,7 +106,7 @@ public static Single ambArray(final SingleSource... sources) } /** - * Concatenate the single values, in a non-overlapping fashion, of the Single sources provided by + * Concatenate the single values, in a non-overlapping fashion, of the SingleSources provided by * an Iterable sequence. *
*
Backpressure:
@@ -127,7 +127,7 @@ public static Flowable concat(Iterable *
Scheduler:
@@ -147,7 +147,7 @@ public static Observable concat(ObservableSource *
Backpressure:
@@ -169,7 +169,7 @@ public static Flowable concat(Publisher *
Backpressure:
@@ -299,7 +299,7 @@ public static Flowable concat( } /** - * Concatenate the single values, in a non-overlapping fashion, of the Single sources provided in + * Concatenate the single values, in a non-overlapping fashion, of the SingleSources provided in * an array. *
*
Backpressure:
@@ -320,6 +320,80 @@ public static Flowable concatArray(SingleSource... sources) return RxJavaPlugins.onAssembly(new FlowableConcatMap(Flowable.fromArray(sources), SingleInternalHelper.toFlowable(), 2, ErrorMode.BOUNDARY)); } + /** + * Concatenates a sequence of SingleSource eagerly into a single stream of values. + *

+ * Eager concatenation means that once a subscriber subscribes, this operator subscribes to all of the + * source SingleSources. The operator buffers the value emitted by these SingleSources and then drains them + * in order, each one after the previous one completes. + *

+ *
Backpressure:
+ *
The operator honors backpressure from downstream.
+ *
Scheduler:
+ *
This method does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param the value type + * @param sources a sequence of Single that need to be eagerly concatenated + * @return the new Flowable instance with the specified concatenation behavior + */ + @SuppressWarnings({ "rawtypes", "unchecked" }) + @BackpressureSupport(BackpressureKind.FULL) + @CheckReturnValue + @SchedulerSupport(SchedulerSupport.NONE) + public static Flowable concatArrayEager(SingleSource... sources) { + return Flowable.fromArray(sources).concatMapEager(SingleInternalHelper.toFlowable()); + } + + /** + * Concatenates a Publisher sequence of SingleSources eagerly into a single stream of values. + *

+ * Eager concatenation means that once a subscriber subscribes, this operator subscribes to all of the + * emitted source Publishers as they are observed. The operator buffers the values emitted by these + * Publishers and then drains them in order, each one after the previous one completes. + *

+ *
Backpressure:
+ *
Backpressure is honored towards the downstream and the outer Publisher is + * expected to support backpressure. Violating this assumption, the operator will + * signal {@link io.reactivex.exceptions.MissingBackpressureException}.
+ *
Scheduler:
+ *
This method does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param the value type + * @param sources a sequence of Publishers that need to be eagerly concatenated + * @return the new Publisher instance with the specified concatenation behavior + */ + @SuppressWarnings({ "rawtypes", "unchecked" }) + @BackpressureSupport(BackpressureKind.FULL) + @CheckReturnValue + @SchedulerSupport(SchedulerSupport.NONE) + public static Flowable concatEager(Publisher> sources) { + return Flowable.fromPublisher(sources).concatMapEager(SingleInternalHelper.toFlowable()); + } + + /** + * Concatenates a sequence of SingleSources eagerly into a single stream of values. + *

+ * Eager concatenation means that once a subscriber subscribes, this operator subscribes to all of the + * source SingleSources. The operator buffers the values emitted by these SingleSources and then drains them + * in order, each one after the previous one completes. + *

+ *
Backpressure:
+ *
Backpressure is honored towards the downstream.
+ *
Scheduler:
+ *
This method does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param the value type + * @param sources a sequence of SingleSource that need to be eagerly concatenated + * @return the new Flowable instance with the specified concatenation behavior + */ + @SuppressWarnings({ "rawtypes", "unchecked" }) + @BackpressureSupport(BackpressureKind.FULL) + @CheckReturnValue + @SchedulerSupport(SchedulerSupport.NONE) + public static Flowable concatEager(Iterable> sources) { + return Flowable.fromIterable(sources).concatMapEager(SingleInternalHelper.toFlowable()); + } + /** * Provides an API (via a cold Completable) that bridges the reactive world with the callback-style world. *

diff --git a/src/test/java/io/reactivex/internal/operators/single/SingleConcatTest.java b/src/test/java/io/reactivex/internal/operators/single/SingleConcatTest.java index 8068f9e5bd..6031eeeb9e 100644 --- a/src/test/java/io/reactivex/internal/operators/single/SingleConcatTest.java +++ b/src/test/java/io/reactivex/internal/operators/single/SingleConcatTest.java @@ -14,9 +14,12 @@ package io.reactivex.internal.operators.single; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; import java.util.Arrays; +import io.reactivex.processors.PublishProcessor; +import io.reactivex.subscribers.TestSubscriber; import org.junit.Test; import io.reactivex.*; @@ -67,6 +70,63 @@ public void concatArray() { } } + @SuppressWarnings("unchecked") + @Test + public void concatArrayEagerTest() { + PublishProcessor pp1 = PublishProcessor.create(); + PublishProcessor pp2 = PublishProcessor.create(); + + TestSubscriber ts = Single.concatArrayEager(pp1.single("1"), pp2.single("2")).test(); + + assertTrue(pp1.hasSubscribers()); + assertTrue(pp2.hasSubscribers()); + + pp2.onComplete(); + ts.assertEmpty(); + pp1.onComplete(); + + ts.assertResult("1", "2"); + ts.assertComplete(); + } + + @SuppressWarnings("unchecked") + @Test + public void concatEagerIterableTest() { + PublishProcessor pp1 = PublishProcessor.create(); + PublishProcessor pp2 = PublishProcessor.create(); + + TestSubscriber ts = Single.concatEager(Arrays.asList(pp1.single("2"), pp2.single("1"))).test(); + + assertTrue(pp1.hasSubscribers()); + assertTrue(pp2.hasSubscribers()); + + pp2.onComplete(); + ts.assertEmpty(); + pp1.onComplete(); + + ts.assertResult("2", "1"); + ts.assertComplete(); + } + + @SuppressWarnings("unchecked") + @Test + public void concatEagerPublisherTest() { + PublishProcessor pp1 = PublishProcessor.create(); + PublishProcessor pp2 = PublishProcessor.create(); + + TestSubscriber ts = Single.concatEager(Flowable.just(pp1.single("1"), pp2.single("2"))).test(); + + assertTrue(pp1.hasSubscribers()); + assertTrue(pp2.hasSubscribers()); + + pp2.onComplete(); + ts.assertEmpty(); + pp1.onComplete(); + + ts.assertResult("1", "2"); + ts.assertComplete(); + } + @SuppressWarnings("unchecked") @Test public void concatObservable() {