diff --git a/src/main/java/io/reactivex/Flowable.java b/src/main/java/io/reactivex/Flowable.java index 1afe7b594f..6af68db61d 100644 --- a/src/main/java/io/reactivex/Flowable.java +++ b/src/main/java/io/reactivex/Flowable.java @@ -17,6 +17,7 @@ import org.reactivestreams.*; +import io.reactivex.Observable; import io.reactivex.annotations.*; import io.reactivex.disposables.Disposable; import io.reactivex.exceptions.Exceptions; @@ -1415,7 +1416,9 @@ public static Flowable concatArrayDelayError(Publisher... so } /** - * Concatenates a sequence of Publishers eagerly into a single stream of values. + * Concatenates an array of Publishers eagerly into a single stream of values. + *

+ * *

* Eager concatenation means that once a subscriber subscribes, this operator subscribes to all of the * source Publishers. The operator buffers the values emitted by these Publishers and then drains them @@ -1430,7 +1433,7 @@ public static Flowable concatArrayDelayError(Publisher... so *

This method does not operate by default on a particular {@link Scheduler}.
* * @param the value type - * @param sources a sequence of Publishers that need to be eagerly concatenated + * @param sources an array of Publishers that need to be eagerly concatenated * @return the new Publisher instance with the specified concatenation behavior * @since 2.0 */ @@ -1442,7 +1445,9 @@ public static Flowable concatArrayEager(Publisher... sources } /** - * Concatenates a sequence of Publishers eagerly into a single stream of values. + * Concatenates an array of Publishers eagerly into a single stream of values. + *

+ * *

* Eager concatenation means that once a subscriber subscribes, this operator subscribes to all of the * source Publishers. The operator buffers the values emitted by these Publishers and then drains them @@ -1457,7 +1462,7 @@ public static Flowable concatArrayEager(Publisher... sources *

This method does not operate by default on a particular {@link Scheduler}.
* * @param the value type - * @param sources a sequence of Publishers that need to be eagerly concatenated + * @param sources an array of Publishers that need to be eagerly concatenated * @param maxConcurrency the maximum number of concurrent subscriptions at a time, Integer.MAX_VALUE * is interpreted as an indication to subscribe to all sources at once * @param prefetch the number of elements to prefetch from each Publisher source @@ -1475,6 +1480,70 @@ public static Flowable concatArrayEager(int maxConcurrency, int prefetch, return RxJavaPlugins.onAssembly(new FlowableConcatMapEager(new FlowableFromArray(sources), Functions.identity(), maxConcurrency, prefetch, ErrorMode.IMMEDIATE)); } + /** + * Concatenates an array of {@link Publisher}s eagerly into a single stream of values + * and delaying any errors until all sources terminate. + *

+ * + *

+ * Eager concatenation means that once a subscriber subscribes, this operator subscribes to all of the + * source {@code Publisher}s. The operator buffers the values emitted by these {@code Publisher}s + * and then drains them in order, each one after the previous one completes. + *

+ *
Backpressure:
+ *
The operator honors backpressure from downstream. The {@code Publisher} + * sources are expected to honor backpressure as well. + * If any of the source {@code Publisher}s violate this, the operator will signal a + * {@code MissingBackpressureException}.
+ *
Scheduler:
+ *
This method does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param the value type + * @param sources an array of {@code Publisher}s that need to be eagerly concatenated + * @return the new Flowable instance with the specified concatenation behavior + * @since 2.2.1 - experimental + */ + @CheckReturnValue + @SchedulerSupport(SchedulerSupport.NONE) + @BackpressureSupport(BackpressureKind.FULL) + public static Flowable concatArrayEagerDelayError(Publisher... sources) { + return concatArrayEagerDelayError(bufferSize(), bufferSize(), sources); + } + + /** + * Concatenates an array of {@link Publisher}s eagerly into a single stream of values + * and delaying any errors until all sources terminate. + *

+ * + *

+ * Eager concatenation means that once a subscriber subscribes, this operator subscribes to all of the + * source {@code Publisher}s. The operator buffers the values emitted by these {@code Publisher}s + * and then drains them in order, each one after the previous one completes. + *

+ *
Backpressure:
+ *
The operator honors backpressure from downstream. The {@code Publisher} + * sources are expected to honor backpressure as well. + * If any of the source {@code Publisher}s violate this, the operator will signal a + * {@code MissingBackpressureException}.
+ *
Scheduler:
+ *
This method does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param the value type + * @param sources an array of {@code Publisher}s that need to be eagerly concatenated + * @param maxConcurrency the maximum number of concurrent subscriptions at a time, Integer.MAX_VALUE + * is interpreted as indication to subscribe to all sources at once + * @param prefetch the number of elements to prefetch from each {@code Publisher} source + * @return the new Flowable instance with the specified concatenation behavior + * @since 2.2.1 - experimental + */ + @SuppressWarnings({ "rawtypes", "unchecked" }) + @CheckReturnValue + @SchedulerSupport(SchedulerSupport.NONE) + @BackpressureSupport(BackpressureKind.FULL) + public static Flowable concatArrayEagerDelayError(int maxConcurrency, int prefetch, Publisher... sources) { + return fromArray(sources).concatMapEagerDelayError((Function)Functions.identity(), maxConcurrency, prefetch, true); + } + /** * Concatenates the Iterable sequence of Publishers into a single sequence by subscribing to each Publisher, * one after the other, one at a time and delays any errors till the all inner Publishers terminate. diff --git a/src/main/java/io/reactivex/Observable.java b/src/main/java/io/reactivex/Observable.java index ab38ab2374..2c43e0abf0 100644 --- a/src/main/java/io/reactivex/Observable.java +++ b/src/main/java/io/reactivex/Observable.java @@ -1294,19 +1294,19 @@ public static Observable concatArrayDelayError(ObservableSource + * *

* Eager concatenation means that once a subscriber subscribes, this operator subscribes to all of the * source ObservableSources. The operator buffers the values emitted by these ObservableSources and then drains them * in order, each one after the previous one completes. - *

- * *

*
Scheduler:
*
This method does not operate by default on a particular {@link Scheduler}.
*
* @param the value type - * @param sources a sequence of ObservableSources that need to be eagerly concatenated + * @param sources an array of ObservableSources that need to be eagerly concatenated * @return the new ObservableSource instance with the specified concatenation behavior * @since 2.0 */ @@ -1317,7 +1317,9 @@ public static Observable concatArrayEager(ObservableSource.. } /** - * Concatenates a sequence of ObservableSources eagerly into a single stream of values. + * Concatenates an array of ObservableSources eagerly into a single stream of values. + *

+ * *

* Eager concatenation means that once a subscriber subscribes, this operator subscribes to all of the * source ObservableSources. The operator buffers the values emitted by these ObservableSources and then drains them @@ -1327,7 +1329,7 @@ public static Observable concatArrayEager(ObservableSource.. *

This method does not operate by default on a particular {@link Scheduler}.
* * @param the value type - * @param sources a sequence of ObservableSources that need to be eagerly concatenated + * @param sources an array of ObservableSources that need to be eagerly concatenated * @param maxConcurrency the maximum number of concurrent subscriptions at a time, Integer.MAX_VALUE * is interpreted as indication to subscribe to all sources at once * @param prefetch the number of elements to prefetch from each ObservableSource source @@ -1341,6 +1343,58 @@ public static Observable concatArrayEager(int maxConcurrency, int prefetc return fromArray(sources).concatMapEagerDelayError((Function)Functions.identity(), maxConcurrency, prefetch, false); } + /** + * Concatenates an array of {@link ObservableSource}s eagerly into a single stream of values + * and delaying any errors until all sources terminate. + *

+ * + *

+ * Eager concatenation means that once a subscriber subscribes, this operator subscribes to all of the + * source {@code ObservableSource}s. The operator buffers the values emitted by these {@code ObservableSource}s + * and then drains them in order, each one after the previous one completes. + *

+ *
Scheduler:
+ *
This method does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param the value type + * @param sources an array of {@code ObservableSource}s that need to be eagerly concatenated + * @return the new Observable instance with the specified concatenation behavior + * @since 2.2.1 - experimental + */ + @CheckReturnValue + @SchedulerSupport(SchedulerSupport.NONE) + public static Observable concatArrayEagerDelayError(ObservableSource... sources) { + return concatArrayEagerDelayError(bufferSize(), bufferSize(), sources); + } + + /** + * Concatenates an array of {@link ObservableSource}s eagerly into a single stream of values + * and delaying any errors until all sources terminate. + *

+ * + *

+ * Eager concatenation means that once a subscriber subscribes, this operator subscribes to all of the + * source {@code ObservableSource}s. The operator buffers the values emitted by these {@code ObservableSource}s + * and then drains them in order, each one after the previous one completes. + *

+ *
Scheduler:
+ *
This method does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param the value type + * @param sources an array of {@code ObservableSource}s that need to be eagerly concatenated + * @param maxConcurrency the maximum number of concurrent subscriptions at a time, Integer.MAX_VALUE + * is interpreted as indication to subscribe to all sources at once + * @param prefetch the number of elements to prefetch from each {@code ObservableSource} source + * @return the new Observable instance with the specified concatenation behavior + * @since 2.2.1 - experimental + */ + @SuppressWarnings({ "rawtypes", "unchecked" }) + @CheckReturnValue + @SchedulerSupport(SchedulerSupport.NONE) + public static Observable concatArrayEagerDelayError(int maxConcurrency, int prefetch, ObservableSource... sources) { + return fromArray(sources).concatMapEagerDelayError((Function)Functions.identity(), maxConcurrency, prefetch, true); + } + /** * Concatenates the Iterable sequence of ObservableSources into a single sequence by subscribing to each ObservableSource, * one after the other, one at a time and delays any errors till the all inner ObservableSources terminate. diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableConcatMapEagerTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableConcatMapEagerTest.java index 39a9427181..3c77acb502 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableConcatMapEagerTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableConcatMapEagerTest.java @@ -1228,4 +1228,110 @@ public void accept(List v) .awaitDone(5, TimeUnit.SECONDS) .assertResult(list); } + + @Test + public void arrayDelayErrorDefault() { + PublishProcessor pp1 = PublishProcessor.create(); + PublishProcessor pp2 = PublishProcessor.create(); + PublishProcessor pp3 = PublishProcessor.create(); + + @SuppressWarnings("unchecked") + TestSubscriber ts = Flowable.concatArrayEagerDelayError(pp1, pp2, pp3) + .test(); + + ts.assertEmpty(); + + assertTrue(pp1.hasSubscribers()); + assertTrue(pp2.hasSubscribers()); + assertTrue(pp3.hasSubscribers()); + + pp2.onNext(2); + pp2.onComplete(); + + ts.assertEmpty(); + + pp1.onNext(1); + + ts.assertValuesOnly(1); + + pp1.onComplete(); + + ts.assertValuesOnly(1, 2); + + pp3.onComplete(); + + ts.assertResult(1, 2); + } + + @Test + public void arrayDelayErrorMaxConcurrency() { + PublishProcessor pp1 = PublishProcessor.create(); + PublishProcessor pp2 = PublishProcessor.create(); + PublishProcessor pp3 = PublishProcessor.create(); + + @SuppressWarnings("unchecked") + TestSubscriber ts = Flowable.concatArrayEagerDelayError(2, 2, pp1, pp2, pp3) + .test(); + + ts.assertEmpty(); + + assertTrue(pp1.hasSubscribers()); + assertTrue(pp2.hasSubscribers()); + assertFalse(pp3.hasSubscribers()); + + pp2.onNext(2); + pp2.onComplete(); + + ts.assertEmpty(); + + pp1.onNext(1); + + ts.assertValuesOnly(1); + + pp1.onComplete(); + + assertTrue(pp3.hasSubscribers()); + + ts.assertValuesOnly(1, 2); + + pp3.onComplete(); + + ts.assertResult(1, 2); + } + + @Test + public void arrayDelayErrorMaxConcurrencyErrorDelayed() { + PublishProcessor pp1 = PublishProcessor.create(); + PublishProcessor pp2 = PublishProcessor.create(); + PublishProcessor pp3 = PublishProcessor.create(); + + @SuppressWarnings("unchecked") + TestSubscriber ts = Flowable.concatArrayEagerDelayError(2, 2, pp1, pp2, pp3) + .test(); + + ts.assertEmpty(); + + assertTrue(pp1.hasSubscribers()); + assertTrue(pp2.hasSubscribers()); + assertFalse(pp3.hasSubscribers()); + + pp2.onNext(2); + pp2.onError(new TestException()); + + ts.assertEmpty(); + + pp1.onNext(1); + + ts.assertValuesOnly(1); + + pp1.onComplete(); + + assertTrue(pp3.hasSubscribers()); + + ts.assertValuesOnly(1, 2); + + pp3.onComplete(); + + ts.assertFailure(TestException.class, 1, 2); + } } diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableConcatMapEagerTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableConcatMapEagerTest.java index 8874eec137..0f6920ecb3 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableConcatMapEagerTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableConcatMapEagerTest.java @@ -1036,4 +1036,110 @@ public void accept(List v) .awaitDone(5, TimeUnit.SECONDS) .assertResult(list); } + + @Test + public void arrayDelayErrorDefault() { + PublishSubject ps1 = PublishSubject.create(); + PublishSubject ps2 = PublishSubject.create(); + PublishSubject ps3 = PublishSubject.create(); + + @SuppressWarnings("unchecked") + TestObserver to = Observable.concatArrayEagerDelayError(ps1, ps2, ps3) + .test(); + + to.assertEmpty(); + + assertTrue(ps1.hasObservers()); + assertTrue(ps2.hasObservers()); + assertTrue(ps3.hasObservers()); + + ps2.onNext(2); + ps2.onComplete(); + + to.assertEmpty(); + + ps1.onNext(1); + + to.assertValuesOnly(1); + + ps1.onComplete(); + + to.assertValuesOnly(1, 2); + + ps3.onComplete(); + + to.assertResult(1, 2); + } + + @Test + public void arrayDelayErrorMaxConcurrency() { + PublishSubject ps1 = PublishSubject.create(); + PublishSubject ps2 = PublishSubject.create(); + PublishSubject ps3 = PublishSubject.create(); + + @SuppressWarnings("unchecked") + TestObserver to = Observable.concatArrayEagerDelayError(2, 2, ps1, ps2, ps3) + .test(); + + to.assertEmpty(); + + assertTrue(ps1.hasObservers()); + assertTrue(ps2.hasObservers()); + assertFalse(ps3.hasObservers()); + + ps2.onNext(2); + ps2.onComplete(); + + to.assertEmpty(); + + ps1.onNext(1); + + to.assertValuesOnly(1); + + ps1.onComplete(); + + assertTrue(ps3.hasObservers()); + + to.assertValuesOnly(1, 2); + + ps3.onComplete(); + + to.assertResult(1, 2); + } + + @Test + public void arrayDelayErrorMaxConcurrencyErrorDelayed() { + PublishSubject ps1 = PublishSubject.create(); + PublishSubject ps2 = PublishSubject.create(); + PublishSubject ps3 = PublishSubject.create(); + + @SuppressWarnings("unchecked") + TestObserver to = Observable.concatArrayEagerDelayError(2, 2, ps1, ps2, ps3) + .test(); + + to.assertEmpty(); + + assertTrue(ps1.hasObservers()); + assertTrue(ps2.hasObservers()); + assertFalse(ps3.hasObservers()); + + ps2.onNext(2); + ps2.onError(new TestException()); + + to.assertEmpty(); + + ps1.onNext(1); + + to.assertValuesOnly(1); + + ps1.onComplete(); + + assertTrue(ps3.hasObservers()); + + to.assertValuesOnly(1, 2); + + ps3.onComplete(); + + to.assertFailure(TestException.class, 1, 2); + } } diff --git a/src/test/java/io/reactivex/validators/JavadocForAnnotations.java b/src/test/java/io/reactivex/validators/JavadocForAnnotations.java index 230a622de7..9dae922016 100644 --- a/src/test/java/io/reactivex/validators/JavadocForAnnotations.java +++ b/src/test/java/io/reactivex/validators/JavadocForAnnotations.java @@ -129,8 +129,10 @@ static final void scanForBadMethod(StringBuilder sourceCode, String annotation, if ((ll < 0 || ll > idx) && (lm < 0 || lm > idx)) { int n = sourceCode.indexOf("{@code ", k); + int endDD = sourceCode.indexOf("", k); + // make sure the {@code is within the dt/dd section - if (n < idx) { + if (n < idx && n < endDD) { int m = sourceCode.indexOf("}", n); if (m < idx) {