diff --git a/src/main/java/rx/Observable.java b/src/main/java/rx/Observable.java index a0d93ccacf..b76406dcfa 100644 --- a/src/main/java/rx/Observable.java +++ b/src/main/java/rx/Observable.java @@ -3285,8 +3285,9 @@ public final Observable> buffer(Func0 * *
@@ -3307,8 +3308,9 @@ public final Observable> buffer(int count) { /** * Returns an Observable that emits buffers of items it collects from the source Observable. The resulting * Observable emits buffers every {@code skip} items, each containing {@code count} items. When the source - * Observable completes or encounters an error, the resulting Observable emits the current buffer and - * propagates the notification from the source Observable. + * Observable completes, the resulting Observable emits the current buffer and propagates the notification + * from the source Observable. Note that if the source Observable issues an onError notification + * the event is passed on immediately without first emitting the buffer it is in the process of assembling. *

* *

@@ -3334,8 +3336,9 @@ public final Observable> buffer(int count, int skip) { * Returns an Observable that emits buffers of items it collects from the source Observable. The resulting * Observable starts a new buffer periodically, as determined by the {@code timeshift} argument. It emits * each buffer after a fixed timespan, specified by the {@code timespan} argument. When the source - * Observable completes or encounters an error, the resulting Observable emits the current buffer and - * propagates the notification from the source Observable. + * Observable completes, the resulting Observable emits the current buffer and propagates the notification + * from the source Observable. Note that if the source Observable issues an onError notification + * the event is passed on immediately without first emitting the buffer it is in the process of assembling. *

* *

@@ -3364,8 +3367,10 @@ public final Observable> buffer(long timespan, long timeshift, TimeUnit * Returns an Observable that emits buffers of items it collects from the source Observable. The resulting * Observable starts a new buffer periodically, as determined by the {@code timeshift} argument, and on the * specified {@code scheduler}. It emits each buffer after a fixed timespan, specified by the - * {@code timespan} argument. When the source Observable completes or encounters an error, the resulting - * Observable emits the current buffer and propagates the notification from the source Observable. + * {@code timespan} argument. When the source Observable completes, the resulting Observable emits the current + * buffer and propagates the notification from the source Observable. Note that if the source + * Observable issues an onError notification the event is passed on immediately without first emitting + * the buffer it is in the process of assembling. *

* *

@@ -3395,8 +3400,10 @@ public final Observable> buffer(long timespan, long timeshift, TimeUnit /** * Returns an Observable that emits buffers of items it collects from the source Observable. The resulting * Observable emits connected, non-overlapping buffers, each of a fixed duration specified by the - * {@code timespan} argument. When the source Observable completes or encounters an error, the resulting - * Observable emits the current buffer and propagates the notification from the source Observable. + * {@code timespan} argument. When the source Observable completes, the resulting Observable emits the current + * buffer and propagates the notification from the source Observable. Note that if the source + * Observable issues an onError notification the event is passed on immediately without first emitting + * the buffer it is in the process of assembling. *

* *

@@ -3424,8 +3431,10 @@ public final Observable> buffer(long timespan, TimeUnit unit) { * Returns an Observable that emits buffers of items it collects from the source Observable. The resulting * Observable emits connected, non-overlapping buffers, each of a fixed duration specified by the * {@code timespan} argument or a maximum size specified by the {@code count} argument (whichever is reached - * first). When the source Observable completes or encounters an error, the resulting Observable emits the - * current buffer and propagates the notification from the source Observable. + * first). When the source Observable completes, the resulting Observable emits the current buffer and propagates + * the notification from the source Observable. Note that if the source Observable issues an onError + * notification the event is passed on immediately without first emitting the buffer it is in the process + * of assembling. *

* *

@@ -3456,9 +3465,10 @@ public final Observable> buffer(long timespan, TimeUnit unit, int count) * Returns an Observable that emits buffers of items it collects from the source Observable. The resulting * Observable emits connected, non-overlapping buffers, each of a fixed duration specified by the * {@code timespan} argument as measured on the specified {@code scheduler}, or a maximum size specified by - * the {@code count} argument (whichever is reached first). When the source Observable completes or - * encounters an error, the resulting Observable emits the current buffer and propagates the notification - * from the source Observable. + * the {@code count} argument (whichever is reached first). When the source Observable completes, the resulting + * Observable emits the current buffer and propagates the notification from the source Observable. + * Note that if the source Observable issues an onError notification the event is passed on immediately + * without first emitting the buffer it is in the process of assembling. *

* *

@@ -3490,9 +3500,10 @@ public final Observable> buffer(long timespan, TimeUnit unit, int count, /** * Returns an Observable that emits buffers of items it collects from the source Observable. The resulting * Observable emits connected, non-overlapping buffers, each of a fixed duration specified by the - * {@code timespan} argument and on the specified {@code scheduler}. When the source Observable completes or - * encounters an error, the resulting Observable emits the current buffer and propagates the notification - * from the source Observable. + * {@code timespan} argument and on the specified {@code scheduler}. When the source Observable completes, + * the resulting Observable emits the current buffer and propagates the notification from the source Observable. + * Note that if the source Observable issues an onError notification the event is passed on immediately + * without first emitting the buffer it is in the process of assembling. *

* *

diff --git a/src/main/java/rx/internal/operators/OperatorBufferWithSingleObservable.java b/src/main/java/rx/internal/operators/OperatorBufferWithSingleObservable.java index 187bc0494a..cf9dcdbef4 100644 --- a/src/main/java/rx/internal/operators/OperatorBufferWithSingleObservable.java +++ b/src/main/java/rx/internal/operators/OperatorBufferWithSingleObservable.java @@ -33,8 +33,9 @@ * {@link Observable} constructed using the {@link Func0} argument, produces a value. The buffer is then * emitted, and a new buffer is created to replace it. A new {@link Observable} will be constructed using * the provided {@link Func0} object, which will determine when this new buffer is emitted. When the source - * {@link Observable} completes or produces an error, the current buffer is emitted, and the event is - * propagated to all subscribed {@link Observer}s. + * {@link Observable} completes, the current buffer is emitted, and the event is propagated to all + * subscribed {@link Observer}s. Note that if the source {@link Observable} issues an onError notification + * the event is passed on immediately without first emitting the buffer it is in the process of assembling. *

* Note that this operation only produces non-overlapping chunks. At all times there is * exactly one buffer actively storing values. diff --git a/src/main/java/rx/internal/operators/OperatorBufferWithSize.java b/src/main/java/rx/internal/operators/OperatorBufferWithSize.java index d0bfdb1dbb..7895cf90c6 100644 --- a/src/main/java/rx/internal/operators/OperatorBufferWithSize.java +++ b/src/main/java/rx/internal/operators/OperatorBufferWithSize.java @@ -30,8 +30,10 @@ * This operation takes * values from the specified {@link Observable} source and stores them in all active chunks until the buffer * contains a specified number of elements. The buffer is then emitted. Chunks are created after a certain - * amount of values have been received. When the source {@link Observable} completes or produces an error, - * the currently active chunks are emitted, and the event is propagated to all subscribed {@link Subscriber}s. + * amount of values have been received. When the source {@link Observable} completes, the current buffer is + * emitted, and the event is propagated to all subscribed {@link Subscriber}s. Note that if the source + * {@link Observable} issues an onError notification the event is passed on immediately without first emitting + * the buffer it is in the process of assembling. *

* Note that this operation can produce non-connected, connected non-overlapping, or overlapping * chunks depending on the input parameters. diff --git a/src/main/java/rx/internal/operators/OperatorBufferWithStartEndObservable.java b/src/main/java/rx/internal/operators/OperatorBufferWithStartEndObservable.java index 328f401a2e..23a43b9c96 100644 --- a/src/main/java/rx/internal/operators/OperatorBufferWithStartEndObservable.java +++ b/src/main/java/rx/internal/operators/OperatorBufferWithStartEndObservable.java @@ -37,8 +37,10 @@ * {@link Observable}. This creates a new buffer which will then start recording values which are produced * by the "source" {@link Observable}. Additionally the "bufferClosingSelector" will be used to construct an * {@link Observable} which can produce values. When it does so it will close this (and only this) newly - * created buffer. When the source {@link Observable} completes or produces an error, all chunks are - * emitted, and the event is propagated to all subscribed {@link Observer}s. + * created buffer. When the source {@link Observable} completes, the current buffer is emitted, and the + * event is propagated to all subscribed {@link Observer}s. Note that if the source {@link Observable} + * issues an onError notification the event is passed on immediately without first emitting the buffer + * it is in the process of assembling. *

* Note that when using this operation multiple overlapping chunks could be active at any * one point. diff --git a/src/main/java/rx/internal/operators/OperatorBufferWithTime.java b/src/main/java/rx/internal/operators/OperatorBufferWithTime.java index bbb723d2b3..ba06b94a3b 100644 --- a/src/main/java/rx/internal/operators/OperatorBufferWithTime.java +++ b/src/main/java/rx/internal/operators/OperatorBufferWithTime.java @@ -34,8 +34,10 @@ * values from the specified {@link Observable} source and stores them in a buffer. Periodically the buffer * is emitted and replaced with a new buffer. How often this is done depends on the specified timespan. * The creation of chunks is also periodical. How often this is done depends on the specified timeshift. - * When the source {@link Observable} completes or produces an error, the current buffer is emitted, and - * the event is propagated to all subscribed {@link Subscriber}s. + * When the source {@link Observable} completes, the current buffer is emitted, and the event is propagated + * to all subscribed {@link Subscriber}s. Note that if the source {@link Observable} issues an onError + * notification the event is passed on immediately without first emitting the buffer it is in the process + * of assembling. *

* Note that this operation can produce non-connected, or overlapping chunks depending * on the input parameters.