Skip to content

Commit 1b70eb0

Browse files
committed
1.x: fix concatMap scalar source behavior
1 parent bcd7fa1 commit 1b70eb0

File tree

2 files changed

+17
-1
lines changed

2 files changed

+17
-1
lines changed

src/main/java/rx/internal/operators/OnSubscribeConcatMap.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -273,6 +273,8 @@ void drain() {
273273
if (source instanceof ScalarSynchronousObservable) {
274274
ScalarSynchronousObservable<? extends R> scalarSource = (ScalarSynchronousObservable<? extends R>) source;
275275

276+
active = true;
277+
276278
arbiter.setProducer(new ConcatMapInnerScalarProducer<T, R>(scalarSource.get(), this));
277279
} else {
278280
ConcatMapInnerSubscriber<T, R> innerSubscriber = new ConcatMapInnerSubscriber<T, R>(this);
@@ -352,7 +354,7 @@ public ConcatMapInnerScalarProducer(R value, ConcatMapSubscriber<T, R> parent) {
352354

353355
@Override
354356
public void request(long n) {
355-
if (!once) {
357+
if (!once && n > 0L) {
356358
once = true;
357359
ConcatMapSubscriber<T, R> p = parent;
358360
p.innerNext(value);

src/test/java/rx/internal/operators/OperatorConcatTest.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -850,4 +850,18 @@ public Observable<Integer> call(Integer t) {
850850
}
851851
}
852852

853+
@Test
854+
public void scalarAndRangeBackpressured() {
855+
TestSubscriber<Integer> ts = TestSubscriber.create(0);
856+
857+
Observable.just(1).concatWith(Observable.range(2, 3)).subscribe(ts);
858+
859+
ts.assertNoValues();
860+
861+
ts.requestMore(5);
862+
863+
ts.assertValues(1, 2, 3, 4);
864+
ts.assertCompleted();
865+
ts.assertNoErrors();
866+
}
853867
}

0 commit comments

Comments
 (0)