Skip to content

Using publish/share/etc leads to upstream operator running on unexpected scheduler #6144

@damianw

Description

@damianw

There appears to be a bug, or at the very least an undocumented and unexpected case, where FlowablePublish causes upstream operators to be applied on the subscription scheduler rather than the upstream observation scheduler (due to polling). This is reproducible at least in 2.1.x and 2.2.0.

Sample (Kotlin)

    val scheduler1 = Schedulers.from(Executors.newSingleThreadExecutor { runnable ->
        Thread(runnable, "scheduler1")
    })

    val scheduler2 = Schedulers.from(Executors.newSingleThreadExecutor { runnable ->
        Thread(runnable, "scheduler2")
    })

    val flowable: Flowable<Long> = Flowable.interval(1L, TimeUnit.MILLISECONDS)
            .onBackpressureLatest()
            .take(10_000)
            .observeOn(scheduler1)
            .map {
                val threadName = Thread.currentThread().name
                if (threadName != "scheduler1") {
                    throw AssertionError("Wrong thread: $threadName")
                }
                it
            }
            .share()

    flowable
            .observeOn(scheduler2)
            .doOnNext { Thread.sleep(10L) }
            .subscribe {
                println("onNext on ${Thread.currentThread().name}: $it")
            }

Expected Behavior

It should be expected that the .map operator is applied on the scheduler1 scheduler since it is the scheduler supplied to observeOn immediately upstream.

Actual Behavior

After about ~100 emissions, the map operator is applied on the scheduler2 scheduler. I'm not an expert in the details, but it seems to have something to do with backpressure, buffering, and polling for items upon subscription?

Output

onNext on scheduler2: 0
onNext on scheduler2: 1
onNext on scheduler2: 2
onNext on scheduler2: 3
onNext on scheduler2: 4
onNext on scheduler2: 5
onNext on scheduler2: 6
onNext on scheduler2: 7
onNext on scheduler2: 8
onNext on scheduler2: 9
onNext on scheduler2: 10
onNext on scheduler2: 11
onNext on scheduler2: 12
onNext on scheduler2: 13
onNext on scheduler2: 14
onNext on scheduler2: 15
onNext on scheduler2: 16
onNext on scheduler2: 17
onNext on scheduler2: 18
onNext on scheduler2: 19
onNext on scheduler2: 20
onNext on scheduler2: 21
onNext on scheduler2: 22
onNext on scheduler2: 23
onNext on scheduler2: 24
onNext on scheduler2: 25
onNext on scheduler2: 26
onNext on scheduler2: 27
onNext on scheduler2: 28
onNext on scheduler2: 29
onNext on scheduler2: 30
onNext on scheduler2: 31
onNext on scheduler2: 32
onNext on scheduler2: 33
onNext on scheduler2: 34
onNext on scheduler2: 35
onNext on scheduler2: 36
onNext on scheduler2: 37
onNext on scheduler2: 38
onNext on scheduler2: 39
onNext on scheduler2: 40
onNext on scheduler2: 41
onNext on scheduler2: 42
onNext on scheduler2: 43
onNext on scheduler2: 44
onNext on scheduler2: 45
onNext on scheduler2: 46
onNext on scheduler2: 47
onNext on scheduler2: 48
onNext on scheduler2: 49
onNext on scheduler2: 50
onNext on scheduler2: 51
onNext on scheduler2: 52
onNext on scheduler2: 53
onNext on scheduler2: 54
onNext on scheduler2: 55
onNext on scheduler2: 56
onNext on scheduler2: 57
onNext on scheduler2: 58
onNext on scheduler2: 59
onNext on scheduler2: 60
onNext on scheduler2: 61
onNext on scheduler2: 62
onNext on scheduler2: 63
onNext on scheduler2: 64
onNext on scheduler2: 65
onNext on scheduler2: 66
onNext on scheduler2: 67
onNext on scheduler2: 68
onNext on scheduler2: 69
onNext on scheduler2: 70
onNext on scheduler2: 71
onNext on scheduler2: 72
onNext on scheduler2: 73
onNext on scheduler2: 74
onNext on scheduler2: 75
onNext on scheduler2: 76
onNext on scheduler2: 77
onNext on scheduler2: 78
onNext on scheduler2: 79
onNext on scheduler2: 80
onNext on scheduler2: 81
onNext on scheduler2: 82
onNext on scheduler2: 83
onNext on scheduler2: 84
onNext on scheduler2: 85
onNext on scheduler2: 86
onNext on scheduler2: 87
onNext on scheduler2: 88
onNext on scheduler2: 89
onNext on scheduler2: 90
onNext on scheduler2: 91
onNext on scheduler2: 92
onNext on scheduler2: 93
onNext on scheduler2: 94
onNext on scheduler2: 95
io.reactivex.exceptions.OnErrorNotImplementedException: Wrong thread: scheduler2
	at io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept(Functions.java:704)
	at io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept(Functions.java:701)
	at io.reactivex.internal.subscribers.LambdaSubscriber.onError(LambdaSubscriber.java:79)
	at io.reactivex.internal.operators.flowable.FlowableDoOnEach$DoOnEachSubscriber.onError(FlowableDoOnEach.java:111)
	at io.reactivex.internal.operators.flowable.FlowableObserveOn$BaseObserveOnSubscriber.checkTerminated(FlowableObserveOn.java:207)
	at io.reactivex.internal.operators.flowable.FlowableObserveOn$ObserveOnSubscriber.runAsync(FlowableObserveOn.java:392)
	at io.reactivex.internal.operators.flowable.FlowableObserveOn$BaseObserveOnSubscriber.run(FlowableObserveOn.java:176)
	at io.reactivex.internal.schedulers.ExecutorScheduler$ExecutorWorker$BooleanRunnable.run(ExecutorScheduler.java:261)
	at io.reactivex.internal.schedulers.ExecutorScheduler$ExecutorWorker.run(ExecutorScheduler.java:226)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.AssertionError: Wrong thread: scheduler2
	at RxTestKt$main$flowable$1.apply(RxTest.kt:22)
	at RxTestKt$main$flowable$1.apply(RxTest.kt)
	at io.reactivex.internal.operators.flowable.FlowableMap$MapSubscriber.poll(FlowableMap.java:81)
	at io.reactivex.internal.operators.flowable.FlowablePublish$PublishSubscriber.dispatch(FlowablePublish.java:510)
	at io.reactivex.internal.operators.flowable.FlowablePublish$InnerSubscriber.request(FlowablePublish.java:615)
	at io.reactivex.internal.operators.flowable.FlowableRefCount$RefCountSubscriber.request(FlowableRefCount.java:216)
	at io.reactivex.internal.operators.flowable.FlowableObserveOn$ObserveOnSubscriber.runAsync(FlowableObserveOn.java:407)
	... 6 more
Exception in thread "scheduler2" io.reactivex.exceptions.OnErrorNotImplementedException: Wrong thread: scheduler2
	at io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept(Functions.java:704)
	at io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept(Functions.java:701)
	at io.reactivex.internal.subscribers.LambdaSubscriber.onError(LambdaSubscriber.java:79)
	at io.reactivex.internal.operators.flowable.FlowableDoOnEach$DoOnEachSubscriber.onError(FlowableDoOnEach.java:111)
	at io.reactivex.internal.operators.flowable.FlowableObserveOn$BaseObserveOnSubscriber.checkTerminated(FlowableObserveOn.java:207)
	at io.reactivex.internal.operators.flowable.FlowableObserveOn$ObserveOnSubscriber.runAsync(FlowableObserveOn.java:392)
	at io.reactivex.internal.operators.flowable.FlowableObserveOn$BaseObserveOnSubscriber.run(FlowableObserveOn.java:176)
	at io.reactivex.internal.schedulers.ExecutorScheduler$ExecutorWorker$BooleanRunnable.run(ExecutorScheduler.java:261)
	at io.reactivex.internal.schedulers.ExecutorScheduler$ExecutorWorker.run(ExecutorScheduler.java:226)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.AssertionError: Wrong thread: scheduler2
	at RxTestKt$main$flowable$1.apply(RxTest.kt:22)
	at RxTestKt$main$flowable$1.apply(RxTest.kt)
	at io.reactivex.internal.operators.flowable.FlowableMap$MapSubscriber.poll(FlowableMap.java:81)
	at io.reactivex.internal.operators.flowable.FlowablePublish$PublishSubscriber.dispatch(FlowablePublish.java:510)
	at io.reactivex.internal.operators.flowable.FlowablePublish$InnerSubscriber.request(FlowablePublish.java:615)
	at io.reactivex.internal.operators.flowable.FlowableRefCount$RefCountSubscriber.request(FlowableRefCount.java:216)
	at io.reactivex.internal.operators.flowable.FlowableObserveOn$ObserveOnSubscriber.runAsync(FlowableObserveOn.java:407)
	... 6 more

Metadata

Metadata

Assignees

No one assigned

    Labels

    Type

    No type

    Projects

    No projects

    Milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions