Skip to content

Commit 08f5860

Browse files
committed
Merge pull request #3809 from akarnokd/MergeNullScalarFix1x
1.x: fix merge/flatMap crashing on an inner scalar of null
2 parents 03333b1 + 47eb306 commit 08f5860

File tree

2 files changed

+20
-3
lines changed

2 files changed

+20
-3
lines changed

src/main/java/rx/internal/operators/OperatorMerge.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -475,7 +475,7 @@ protected void queueScalar(T value) {
475475
}
476476
this.queue = q;
477477
}
478-
if (!q.offer(value)) {
478+
if (!q.offer(nl.next(value))) {
479479
unsubscribe();
480480
onError(OnErrorThrowable.addValueAsLastCause(new MissingBackpressureException(), value));
481481
return;

src/test/java/rx/internal/operators/OperatorMergeTest.java

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,10 +28,10 @@
2828
import org.mockito.*;
2929

3030
import rx.*;
31-
import rx.Observable.OnSubscribe;
32-
import rx.Scheduler.Worker;
3331
import rx.Observable;
32+
import rx.Observable.OnSubscribe;
3433
import rx.Observer;
34+
import rx.Scheduler.Worker;
3535
import rx.functions.*;
3636
import rx.internal.util.RxRingBuffer;
3737
import rx.observers.TestSubscriber;
@@ -1353,4 +1353,21 @@ public void zeroMaxConcurrent() {
13531353
assertEquals("maxConcurrent > 0 required but it was 0", e.getMessage());
13541354
}
13551355
}
1356+
1357+
@Test
1358+
public void mergeJustNull() {
1359+
TestSubscriber<Integer> ts = new TestSubscriber<Integer>(0);
1360+
1361+
Observable.range(1, 2).flatMap(new Func1<Integer, Observable<Integer>>() {
1362+
@Override
1363+
public Observable<Integer> call(Integer t) {
1364+
return Observable.just(null);
1365+
}
1366+
}).subscribe(ts);
1367+
1368+
ts.requestMore(2);
1369+
ts.assertValues(null, null);
1370+
ts.assertNoErrors();
1371+
ts.assertCompleted();
1372+
}
13561373
}

0 commit comments

Comments
 (0)