Skip to content

Commit 2b17aba

Browse files
BugFix: Reduce an empty observable
This fixes issue ReactiveX#423 The fix is based on this comment by @headinthebox: ReactiveX#423 (comment)
1 parent 682590c commit 2b17aba

File tree

2 files changed

+34
-1
lines changed

2 files changed

+34
-1
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3520,11 +3520,18 @@ public Observable<T> onErrorReturn(Func1<Throwable, ? extends T> resumeFunction)
35203520
* Observable, whose result will be used in the next accumulator call
35213521
* @return an Observable that emits a single item that is the result of accumulating the
35223522
* output from the source Observable
3523+
* @throws IllegalArgumentException
3524+
* if Observable sequence is empty.
35233525
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229154(v%3Dvs.103).aspx">MSDN: Observable.Aggregate</a>
35243526
* @see <a href="http://en.wikipedia.org/wiki/Fold_(higher-order_function)">Wikipedia: Fold (higher-order function)</a>
35253527
*/
35263528
public Observable<T> reduce(Func2<T, T, T> accumulator) {
3527-
return create(OperationScan.scan(this, accumulator)).takeLast(1);
3529+
/*
3530+
* Discussion and confirmation of implementation at https://github.com/Netflix/RxJava/issues/423#issuecomment-27642532
3531+
*
3532+
* It should use last() not takeLast(1) since it needs to emit an error if the sequence is empty.
3533+
*/
3534+
return create(OperationScan.scan(this, accumulator)).last();
35283535
}
35293536

35303537
/**

rxjava-core/src/test/java/rx/ObservableTests.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import rx.observables.ConnectableObservable;
3737
import rx.subscriptions.BooleanSubscription;
3838
import rx.subscriptions.Subscriptions;
39+
import rx.util.functions.Action0;
3940
import rx.util.functions.Action1;
4041
import rx.util.functions.Func1;
4142
import rx.util.functions.Func2;
@@ -210,6 +211,31 @@ public Integer call(Integer t1, Integer t2) {
210211
verify(w).onNext(10);
211212
}
212213

214+
215+
/**
216+
* A reduce should fail with an IllegalArgumentException if done on an empty Observable.
217+
*/
218+
@Test(expected = IllegalArgumentException.class)
219+
public void testReduceWithEmptyObservable() {
220+
Observable<Integer> observable = Observable.range(1, 0);
221+
observable.reduce(new Func2<Integer, Integer, Integer>() {
222+
223+
@Override
224+
public Integer call(Integer t1, Integer t2) {
225+
return t1 + t2;
226+
}
227+
228+
}).toBlockingObservable().forEach(new Action1<Integer>() {
229+
230+
@Override
231+
public void call(Integer t1) {
232+
// do nothing ... we expect an exception instead
233+
}
234+
});
235+
236+
fail("Expected an exception to be thrown");
237+
}
238+
213239
@Test
214240
public void testReduceWithInitialValue() {
215241
Observable<Integer> observable = Observable.from(1, 2, 3, 4);

0 commit comments

Comments
 (0)