RxJava 2.0.8
Setting the terminal event of the BehaviorProcessor and replaying it to late subscribers is not an atomic action, which causes a NullPointerException under certain race conditions.
The root cause seems to be that marking the processor as terminated and setting the terminal event are two separate, non-atomic steps inside the BehaviorProcessor#terminate method.
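To make the suspected interleaving concrete, here is a minimal sketch that replays it deterministically on a single thread. It is a hypothetical model, not RxJava's code: the class `TerminalRaceSketch`, its fields, and the methods `markTerminated`, `storeTerminalEvent`, and `lateSubscribe` are all made up for illustration, and the assumption is only that termination happens in two separate steps as described above.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical model of the hazard; NOT RxJava's actual implementation.
class TerminalRaceSketch {
    // Marker object standing in for a "completed" terminal event.
    static final Object COMPLETE = new Object();

    final AtomicBoolean terminated = new AtomicBoolean();
    final AtomicReference<Object> terminalEvent = new AtomicReference<Object>();

    // Step 1 of the non-atomic terminate: flag the processor as terminated.
    void markTerminated() {
        terminated.set(true);
    }

    // Step 2 of the non-atomic terminate: store the terminal event.
    void storeTerminalEvent(Object event) {
        terminalEvent.set(event);
    }

    // What a late subscriber does: if terminated, replay the terminal event.
    String lateSubscribe() {
        if (!terminated.get()) {
            return "subscribed";
        }
        Object event = terminalEvent.get();
        if (event == COMPLETE) {
            return "onComplete";
        }
        // Anything else is assumed to be an error; this dereference throws a
        // NullPointerException when the event has not been stored yet.
        return "onError(" + ((Throwable) event).getMessage() + ")";
    }

    public static void main(String[] args) {
        TerminalRaceSketch p = new TerminalRaceSketch();
        p.markTerminated(); // the terminating thread has done step 1 only
        try {
            p.lateSubscribe(); // the subscribing thread runs between the steps
        } catch (NullPointerException e) {
            System.out.println("late subscriber hit a NullPointerException");
        }
        p.storeTerminalEvent(COMPLETE); // step 2 arrives too late
        System.out.println(p.lateSubscribe());
    }
}
```

In the real processor the two steps run on one thread and the subscription on another, so the window is small and only shows up occasionally.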
The following minimal example helps reproduce the race condition.
package com.example;

import io.reactivex.processors.BehaviorProcessor;
import io.reactivex.subscribers.TestSubscriber;

public class BehaviorProcessorRaceExample {
    public static void main(String[] args) {
        // Repeat many times; the race only shows up in some iterations.
        for (int i = 0; i <= 100000; i++) {
            final TestSubscriber<Object> s = TestSubscriber.create();
            final BehaviorProcessor<Boolean> bp = BehaviorProcessor.create();
            // t1 terminates the processor.
            final Thread t1 = new Thread(new Runnable() {
                @Override
                public void run() {
                    bp.onComplete();
                }
            });
            // t2 subscribes concurrently and may observe a half-terminated state.
            final Thread t2 = new Thread(new Runnable() {
                @Override
                public void run() {
                    bp.subscribe(s);
                }
            });
            t1.setDaemon(true);
            t2.setDaemon(true);
            t1.start();
            t2.start();
        }
    }
}
Running this results in exceptions like the following.
Exception in thread "Thread-1279" java.lang.NullPointerException
    at io.reactivex.internal.util.NotificationLite.getError(NotificationLite.java:189)
    at io.reactivex.processors.BehaviorProcessor.subscribeActual(BehaviorProcessor.java:162)
    at io.reactivex.Flowable.subscribe(Flowable.java:12970)
    at com.example.BehaviorProcessorRaceExample$2.run(BehaviorProcessorRaceExample.java:24)
    at java.lang.Thread.run(Thread.java:745)
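One possible direction, offered only as a sketch and not as the fix RxJava should or does adopt, is to fold the "terminated" flag and the terminal event into a single atomic state, so a late subscriber can never observe one without the other. The class `SingleStateTerminalSketch` and its methods are hypothetical names used for illustration.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch of a single-state design; NOT RxJava's actual code.
class SingleStateTerminalSketch {
    // Marker object standing in for a "completed" terminal event.
    static final Object COMPLETE = new Object();

    // null means "still active"; any non-null value IS the terminal event,
    // so termination and the event become visible in one atomic write.
    final AtomicReference<Object> terminal = new AtomicReference<Object>();

    // Returns true only for the first caller that terminates the processor.
    boolean terminate(Object event) {
        return terminal.compareAndSet(null, event);
    }

    // A late subscriber reads the state exactly once and acts on that value.
    String lateSubscribe() {
        Object event = terminal.get();
        if (event == null) {
            return "subscribed";
        }
        if (event == COMPLETE) {
            return "onComplete";
        }
        return "onError(" + ((Throwable) event).getMessage() + ")";
    }

    public static void main(String[] args) {
        SingleStateTerminalSketch p = new SingleStateTerminalSketch();
        System.out.println(p.lateSubscribe());
        p.terminate(COMPLETE);
        System.out.println(p.lateSubscribe());
    }
}
```

With this shape there is no intermediate state in which the processor is terminated but the event is still missing. The trade-off is that the "current value" and the terminal event would have to share one reference or be coordinated some other way.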