Skip to content

Corrected buffer operator onError behaviour documentation #3561

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Dec 8, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 29 additions & 18 deletions src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -3285,8 +3285,9 @@ public final <TClosing> Observable<List<T>> buffer(Func0<? extends Observable<?
/**
* Returns an Observable that emits buffers of items it collects from the source Observable. The resulting
* Observable emits connected, non-overlapping buffers, each containing {@code count} items. When the source
* Observable completes or encounters an error, the resulting Observable emits the current buffer and
* propagates the notification from the source Observable.
* Observable completes, the resulting Observable emits the current buffer and propagates the notification
* from the source Observable. Note that if the source Observable issues an onError notification
* the event is passed on immediately without first emitting the buffer it is in the process of assembling.
* <p>
* <img width="640" height="320" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/buffer3.png" alt="">
* <dl>
Expand All @@ -3307,8 +3308,9 @@ public final Observable<List<T>> buffer(int count) {
/**
* Returns an Observable that emits buffers of items it collects from the source Observable. The resulting
* Observable emits buffers every {@code skip} items, each containing {@code count} items. When the source
* Observable completes or encounters an error, the resulting Observable emits the current buffer and
* propagates the notification from the source Observable.
* Observable completes, the resulting Observable emits the current buffer and propagates the notification
* from the source Observable. Note that if the source Observable issues an onError notification
* the event is passed on immediately without first emitting the buffer it is in the process of assembling.
* <p>
* <img width="640" height="320" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/buffer4.png" alt="">
* <dl>
Expand All @@ -3334,8 +3336,9 @@ public final Observable<List<T>> buffer(int count, int skip) {
* Returns an Observable that emits buffers of items it collects from the source Observable. The resulting
* Observable starts a new buffer periodically, as determined by the {@code timeshift} argument. It emits
* each buffer after a fixed timespan, specified by the {@code timespan} argument. When the source
* Observable completes or encounters an error, the resulting Observable emits the current buffer and
* propagates the notification from the source Observable.
* Observable completes, the resulting Observable emits the current buffer and propagates the notification
* from the source Observable. Note that if the source Observable issues an onError notification
* the event is passed on immediately without first emitting the buffer it is in the process of assembling.
* <p>
* <img width="640" height="320" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/buffer7.png" alt="">
* <dl>
Expand Down Expand Up @@ -3364,8 +3367,10 @@ public final Observable<List<T>> buffer(long timespan, long timeshift, TimeUnit
* Returns an Observable that emits buffers of items it collects from the source Observable. The resulting
* Observable starts a new buffer periodically, as determined by the {@code timeshift} argument, and on the
* specified {@code scheduler}. It emits each buffer after a fixed timespan, specified by the
* {@code timespan} argument. When the source Observable completes or encounters an error, the resulting
* Observable emits the current buffer and propagates the notification from the source Observable.
* {@code timespan} argument. When the source Observable completes, the resulting Observable emits the current
* buffer and propagates the notification from the source Observable. Note that if the source
* Observable issues an onError notification the event is passed on immediately without first emitting
* the buffer it is in the process of assembling.
* <p>
* <img width="640" height="320" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/buffer7.s.png" alt="">
* <dl>
Expand Down Expand Up @@ -3395,8 +3400,10 @@ public final Observable<List<T>> buffer(long timespan, long timeshift, TimeUnit
/**
* Returns an Observable that emits buffers of items it collects from the source Observable. The resulting
* Observable emits connected, non-overlapping buffers, each of a fixed duration specified by the
* {@code timespan} argument. When the source Observable completes or encounters an error, the resulting
* Observable emits the current buffer and propagates the notification from the source Observable.
* {@code timespan} argument. When the source Observable completes, the resulting Observable emits the current
* buffer and propagates the notification from the source Observable. Note that if the source
* Observable issues an onError notification the event is passed on immediately without first emitting
* the buffer it is in the process of assembling.
* <p>
* <img width="640" height="320" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/buffer5.png" alt="">
* <dl>
Expand Down Expand Up @@ -3424,8 +3431,10 @@ public final Observable<List<T>> buffer(long timespan, TimeUnit unit) {
* Returns an Observable that emits buffers of items it collects from the source Observable. The resulting
* Observable emits connected, non-overlapping buffers, each of a fixed duration specified by the
* {@code timespan} argument or a maximum size specified by the {@code count} argument (whichever is reached
* first). When the source Observable completes or encounters an error, the resulting Observable emits the
* current buffer and propagates the notification from the source Observable.
* first). When the source Observable completes, the resulting Observable emits the current buffer and propagates
* the notification from the source Observable. Note that if the source Observable issues an onError
* notification the event is passed on immediately without first emitting the buffer it is in the process
* of assembling.
* <p>
* <img width="640" height="320" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/buffer6.png" alt="">
* <dl>
Expand Down Expand Up @@ -3456,9 +3465,10 @@ public final Observable<List<T>> buffer(long timespan, TimeUnit unit, int count)
* Returns an Observable that emits buffers of items it collects from the source Observable. The resulting
* Observable emits connected, non-overlapping buffers, each of a fixed duration specified by the
* {@code timespan} argument as measured on the specified {@code scheduler}, or a maximum size specified by
* the {@code count} argument (whichever is reached first). When the source Observable completes or
* encounters an error, the resulting Observable emits the current buffer and propagates the notification
* from the source Observable.
* the {@code count} argument (whichever is reached first). When the source Observable completes, the resulting
* Observable emits the current buffer and propagates the notification from the source Observable.
* Note that if the source Observable issues an onError notification the event is passed on immediately
* without first emitting the buffer it is in the process of assembling.
* <p>
* <img width="640" height="320" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/buffer6.s.png" alt="">
* <dl>
Expand Down Expand Up @@ -3490,9 +3500,10 @@ public final Observable<List<T>> buffer(long timespan, TimeUnit unit, int count,
/**
* Returns an Observable that emits buffers of items it collects from the source Observable. The resulting
* Observable emits connected, non-overlapping buffers, each of a fixed duration specified by the
* {@code timespan} argument and on the specified {@code scheduler}. When the source Observable completes or
* encounters an error, the resulting Observable emits the current buffer and propagates the notification
* from the source Observable.
* {@code timespan} argument and on the specified {@code scheduler}. When the source Observable completes,
* the resulting Observable emits the current buffer and propagates the notification from the source Observable.
* Note that if the source Observable issues an onError notification the event is passed on immediately
* without first emitting the buffer it is in the process of assembling.
* <p>
* <img width="640" height="320" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/buffer5.s.png" alt="">
* <dl>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,9 @@
* {@link Observable} constructed using the {@link Func0} argument, produces a value. The buffer is then
* emitted, and a new buffer is created to replace it. A new {@link Observable} will be constructed using
* the provided {@link Func0} object, which will determine when this new buffer is emitted. When the source
* {@link Observable} completes or produces an error, the current buffer is emitted, and the event is
* propagated to all subscribed {@link Observer}s.
* {@link Observable} completes, the current buffer is emitted, and the event is propagated to all
* subscribed {@link Observer}s. Note that if the source {@link Observable} issues an onError notification
* the event is passed on immediately without first emitting the buffer it is in the process of assembling.
* <p>
* Note that this operation only produces <strong>non-overlapping chunks</strong>. At all times there is
* exactly one buffer actively storing values.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,10 @@
* This operation takes
* values from the specified {@link Observable} source and stores them in all active chunks until the buffer
* contains a specified number of elements. The buffer is then emitted. Chunks are created after a certain
* amount of values have been received. When the source {@link Observable} completes or produces an error,
* the currently active chunks are emitted, and the event is propagated to all subscribed {@link Subscriber}s.
* amount of values have been received. When the source {@link Observable} completes, the current buffer is
* emitted, and the event is propagated to all subscribed {@link Subscriber}s. Note that if the source
* {@link Observable} issues an onError notification the event is passed on immediately without first emitting
* the buffer it is in the process of assembling.
* <p>
* Note that this operation can produce <strong>non-connected, connected non-overlapping, or overlapping
* chunks</strong> depending on the input parameters.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,10 @@
* {@link Observable}. This creates a new buffer which will then start recording values which are produced
* by the "source" {@link Observable}. Additionally the "bufferClosingSelector" will be used to construct an
* {@link Observable} which can produce values. When it does so it will close this (and only this) newly
* created buffer. When the source {@link Observable} completes or produces an error, all chunks are
* emitted, and the event is propagated to all subscribed {@link Observer}s.
* created buffer. When the source {@link Observable} completes, the current buffer is emitted, and the
* event is propagated to all subscribed {@link Observer}s. Note that if the source {@link Observable}
* issues an onError notification the event is passed on immediately without first emitting the buffer
* it is in the process of assembling.
* </p><p>
* Note that when using this operation <strong>multiple overlapping chunks</strong> could be active at any
* one point.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,10 @@
* values from the specified {@link Observable} source and stores them in a buffer. Periodically the buffer
* is emitted and replaced with a new buffer. How often this is done depends on the specified timespan.
* The creation of chunks is also periodical. How often this is done depends on the specified timeshift.
* When the source {@link Observable} completes or produces an error, the current buffer is emitted, and
* the event is propagated to all subscribed {@link Subscriber}s.
* When the source {@link Observable} completes, the current buffer is emitted, and the event is propagated
* to all subscribed {@link Subscriber}s. Note that if the source {@link Observable} issues an onError
* notification the event is passed on immediately without first emitting the buffer it is in the process
* of assembling.
* <p>
* Note that this operation can produce <strong>non-connected, or overlapping chunks</strong> depending
* on the input parameters.
Expand Down