diff --git a/src/main/java/rx/internal/operators/OperatorMerge.java b/src/main/java/rx/internal/operators/OperatorMerge.java index a9c7b86b09..f3647b94e0 100644 --- a/src/main/java/rx/internal/operators/OperatorMerge.java +++ b/src/main/java/rx/internal/operators/OperatorMerge.java @@ -237,6 +237,9 @@ public void onNext(Observable t) { if (t == null) { return; } + if (t == Observable.empty()) { + emitEmpty(); + } else if (t instanceof ScalarSynchronousObservable) { tryEmit(((ScalarSynchronousObservable)t).get()); } else { @@ -247,6 +250,16 @@ public void onNext(Observable t) { } } + void emitEmpty() { + int produced = scalarEmissionCount + 1; + if (produced == scalarEmissionLimit) { + scalarEmissionCount = 0; + this.requestMore(produced); + } else { + scalarEmissionCount = produced; + } + } + private void reportError() { List list = new ArrayList(errors); if (list.size() == 1) { diff --git a/src/test/java/rx/internal/operators/OperatorFlatMapTest.java b/src/test/java/rx/internal/operators/OperatorFlatMapTest.java index bb5127665c..cbcf8daefb 100644 --- a/src/test/java/rx/internal/operators/OperatorFlatMapTest.java +++ b/src/test/java/rx/internal/operators/OperatorFlatMapTest.java @@ -29,6 +29,7 @@ import rx.Observer; import rx.exceptions.TestException; import rx.functions.*; +import rx.internal.util.RxRingBuffer; import rx.observers.TestSubscriber; import rx.schedulers.Schedulers; @@ -544,4 +545,109 @@ public Observable call(Integer t) { ts.assertValueCount(n * 2); } } + + @Test + public void justEmptyMixture() { + TestSubscriber ts = TestSubscriber.create(); + + Observable.range(0, 4 * RxRingBuffer.SIZE) + .flatMap(new Func1>() { + @Override + public Observable call(Integer v) { + return (v & 1) == 0 ? Observable.empty() : Observable.just(v); + } + }) + .subscribe(ts); + + ts.assertValueCount(2 * RxRingBuffer.SIZE); + ts.assertNoErrors(); + ts.assertCompleted(); + + int j = 1; + for (Integer v : ts.getOnNextEvents()) { + Assert.assertEquals(j, v.intValue()); + + j += 2; + } + } + + @Test + public void rangeEmptyMixture() { + TestSubscriber ts = TestSubscriber.create(); + + Observable.range(0, 4 * RxRingBuffer.SIZE) + .flatMap(new Func1>() { + @Override + public Observable call(Integer v) { + return (v & 1) == 0 ? Observable.empty() : Observable.range(v, 2); + } + }) + .subscribe(ts); + + ts.assertValueCount(4 * RxRingBuffer.SIZE); + ts.assertNoErrors(); + ts.assertCompleted(); + + int j = 1; + List list = ts.getOnNextEvents(); + for (int i = 0; i < list.size(); i += 2) { + Assert.assertEquals(j, list.get(i).intValue()); + Assert.assertEquals(j + 1, list.get(i + 1).intValue()); + + j += 2; + } + } + + @Test + public void justEmptyMixtureMaxConcurrent() { + TestSubscriber ts = TestSubscriber.create(); + + Observable.range(0, 4 * RxRingBuffer.SIZE) + .flatMap(new Func1>() { + @Override + public Observable call(Integer v) { + return (v & 1) == 0 ? Observable.empty() : Observable.just(v); + } + }, 16) + .subscribe(ts); + + ts.assertValueCount(2 * RxRingBuffer.SIZE); + ts.assertNoErrors(); + ts.assertCompleted(); + + int j = 1; + for (Integer v : ts.getOnNextEvents()) { + Assert.assertEquals(j, v.intValue()); + + j += 2; + } + } + + @Test + public void rangeEmptyMixtureMaxConcurrent() { + TestSubscriber ts = TestSubscriber.create(); + + Observable.range(0, 4 * RxRingBuffer.SIZE) + .flatMap(new Func1>() { + @Override + public Observable call(Integer v) { + return (v & 1) == 0 ? Observable.empty() : Observable.range(v, 2); + } + }, 16) + .subscribe(ts); + + ts.assertValueCount(4 * RxRingBuffer.SIZE); + ts.assertNoErrors(); + ts.assertCompleted(); + + int j = 1; + List list = ts.getOnNextEvents(); + for (int i = 0; i < list.size(); i += 2) { + Assert.assertEquals(j, list.get(i).intValue()); + Assert.assertEquals(j + 1, list.get(i + 1).intValue()); + + j += 2; + } + } + }