Skip to content

1.x: scan with an initial factory callback #3959

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 48 additions & 0 deletions src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -7660,6 +7660,15 @@ public final Observable<T> scan(Func2<T, T, T> accumulator) {
* <p>
* Note that the Observable that results from this method will emit {@code initialValue} as its first
* emitted item.
* <p>
* Note that the initial value is shared among all subscribers and is encouraged to be immutable.
* For example, {@code scan(0, (a, b) -&lt; a + b)} shares the value 0 but there is no interference
* because the underlying Integer object is immutable. However,
* {@code scan(new ArrayList<>(), (a, b) -&lt; a.add(b))} does interfere because the same initial
* ArrayList may be manipulated by multiple subscriptions to scan. To avoid unintentional
* sharing of the initial value, consider using the {@link #scan(Func2, Func0)} overload
* which allows specifying a per-Subsriber invoked initial value factory function.
*
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code scan} does not operate by default on a particular {@link Scheduler}.</dd>
Expand All @@ -7674,11 +7683,50 @@ public final Observable<T> scan(Func2<T, T, T> accumulator) {
* @return an Observable that emits {@code initialValue} followed by the results of each call to the
* accumulator function
* @see <a href="http://reactivex.io/documentation/operators/scan.html">ReactiveX operators documentation: Scan</a>
* @see #scan(Func2, Func0)
*/
public final <R> Observable<R> scan(R initialValue, Func2<R, ? super T, R> accumulator) {
return lift(new OperatorScan<R, T>(initialValue, accumulator));
}

/**
* Applies an accumulator function to the source values and emits the current accumulated value,
* starting with an initial value generated by a factory function.
*
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>{@code scan} supports backpressure</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code scan} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param <R> the accumulated value type
* @param accumulator the function called with the current accumulator value, the current upstream
* value and expected to return a (new) accumulator value.
* @param initialFactory the function called for each Subscriber to generate an initial accumulator value
* @return the Observable instance that accumulates upstream values and emits each accumulated value,
* starting with a per-subscriber generated value.
*/
public final <R> Observable<R> scan(final Func2<R, ? super T, R> accumulator, final Func0<R> initialFactory) {
return defer(new ScanWithInitialFactory<R>(initialFactory, accumulator));
}

/** The scan function that calls a factory for the initial value for each subscriber. */
final class ScanWithInitialFactory<R> implements Func0<Observable<R>> {
private final Func0<R> initialFactory;
private final Func2<R, ? super T, R> accumulator;

private ScanWithInitialFactory(Func0<R> initialFactory, Func2<R, ? super T, R> accumulator) {
this.initialFactory = initialFactory;
this.accumulator = accumulator;
}

@Override
public Observable<R> call() {
return scan(initialFactory.call(), accumulator);
}
}


/**
* Forces an Observable's emissions and notifications to be serialized and for it to obey
* <a href="http://reactivex.io/documentation/contract.html">the Observable contract</a> in other ways.
Expand Down
50 changes: 50 additions & 0 deletions src/test/java/rx/internal/operators/OperatorScanTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -469,4 +469,54 @@ public Integer call(Integer t1, Integer t2) {

assertEquals(Arrays.asList(Long.MAX_VALUE), requests);
}

@Test
public void withFactory() {
Func0<List<Integer>> factory = new Func0<List<Integer>>() {
@Override
public List<Integer> call() {
return new ArrayList<Integer>();
}
};

Observable<List<Integer>> source = Observable.range(1, 10).scan(new Func2<List<Integer>, Integer, List<Integer>>() {
@Override
public List<Integer> call(List<Integer> t1, Integer t2) {
t1.add(t2);
return t1;
}
}, factory);

int n = 10;

for (int i = 0; i < n; i++) {

List<Integer> list = source.toBlocking().last();

Assert.assertEquals(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), list);
}
}

@Test
public void withoutFactory() {
Observable<List<Integer>> source = Observable.range(1, 3).scan(new ArrayList<Integer>(), new Func2<List<Integer>, Integer, List<Integer>>() {
@Override
public List<Integer> call(List<Integer> t1, Integer t2) {
t1.add(t2);
return t1;
}
});

List<Integer> list = source.toBlocking().last();

Assert.assertEquals(Arrays.asList(1, 2, 3), list);

list = source.toBlocking().last();

Assert.assertEquals(Arrays.asList(1, 2, 3, 1, 2, 3), list);

list = source.toBlocking().last();

Assert.assertEquals(Arrays.asList(1, 2, 3, 1, 2, 3, 1, 2, 3), list);
}
}