diff --git a/src/main/java/io/reactivex/Flowable.java b/src/main/java/io/reactivex/Flowable.java index 0e63ea2f1a..a8e851ba1a 100644 --- a/src/main/java/io/reactivex/Flowable.java +++ b/src/main/java/io/reactivex/Flowable.java @@ -8250,6 +8250,25 @@ public final Flowable delaySubscription(long delay, TimeUnit unit, Scheduler * represent. *

* + *

+ * When the upstream signals an {@link Notification#createOnError(Throwable) onError} or + * {@link Notification#createOnComplete() onComplete} item, the + * returned Flowable cancels the flow and terminates with that type of terminal event: + *


+     * Flowable.just(createOnNext(1), createOnComplete(), createOnNext(2))
+     * .doOnCancel(() -> System.out.println("Cancelled!"));
+     * .test()
+     * .assertResult(1);
+     * 
+ * If the upstream signals {@code onError} or {@code onComplete} directly, the flow is terminated + * with the same event. + *

+     * Flowable.just(createOnNext(1), createOnNext(2))
+     * .test()
+     * .assertResult(1, 2);
+     * 
+ * If this behavior is not desired, the completion can be suppressed by applying {@link #concatWith(Publisher)} + * with a {@link #never()} source. *
*
Backpressure:
*
The operator doesn't interfere with backpressure which is determined by the source {@code Publisher}'s diff --git a/src/main/java/io/reactivex/Observable.java b/src/main/java/io/reactivex/Observable.java index 423bcef036..579ff6d9f9 100644 --- a/src/main/java/io/reactivex/Observable.java +++ b/src/main/java/io/reactivex/Observable.java @@ -7460,6 +7460,25 @@ public final Observable delaySubscription(long delay, TimeUnit unit, Schedule * represent. *

* + *

+ * When the upstream signals an {@link Notification#createOnError(Throwable) onError} or + * {@link Notification#createOnComplete() onComplete} item, the + * returned Observable cancels the flow and terminates with that type of terminal event: + *


+     * Observable.just(createOnNext(1), createOnComplete(), createOnNext(2))
+     * .doOnCancel(() -> System.out.println("Cancelled!"));
+     * .test()
+     * .assertResult(1);
+     * 
+ * If the upstream signals {@code onError} or {@code onComplete} directly, the flow is terminated + * with the same event. + *

+     * Observable.just(createOnNext(1), createOnNext(2))
+     * .test()
+     * .assertResult(1, 2);
+     * 
+ * If this behavior is not desired, the completion can be suppressed by applying {@link #concatWith(ObservableSource)} + * with a {@link #never()} source. *
*
Scheduler:
*
{@code dematerialize} does not operate by default on a particular {@link Scheduler}.