Skip to content

Commit dc5f4c3

Browse files
committed
Merge pull request #3840 from akarnokd/ConcatMapFix
1.x: fix concatMap scalar/empty source behavior
2 parents 31dc74a + eff6bb0 commit dc5f4c3

File tree

2 files changed

+72
-4
lines changed

2 files changed

+72
-4
lines changed

src/main/java/rx/internal/operators/OnSubscribeConcatMap.java

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -220,7 +220,7 @@ void drain() {
220220

221221
final int delayErrorMode = this.delayErrorMode;
222222

223-
do {
223+
for (;;) {
224224
if (actual.isUnsubscribed()) {
225225
return;
226226
}
@@ -273,6 +273,8 @@ void drain() {
273273
if (source instanceof ScalarSynchronousObservable) {
274274
ScalarSynchronousObservable<? extends R> scalarSource = (ScalarSynchronousObservable<? extends R>) source;
275275

276+
active = true;
277+
276278
arbiter.setProducer(new ConcatMapInnerScalarProducer<T, R>(scalarSource.get(), this));
277279
} else {
278280
ConcatMapInnerSubscriber<T, R> innerSubscriber = new ConcatMapInnerSubscriber<T, R>(this);
@@ -286,11 +288,17 @@ void drain() {
286288
return;
287289
}
288290
}
291+
request(1);
292+
} else {
293+
request(1);
294+
continue;
289295
}
290-
request(1);
291296
}
292297
}
293-
} while (wip.decrementAndGet() != 0);
298+
if (wip.decrementAndGet() == 0) {
299+
break;
300+
}
301+
}
294302
}
295303

296304
void drainError(Throwable mapperError) {
@@ -352,7 +360,7 @@ public ConcatMapInnerScalarProducer(R value, ConcatMapSubscriber<T, R> parent) {
352360

353361
@Override
354362
public void request(long n) {
355-
if (!once) {
363+
if (!once && n > 0L) {
356364
once = true;
357365
ConcatMapSubscriber<T, R> p = parent;
358366
p.innerNext(value);

src/test/java/rx/internal/operators/OperatorConcatTest.java

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -850,4 +850,64 @@ public Observable<Integer> call(Integer t) {
850850
}
851851
}
852852

853+
@Test
854+
public void scalarAndRangeBackpressured() {
855+
TestSubscriber<Integer> ts = TestSubscriber.create(0);
856+
857+
Observable.just(1).concatWith(Observable.range(2, 3)).subscribe(ts);
858+
859+
ts.assertNoValues();
860+
861+
ts.requestMore(5);
862+
863+
ts.assertValues(1, 2, 3, 4);
864+
ts.assertCompleted();
865+
ts.assertNoErrors();
866+
}
867+
868+
@Test
869+
public void scalarAndEmptyBackpressured() {
870+
TestSubscriber<Integer> ts = TestSubscriber.create(0);
871+
872+
Observable.just(1).concatWith(Observable.<Integer>empty()).subscribe(ts);
873+
874+
ts.assertNoValues();
875+
876+
ts.requestMore(5);
877+
878+
ts.assertValue(1);
879+
ts.assertCompleted();
880+
ts.assertNoErrors();
881+
}
882+
883+
@Test
884+
public void rangeAndEmptyBackpressured() {
885+
TestSubscriber<Integer> ts = TestSubscriber.create(0);
886+
887+
Observable.range(1, 2).concatWith(Observable.<Integer>empty()).subscribe(ts);
888+
889+
ts.assertNoValues();
890+
891+
ts.requestMore(5);
892+
893+
ts.assertValues(1, 2);
894+
ts.assertCompleted();
895+
ts.assertNoErrors();
896+
}
897+
898+
@Test
899+
public void emptyAndScalarBackpressured() {
900+
TestSubscriber<Integer> ts = TestSubscriber.create(0);
901+
902+
Observable.<Integer>empty().concatWith(Observable.just(1)).subscribe(ts);
903+
904+
ts.assertNoValues();
905+
906+
ts.requestMore(5);
907+
908+
ts.assertValue(1);
909+
ts.assertCompleted();
910+
ts.assertNoErrors();
911+
}
912+
853913
}

0 commit comments

Comments
 (0)