Skip to content

Avoid merge operator from over requesting upstream producer #2766

Closed
@dpsm

Description

@dpsm

The merge operator requests from upstream disregarding the amount
of requested items from downstream. This causes an issue where
it creates a produce/consume cycle that is infinite and puts
pressure on producer.

The code below illustrates the case where it loops and never emits anything to the downstream subscriber.

final AtomicInteger generated1 = new AtomicInteger();
        final TestScheduler scheduler = new TestScheduler();
        TestSubscriber<Integer> nonScalartestSubscriber = spy(new TestSubscriber<Integer>() {
            @Override
            public void onStart() {
                request(10);
            }
        });
        Observable<Integer> o1 = createInfiniteObservable(generated1)
            .flatMap(new Func1<Integer, Observable<Integer>>() {
                @Override
                public Observable<Integer> call(final Integer integer) {
                    return Observable.create(new OnSubscribe<Integer>() {
                        @Override
                        public void call(Subscriber<? super Integer> subscriber) {
                            subscriber.onNext(-integer);
                            subscriber.onCompleted();
                        }
                    });
                }
            })
            .subscribeOn(scheduler);
        o1.subscribe(nonScalartestSubscriber);

Metadata

Metadata

Assignees

No one assigned

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions