Closed
Description
Take a look at this (admittedly somewhat wonky) sample stream:
Observable.range(0, 2)
.flatMap(__ -> Observable.just(null))
.subscribe(new Subscriber<Object>() {
@Override public void onNext(Object o) {
System.out.println("onNext(" + o + ")");
}
@Override public void onError(Throwable e) {
e.printStackTrace();
}
@Override public void onStart() {
request(1);
}
@Override public void onCompleted() { }
});
In this sample I'm purposefully only requesting one item so that the second one ends up getting queued in the internal OperatorMerge
. As a result of passing null
to the queue, though, I end up getting this in onError
:
java.lang.NullPointerException
at rx.internal.util.atomic.SpscUnboundedAtomicArrayQueue.offer(SpscUnboundedAtomicArrayQueue.java:71)
at rx.internal.operators.OperatorMerge$MergeSubscriber.queueScalar(OperatorMerge.java:465)
at rx.internal.operators.OperatorMerge$MergeSubscriber.tryEmit(OperatorMerge.java:437)
at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:228)
at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:142)
at rx.internal.operators.OperatorMap$1.onNext(OperatorMap.java:54)
at rx.internal.operators.OnSubscribeRange$RangeProducer.fastpath(OnSubscribeRange.java:126)
at rx.internal.operators.OnSubscribeRange$RangeProducer.request(OnSubscribeRange.java:63)
at rx.Subscriber.setProducer(Subscriber.java:211)
at rx.Subscriber.setProducer(Subscriber.java:205)
at rx.internal.operators.OnSubscribeRange.call(OnSubscribeRange.java:38)
at rx.internal.operators.OnSubscribeRange.call(OnSubscribeRange.java:26)
at rx.Observable$2.call(Observable.java:162)
at rx.Observable$2.call(Observable.java:154)
at rx.Observable$2.call(Observable.java:162)
at rx.Observable$2.call(Observable.java:154)
at rx.Observable.subscribe(Observable.java:8191)
at rx.Observable.subscribe(Observable.java:8158)
at net.danlew.experiments.Tester.main(Tester.java:39)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
Caused by: rx.exceptions.OnErrorThrowable$OnNextValue: OnError while emitting onNext value: 1
at rx.exceptions.OnErrorThrowable.addValueAsLastCause(OnErrorThrowable.java:109)
at rx.exceptions.Exceptions.throwOrReport(Exceptions.java:187)
at rx.internal.operators.OperatorMap$1.onNext(OperatorMap.java:56)
... 18 more
I'm running into in more complex code where the requests # is different than the # items pushed and seeing the same problem.