diff --git a/src/main/java/rx/Observable.java b/src/main/java/rx/Observable.java index 7e0e89a77a..340926276a 100644 --- a/src/main/java/rx/Observable.java +++ b/src/main/java/rx/Observable.java @@ -7660,6 +7660,15 @@ public final Observable scan(Func2 accumulator) { *

* Note that the Observable that results from this method will emit {@code initialValue} as its first * emitted item. + *

+ * Note that the initial value is shared among all subscribers and is encouraged to be immutable. + * For example, {@code scan(0, (a, b) -< a + b)} shares the value 0 but there is no interference + * because the underlying Integer object is immutable. However, + * {@code scan(new ArrayList<>(), (a, b) -< a.add(b))} does interfere because the same initial + * ArrayList may be manipulated by multiple subscriptions to scan. To avoid unintentional + * sharing of the initial value, consider using the {@link #scan(Func2, Func0)} overload + * which allows specifying a per-Subsriber invoked initial value factory function. + * *

*
Scheduler:
*
{@code scan} does not operate by default on a particular {@link Scheduler}.
@@ -7674,11 +7683,50 @@ public final Observable scan(Func2 accumulator) { * @return an Observable that emits {@code initialValue} followed by the results of each call to the * accumulator function * @see ReactiveX operators documentation: Scan + * @see #scan(Func2, Func0) */ public final Observable scan(R initialValue, Func2 accumulator) { return lift(new OperatorScan(initialValue, accumulator)); } + + /** + * Applies an accumulator function to the source values and emits the current accumulated value, + * starting with an initial value generated by a factory function. + * + *
+ *
Backpressure:
+ *
{@code scan} supports backpressure
+ *
Scheduler:
+ *
{@code scan} does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param the accumulated value type + * @param accumulator the function called with the current accumulator value, the current upstream + * value and expected to return a (new) accumulator value. + * @param initialFactory the function called for each Subscriber to generate an initial accumulator value + * @return the Observable instance that accumulates upstream values and emits each accumulated value, + * starting with a per-subscriber generated value. + */ + public final Observable scan(final Func2 accumulator, final Func0 initialFactory) { + return defer(new ScanWithInitialFactory(initialFactory, accumulator)); + } + + /** The scan function that calls a factory for the initial value for each subscriber. */ + final class ScanWithInitialFactory implements Func0> { + private final Func0 initialFactory; + private final Func2 accumulator; + + private ScanWithInitialFactory(Func0 initialFactory, Func2 accumulator) { + this.initialFactory = initialFactory; + this.accumulator = accumulator; + } + + @Override + public Observable call() { + return scan(initialFactory.call(), accumulator); + } + } + /** * Forces an Observable's emissions and notifications to be serialized and for it to obey * the Observable contract in other ways. diff --git a/src/test/java/rx/internal/operators/OperatorScanTest.java b/src/test/java/rx/internal/operators/OperatorScanTest.java index e45f32f92c..e956e9025e 100644 --- a/src/test/java/rx/internal/operators/OperatorScanTest.java +++ b/src/test/java/rx/internal/operators/OperatorScanTest.java @@ -469,4 +469,54 @@ public Integer call(Integer t1, Integer t2) { assertEquals(Arrays.asList(Long.MAX_VALUE), requests); } + + @Test + public void withFactory() { + Func0> factory = new Func0>() { + @Override + public List call() { + return new ArrayList(); + } + }; + + Observable> source = Observable.range(1, 10).scan(new Func2, Integer, List>() { + @Override + public List call(List t1, Integer t2) { + t1.add(t2); + return t1; + } + }, factory); + + int n = 10; + + for (int i = 0; i < n; i++) { + + List list = source.toBlocking().last(); + + Assert.assertEquals(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), list); + } + } + + @Test + public void withoutFactory() { + Observable> source = Observable.range(1, 3).scan(new ArrayList(), new Func2, Integer, List>() { + @Override + public List call(List t1, Integer t2) { + t1.add(t2); + return t1; + } + }); + + List list = source.toBlocking().last(); + + Assert.assertEquals(Arrays.asList(1, 2, 3), list); + + list = source.toBlocking().last(); + + Assert.assertEquals(Arrays.asList(1, 2, 3, 1, 2, 3), list); + + list = source.toBlocking().last(); + + Assert.assertEquals(Arrays.asList(1, 2, 3, 1, 2, 3, 1, 2, 3), list); + } }