Skip to content

Commit d4a98e2

Browse files
author
Your Name
committed
Corrected buffer operator onError behaviour documentation
1 parent aef6b96 commit d4a98e2

File tree

5 files changed

+44
-26
lines changed

5 files changed

+44
-26
lines changed

src/main/java/rx/Observable.java

Lines changed: 29 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -3285,8 +3285,9 @@ public final <TClosing> Observable<List<T>> buffer(Func0<? extends Observable<?
32853285
/**
32863286
* Returns an Observable that emits buffers of items it collects from the source Observable. The resulting
32873287
* Observable emits connected, non-overlapping buffers, each containing {@code count} items. When the source
3288-
* Observable completes or encounters an error, the resulting Observable emits the current buffer and
3289-
* propagates the notification from the source Observable.
3288+
* Observable completes, the current buffer is emitted, and the event is propagated to all subscribed
3289+
* {@link Observer}s. Note that if the source {@link Observable} issues an onError notification the event
3290+
* is passed on immediately without first emitting the buffer it is in the process of assembling.
32903291
* <p>
32913292
* <img width="640" height="320" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/buffer3.png" alt="">
32923293
* <dl>
@@ -3307,8 +3308,9 @@ public final Observable<List<T>> buffer(int count) {
33073308
/**
33083309
* Returns an Observable that emits buffers of items it collects from the source Observable. The resulting
33093310
* Observable emits buffers every {@code skip} items, each containing {@code count} items. When the source
3310-
* Observable completes or encounters an error, the resulting Observable emits the current buffer and
3311-
* propagates the notification from the source Observable.
3311+
* Observable completes, the current buffer is emitted, and the event is propagated to all subscribed
3312+
* {@link Observer}s. Note that if the source {@link Observable} issues an onError notification the event
3313+
* is passed on immediately without first emitting the buffer it is in the process of assembling.
33123314
* <p>
33133315
* <img width="640" height="320" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/buffer4.png" alt="">
33143316
* <dl>
@@ -3334,8 +3336,9 @@ public final Observable<List<T>> buffer(int count, int skip) {
33343336
* Returns an Observable that emits buffers of items it collects from the source Observable. The resulting
33353337
* Observable starts a new buffer periodically, as determined by the {@code timeshift} argument. It emits
33363338
* each buffer after a fixed timespan, specified by the {@code timespan} argument. When the source
3337-
* Observable completes or encounters an error, the resulting Observable emits the current buffer and
3338-
* propagates the notification from the source Observable.
3339+
* Observable completes, the current buffer is emitted, and the event is propagated to all subscribed
3340+
* {@link Observer}s. Note that if the source {@link Observable} issues an onError notification the event
3341+
* is passed on immediately without first emitting the buffer it is in the process of assembling.
33393342
* <p>
33403343
* <img width="640" height="320" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/buffer7.png" alt="">
33413344
* <dl>
@@ -3364,8 +3367,10 @@ public final Observable<List<T>> buffer(long timespan, long timeshift, TimeUnit
33643367
* Returns an Observable that emits buffers of items it collects from the source Observable. The resulting
33653368
* Observable starts a new buffer periodically, as determined by the {@code timeshift} argument, and on the
33663369
* specified {@code scheduler}. It emits each buffer after a fixed timespan, specified by the
3367-
* {@code timespan} argument. When the source Observable completes or encounters an error, the resulting
3368-
* Observable emits the current buffer and propagates the notification from the source Observable.
3370+
* {@code timespan} argument. When the source Observable completes completes, the current buffer is emitted,
3371+
* and the event is propagated to all subscribed {@link Observer}s. Note that if the source {@link Observable}
3372+
* issues an onError notification the event is passed on immediately without first emitting the buffer it is
3373+
* in the process of assembling.
33693374
* <p>
33703375
* <img width="640" height="320" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/buffer7.s.png" alt="">
33713376
* <dl>
@@ -3395,8 +3400,10 @@ public final Observable<List<T>> buffer(long timespan, long timeshift, TimeUnit
33953400
/**
33963401
* Returns an Observable that emits buffers of items it collects from the source Observable. The resulting
33973402
* Observable emits connected, non-overlapping buffers, each of a fixed duration specified by the
3398-
* {@code timespan} argument. When the source Observable completes or encounters an error, the resulting
3399-
* Observable emits the current buffer and propagates the notification from the source Observable.
3403+
* {@code timespan} argument. When the source Observable completes, the current buffer is emitted,
3404+
* and the event is propagated to all subscribed {@link Observer}s. Note that if the source {@link Observable}
3405+
* issues an onError notification the event is passed on immediately without first emitting the buffer it is
3406+
* in the process of assembling.
34003407
* <p>
34013408
* <img width="640" height="320" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/buffer5.png" alt="">
34023409
* <dl>
@@ -3424,8 +3431,10 @@ public final Observable<List<T>> buffer(long timespan, TimeUnit unit) {
34243431
* Returns an Observable that emits buffers of items it collects from the source Observable. The resulting
34253432
* Observable emits connected, non-overlapping buffers, each of a fixed duration specified by the
34263433
* {@code timespan} argument or a maximum size specified by the {@code count} argument (whichever is reached
3427-
* first). When the source Observable completes or encounters an error, the resulting Observable emits the
3428-
* current buffer and propagates the notification from the source Observable.
3434+
* first). When the source Observable completes, the current buffer is emitted, and the event is propagated
3435+
* to all subscribed {@link Observer}s. Note that if the source {@link Observable} issues an onError
3436+
* notification the event is passed on immediately without first emitting the buffer it is in the process
3437+
* of assembling.
34293438
* <p>
34303439
* <img width="640" height="320" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/buffer6.png" alt="">
34313440
* <dl>
@@ -3456,9 +3465,10 @@ public final Observable<List<T>> buffer(long timespan, TimeUnit unit, int count)
34563465
* Returns an Observable that emits buffers of items it collects from the source Observable. The resulting
34573466
* Observable emits connected, non-overlapping buffers, each of a fixed duration specified by the
34583467
* {@code timespan} argument as measured on the specified {@code scheduler}, or a maximum size specified by
3459-
* the {@code count} argument (whichever is reached first). When the source Observable completes or
3460-
* encounters an error, the resulting Observable emits the current buffer and propagates the notification
3461-
* from the source Observable.
3468+
* the {@code count} argument (whichever is reached first). When the source Observable completes, the
3469+
* current buffer is emitted, and the event is propagated to all subscribed {@link Observer}s. Note that
3470+
* if the source {@link Observable} issues an onError notification the event is passed on immediately
3471+
* without first emitting the buffer it is in the process of assembling.
34623472
* <p>
34633473
* <img width="640" height="320" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/buffer6.s.png" alt="">
34643474
* <dl>
@@ -3490,9 +3500,10 @@ public final Observable<List<T>> buffer(long timespan, TimeUnit unit, int count,
34903500
/**
34913501
* Returns an Observable that emits buffers of items it collects from the source Observable. The resulting
34923502
* Observable emits connected, non-overlapping buffers, each of a fixed duration specified by the
3493-
* {@code timespan} argument and on the specified {@code scheduler}. When the source Observable completes or
3494-
* encounters an error, the resulting Observable emits the current buffer and propagates the notification
3495-
* from the source Observable.
3503+
* {@code timespan} argument and on the specified {@code scheduler}. When the source Observable completes,
3504+
* the current buffer is emitted, and the event is propagated to all subscribed {@link Observer}s. Note
3505+
* that if the source {@link Observable} issues an onError notification the event is passed on immediately
3506+
* without first emitting the buffer it is in the process of assembling.
34963507
* <p>
34973508
* <img width="640" height="320" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/buffer5.s.png" alt="">
34983509
* <dl>

src/main/java/rx/internal/operators/OperatorBufferWithSingleObservable.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,9 @@
3333
* {@link Observable} constructed using the {@link Func0} argument, produces a value. The buffer is then
3434
* emitted, and a new buffer is created to replace it. A new {@link Observable} will be constructed using
3535
* the provided {@link Func0} object, which will determine when this new buffer is emitted. When the source
36-
* {@link Observable} completes or produces an error, the current buffer is emitted, and the event is
37-
* propagated to all subscribed {@link Observer}s.
36+
* {@link Observable} completes, the current buffer is emitted, and the event is propagated to all
37+
* subscribed {@link Observer}s. Note that if the source {@link Observable} issues an onError notification
38+
* the event is passed on immediately without first emitting the buffer it is in the process of assembling.
3839
* <p>
3940
* Note that this operation only produces <strong>non-overlapping chunks</strong>. At all times there is
4041
* exactly one buffer actively storing values.

src/main/java/rx/internal/operators/OperatorBufferWithSize.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,10 @@
3030
* This operation takes
3131
* values from the specified {@link Observable} source and stores them in all active chunks until the buffer
3232
* contains a specified number of elements. The buffer is then emitted. Chunks are created after a certain
33-
* amount of values have been received. When the source {@link Observable} completes or produces an error,
34-
* the currently active chunks are emitted, and the event is propagated to all subscribed {@link Subscriber}s.
33+
* amount of values have been received. When the source {@link Observable} completes, the current buffer is
34+
* emitted, and the event is propagated to all subscribed {@link Observer}s. Note that if the source
35+
* {@link Observable} issues an onError notification the event is passed on immediately without first emitting
36+
* the buffer it is in the process of assembling.
3537
* <p>
3638
* Note that this operation can produce <strong>non-connected, connected non-overlapping, or overlapping
3739
* chunks</strong> depending on the input parameters.

src/main/java/rx/internal/operators/OperatorBufferWithStartEndObservable.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,10 @@
3737
* {@link Observable}. This creates a new buffer which will then start recording values which are produced
3838
* by the "source" {@link Observable}. Additionally the "bufferClosingSelector" will be used to construct an
3939
* {@link Observable} which can produce values. When it does so it will close this (and only this) newly
40-
* created buffer. When the source {@link Observable} completes or produces an error, all chunks are
41-
* emitted, and the event is propagated to all subscribed {@link Observer}s.
40+
* created buffer. When the source {@link Observable} completes, the current buffer is emitted, and the
41+
* event is propagated to all subscribed {@link Observer}s. Note that if the source {@link Observable}
42+
* issues an onError notification the event is passed on immediately without first emitting the buffer
43+
* it is in the process of assembling.
4244
* </p><p>
4345
* Note that when using this operation <strong>multiple overlapping chunks</strong> could be active at any
4446
* one point.

src/main/java/rx/internal/operators/OperatorBufferWithTime.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,10 @@
3434
* values from the specified {@link Observable} source and stores them in a buffer. Periodically the buffer
3535
* is emitted and replaced with a new buffer. How often this is done depends on the specified timespan.
3636
* The creation of chunks is also periodical. How often this is done depends on the specified timeshift.
37-
* When the source {@link Observable} completes or produces an error, the current buffer is emitted, and
38-
* the event is propagated to all subscribed {@link Subscriber}s.
37+
* When the source {@link Observable} completes, the current buffer is emitted, and the event is propagated
38+
* to all subscribed {@link Observer}s. Note that if the source {@link Observable} issues an onError
39+
* notification the event is passed on immediately without first emitting the buffer it is in the process
40+
* of assembling.
3941
* <p>
4042
* Note that this operation can produce <strong>non-connected, or overlapping chunks</strong> depending
4143
* on the input parameters.

0 commit comments

Comments
 (0)