diff --git a/src/main/java/rx/Observable.java b/src/main/java/rx/Observable.java index 53d14c4cfd..603e68d9e2 100644 --- a/src/main/java/rx/Observable.java +++ b/src/main/java/rx/Observable.java @@ -7201,8 +7201,8 @@ public final Observable rebatchRequests(int n) { * that does a similar operation on lists. *
*
Backpressure Support:
- *
This operator does not support backpressure because by intent it will receive all values and reduce - * them to a single {@code onNext}.
+ *
The operator honors backpressure of its downstream consumer and consumes the + * upstream source in unbounded mode.
*
Scheduler:
*
{@code reduce} does not operate by default on a particular {@link Scheduler}.
*
@@ -7238,10 +7238,24 @@ public final Observable reduce(Func2 accumulator) { * This technique, which is called "reduce" here, is sometimes called "aggregate," "fold," "accumulate," * "compress," or "inject" in other programming contexts. Groovy, for instance, has an {@code inject} method * that does a similar operation on lists. + *

+ * Note that the {@code initialValue} is shared among all subscribers to the resulting Observable + * and may cause problems if it is mutable. To make sure each subscriber gets its own value, defer + * the application of this operator via {@link #defer(Func0)}: + *


+     * Observable<T> source = ...
+     * Observable.defer(() -> source.reduce(new ArrayList<>(), (list, item) -> list.add(item)));
+     * 
+     * // alternatively, by using compose to stay fluent
+     * 
+     * source.compose(o ->
+     *     Observable.defer(() -> o.reduce(new ArrayList<>(), (list, item) -> list.add(item)))
+     * );
+     * 
*
*
Backpressure Support:
- *
This operator does not support backpressure because by intent it will receive all values and reduce - * them to a single {@code onNext}.
+ *
The operator honors backpressure of its downstream consumer and consumes the + * upstream source in unbounded mode.
*
Scheduler:
*
{@code reduce} does not operate by default on a particular {@link Scheduler}.
*
@@ -8167,7 +8181,23 @@ public final Observable scan(Func2 accumulator) { *

* Note that the Observable that results from this method will emit {@code initialValue} as its first * emitted item. + *

+ * Note that the {@code initialValue} is shared among all subscribers to the resulting Observable + * and may cause problems if it is mutable. To make sure each subscriber gets its own value, defer + * the application of this operator via {@link #defer(Func0)}: + *


+     * Observable<T> source = ...
+     * Observable.defer(() -> source.scan(new ArrayList<>(), (list, item) -> list.add(item)));
+     * 
+     * // alternatively, by using compose to stay fluent
+     * 
+     * source.compose(o ->
+     *     Observable.defer(() -> o.scan(new ArrayList<>(), (list, item) -> list.add(item)))
+     * );
+     * 
*
+ *
Backpressure:
+ *
The operator honors backpressure.
*
Scheduler:
*
{@code scan} does not operate by default on a particular {@link Scheduler}.
*