Description
Operator fusion in RxJava 2 is a really nice feature, but I have found a case where Flowable.rangeLong (and, I assume, similar standard sources), when fused with observeOn(), breaks the semantics of observeOn().
Basically, none of the items from FlowableRangeLong are emitted on the scheduler passed to observeOn(); they are emitted on whichever thread calls request() in the chain.
Here is a code snippet that reproduces the issue.
import io.reactivex.Flowable;
import io.reactivex.FlowableSubscriber;
import io.reactivex.plugins.RxJavaPlugins;
import io.reactivex.schedulers.Schedulers;
import org.reactivestreams.Subscription;

public class RxJavaFuseTest {

    public static void main(String[] args) {
        FlowableSubscriber sequentialSubscriber = new FlowableSubscriber() {
            private Subscription s;

            @Override
            public void onSubscribe(Subscription s) {
                this.s = s;
                this.s.request(1);
            }

            @Override
            public void onNext(Object x) {
                s.request(1);
            }

            @Override
            public void onError(Throwable e) {
                e.printStackTrace();
                System.exit(0);
            }

            @Override
            public void onComplete() {
                System.out.println("Completed");
                System.exit(0);
            }
        };

        Flowable.rangeLong(0, 10)
                .observeOn(RxJavaPlugins.createSingleScheduler(r -> new Thread(r, "producer")), false, 1)
                .doOnNext(aLong -> {
                    System.out.println(aLong + " emitting on " + Thread.currentThread().getName());
                })
                .parallel(2, 1)
                .runOn(Schedulers.computation(), 1)
                .doOnNext(aLong -> System.out.println(aLong + " processing on " + Thread.currentThread().getName()))
                .sequential()
                .subscribe(sequentialSubscriber);
    }
}
Output is as follows:
0 emitting on main
0 processing on RxComputationThreadPool-1
1 emitting on main
2 emitting on main
1 processing on RxComputationThreadPool-2
2 processing on RxComputationThreadPool-1
3 emitting on main
4 emitting on main
3 processing on RxComputationThreadPool-2
5 emitting on RxComputationThreadPool-2
5 processing on RxComputationThreadPool-2
4 processing on RxComputationThreadPool-1
6 emitting on RxComputationThreadPool-2
7 emitting on RxComputationThreadPool-2
6 processing on RxComputationThreadPool-2
7 processing on RxComputationThreadPool-1
8 emitting on RxComputationThreadPool-2
9 emitting on RxComputationThreadPool-2
8 processing on RxComputationThreadPool-2
9 processing on RxComputationThreadPool-1
Completed
Maybe I am missing something or have not read the documentation carefully enough, but to me it looks like fusion between the operators breaks the semantics of observeOn().
Moreover, while digging through the source of the related classes in RxJava, I found suspicious code in FlowableRangeLong.BaseRangeSubscription.poll(): it can potentially be called from different threads, but the field "index" is not guarded against concurrent access in any way, so it could produce wrong values if the processor reorders instructions or caches the field.
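To make the concern about the unguarded "index" field concrete, here is a minimal sketch using only the JDK. It is not RxJava's actual source: the names RangePollSketch, LongSource, PlainIndexRange and AtomicIndexRange are hypothetical, and the classes only model the general shape of a poll() method that advances a counter. The first variant uses a plain field, which is only safe if every poll() call is ordered by a happens-before edge with the previous one; the second shows one possible way to guard the counter if that ordering cannot be assumed.

```java
import java.util.concurrent.atomic.AtomicLong;

public class RangePollSketch {

    /** Hypothetical stand-in for a fused source that is drained via poll(). */
    interface LongSource {
        /** Returns the next value, or null once the range is exhausted. */
        Long poll();
    }

    /** Plain counter: a model of the pattern questioned above (assumption, not RxJava code). */
    static final class PlainIndexRange implements LongSource {
        private final long end;
        private long index; // neither volatile nor guarded by a lock

        PlainIndexRange(long start, long count) {
            this.index = start;
            this.end = start + count;
        }

        @Override
        public Long poll() {
            long i = index;
            if (i == end) {
                return null;
            }
            // Unsynchronized read-modify-write: correct only if callers never
            // overlap and each call happens-before the next one.
            index = i + 1;
            return i;
        }
    }

    /** Guarded counter: one possible alternative if poll() may race. */
    static final class AtomicIndexRange implements LongSource {
        private final long end;
        private final AtomicLong index;

        AtomicIndexRange(long start, long count) {
            this.index = new AtomicLong(start);
            this.end = start + count;
        }

        @Override
        public Long poll() {
            for (;;) {
                long i = index.get();
                if (i == end) {
                    return null;
                }
                // Claim value i only if no other thread claimed it first.
                if (index.compareAndSet(i, i + 1)) {
                    return i;
                }
            }
        }
    }

    /** Drains the source on the calling thread and returns the sum of all values. */
    static long drain(LongSource source) {
        long sum = 0;
        Long v;
        while ((v = source.poll()) != null) {
            sum += v;
        }
        return sum;
    }

    public static void main(String[] args) {
        System.out.println(drain(new PlainIndexRange(0, 10)));
        System.out.println(drain(new AtomicIndexRange(0, 10)));
    }
}
```

When drained from a single thread, both variants behave identically. The difference only matters if poll() really can be reached from several threads without synchronization between the calls, which is the open question here.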