diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableFlatMap.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableFlatMap.java index 551ceba280..a4766f389d 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableFlatMap.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableFlatMap.java @@ -376,7 +376,7 @@ void drainLoop() { return; } - boolean innerCompleted = false; + int innerCompleted = 0; if (n != 0) { long startId = lastId; int index = lastIndex; @@ -423,7 +423,7 @@ void drainLoop() { return; } removeInner(is); - innerCompleted = true; + innerCompleted++; j++; if (j == n) { j = 0; @@ -449,7 +449,7 @@ void drainLoop() { if (checkTerminate()) { return; } - innerCompleted = true; + innerCompleted++; } j++; @@ -461,17 +461,19 @@ void drainLoop() { lastId = inner[j].id; } - if (innerCompleted) { + if (innerCompleted != 0) { if (maxConcurrency != Integer.MAX_VALUE) { - ObservableSource p; - synchronized (this) { - p = sources.poll(); - if (p == null) { - wip--; - continue; + while (innerCompleted-- != 0) { + ObservableSource p; + synchronized (this) { + p = sources.poll(); + if (p == null) { + wip--; + continue; + } } + subscribeInner(p); } - subscribeInner(p); } continue; } diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableFlatMapTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableFlatMapTest.java index bb525b2ddb..4c1441775b 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableFlatMapTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableFlatMapTest.java @@ -1084,4 +1084,44 @@ public void remove() { assertEquals(1, counter.get()); } + + @Test + public void maxConcurrencySustained() { + final PublishProcessor pp1 = PublishProcessor.create(); + final PublishProcessor pp2 = PublishProcessor.create(); + PublishProcessor pp3 = PublishProcessor.create(); + PublishProcessor pp4 = PublishProcessor.create(); + + TestSubscriber ts = Flowable.just(pp1, pp2, pp3, pp4) + .flatMap(new Function, Flowable>() { + @Override + public Flowable apply(PublishProcessor v) throws Exception { + return v; + } + }, 2) + .doOnNext(new Consumer() { + @Override + public void accept(Integer v) throws Exception { + if (v == 1) { + // this will make sure the drain loop detects two completed + // inner sources and replaces them with fresh ones + pp1.onComplete(); + pp2.onComplete(); + } + } + }) + .test(); + + pp1.onNext(1); + + assertFalse(pp1.hasSubscribers()); + assertFalse(pp2.hasSubscribers()); + assertTrue(pp3.hasSubscribers()); + assertTrue(pp4.hasSubscribers()); + + ts.dispose(); + + assertFalse(pp3.hasSubscribers()); + assertFalse(pp4.hasSubscribers()); + } } diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableFlatMapTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableFlatMapTest.java index efee97f33a..960722060a 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableFlatMapTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableFlatMapTest.java @@ -1045,4 +1045,44 @@ public Integer apply(Integer v) to.assertValuesOnly(10, 11, 12, 13, 14, 20, 21, 22, 23, 24); } + + @Test + public void maxConcurrencySustained() { + final PublishSubject ps1 = PublishSubject.create(); + final PublishSubject ps2 = PublishSubject.create(); + PublishSubject ps3 = PublishSubject.create(); + PublishSubject ps4 = PublishSubject.create(); + + TestObserver to = Observable.just(ps1, ps2, ps3, ps4) + .flatMap(new Function, ObservableSource>() { + @Override + public ObservableSource apply(PublishSubject v) throws Exception { + return v; + } + }, 2) + .doOnNext(new Consumer() { + @Override + public void accept(Integer v) throws Exception { + if (v == 1) { + // this will make sure the drain loop detects two completed + // inner sources and replaces them with fresh ones + ps1.onComplete(); + ps2.onComplete(); + } + } + }) + .test(); + + ps1.onNext(1); + + assertFalse(ps1.hasObservers()); + assertFalse(ps2.hasObservers()); + assertTrue(ps3.hasObservers()); + assertTrue(ps4.hasObservers()); + + to.dispose(); + + assertFalse(ps3.hasObservers()); + assertFalse(ps4.hasObservers()); + } }