onBackpressureBlock() causes hangs #2799

Closed
@wendigo

Given this piece of code:

        // Requires rx.Observable and rx.schedulers.Schedulers (RxJava 1.x).
        Observable.create(subscriber -> {
            long counter = 0;

            while (counter++ < Long.MAX_VALUE) {
                System.out.println("onNext(" + counter + ")");

                // Stop producing once the downstream has unsubscribed.
                if (!subscriber.isUnsubscribed()) {
                    subscriber.onNext(counter);
                } else {
                    break;
                }

                System.out.println("afterOnNext(" + counter + ")");
            }
        })
        .onBackpressureBlock(2)
        .subscribeOn(Schedulers.newThread())
        .observeOn(Schedulers.computation())
        .subscribe(value -> System.out.println("Subscribe.onNext(" + value + ")"));

On version 1.0.7, the above code hangs while producing the element at position RxRingBuffer.SIZE + 2 (the 2 being maxQueueLength):

...
afterOnNext(127)
onNext(128)
afterOnNext(128)
Subscribe.onNext(128)
onNext(129)
afterOnNext(129)
onNext(130)
afterOnNext(130)
onNext(131)
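
To make the arithmetic behind that position concrete, here is a minimal model using only java.util.concurrent. It is a sketch under stated assumptions, not RxJava's implementation: it assumes RxRingBuffer.SIZE is 128 (inferred from the log above), treats the observeOn buffer and the onBackpressureBlock queue as two plain bounded queues, and assumes the consumer never drains anything. The class and method names are hypothetical.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class BlockingBackpressureModel {

    /**
     * Counts how many items a producer can hand over before its next call
     * would block, given a downstream buffer with a small blocking queue in
     * front of it and a consumer that is stalled.
     */
    static int acceptedBeforeBlocking(int downstreamBufferSize, int maxQueueLength) {
        BlockingQueue<Long> downstream = new ArrayBlockingQueue<>(downstreamBufferSize);
        BlockingQueue<Long> blockQueue = new ArrayBlockingQueue<>(maxQueueLength);

        long counter = 0;
        while (true) {
            long next = counter + 1;
            // offer() is the non-blocking counterpart of put(): it returns
            // false exactly where a blocking producer would park its thread.
            boolean accepted = downstream.offer(next) || blockQueue.offer(next);
            if (!accepted) {
                return (int) counter;
            }
            counter = next;
        }
    }

    public static void main(String[] args) {
        // 128 is an assumed value for RxRingBuffer.SIZE; 2 is the
        // maxQueueLength passed to onBackpressureBlock in the issue.
        int accepted = acceptedBeforeBlocking(128, 2);
        System.out.println("accepted before blocking: " + accepted);
        System.out.println("first blocked element: " + (accepted + 1));
    }
}
```

In this model the producer hands over 130 items and would block on the 131st, which lines up with the log stopping after onNext(131). The model only shows where a blocked producer would stall if nothing drains the buffers; it does not explain why the drain stops in 1.0.7.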

If we switch the order of subscribeOn/observeOn (observeOn before subscribeOn), it works as expected :)

Also, without onBackpressure* it throws MissingBackpressureException (as expected).
