Skip to content

Commit eff6bb0

Browse files
committed
More tests and fix empty() case as well
1 parent 1b70eb0 commit eff6bb0

File tree

2 files changed

+55
-3
lines changed

2 files changed

+55
-3
lines changed

src/main/java/rx/internal/operators/OnSubscribeConcatMap.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -220,7 +220,7 @@ void drain() {
220220

221221
final int delayErrorMode = this.delayErrorMode;
222222

223-
do {
223+
for (;;) {
224224
if (actual.isUnsubscribed()) {
225225
return;
226226
}
@@ -288,11 +288,17 @@ void drain() {
288288
return;
289289
}
290290
}
291+
request(1);
292+
} else {
293+
request(1);
294+
continue;
291295
}
292-
request(1);
293296
}
294297
}
295-
} while (wip.decrementAndGet() != 0);
298+
if (wip.decrementAndGet() == 0) {
299+
break;
300+
}
301+
}
296302
}
297303

298304
void drainError(Throwable mapperError) {

src/test/java/rx/internal/operators/OperatorConcatTest.java

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -864,4 +864,50 @@ public void scalarAndRangeBackpressured() {
864864
ts.assertCompleted();
865865
ts.assertNoErrors();
866866
}
867+
868+
@Test
869+
public void scalarAndEmptyBackpressured() {
870+
TestSubscriber<Integer> ts = TestSubscriber.create(0);
871+
872+
Observable.just(1).concatWith(Observable.<Integer>empty()).subscribe(ts);
873+
874+
ts.assertNoValues();
875+
876+
ts.requestMore(5);
877+
878+
ts.assertValue(1);
879+
ts.assertCompleted();
880+
ts.assertNoErrors();
881+
}
882+
883+
@Test
884+
public void rangeAndEmptyBackpressured() {
885+
TestSubscriber<Integer> ts = TestSubscriber.create(0);
886+
887+
Observable.range(1, 2).concatWith(Observable.<Integer>empty()).subscribe(ts);
888+
889+
ts.assertNoValues();
890+
891+
ts.requestMore(5);
892+
893+
ts.assertValues(1, 2);
894+
ts.assertCompleted();
895+
ts.assertNoErrors();
896+
}
897+
898+
@Test
899+
public void emptyAndScalarBackpressured() {
900+
TestSubscriber<Integer> ts = TestSubscriber.create(0);
901+
902+
Observable.<Integer>empty().concatWith(Observable.just(1)).subscribe(ts);
903+
904+
ts.assertNoValues();
905+
906+
ts.requestMore(5);
907+
908+
ts.assertValue(1);
909+
ts.assertCompleted();
910+
ts.assertNoErrors();
911+
}
912+
867913
}

0 commit comments

Comments
 (0)