flatMap concurrency decrease in case of simultaneous finishing in the child observables #6282

Closed
yklymenko opened this issue Nov 1, 2018 · 1 comment

yklymenko commented Nov 1, 2018

RxJava 2.2.3

The following code reproduces the problem:

import io.reactivex.Observable;
import io.reactivex.Scheduler;
import io.reactivex.schedulers.Schedulers;

import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

public class TestObs {

    public static void main(String[] args) {

        // Number of inner Observables currently executing.
        AtomicInteger count = new AtomicInteger();

        Scheduler scheduler = Schedulers.from(Executors.newCachedThreadPool());
        Observable.range(0, 400)
                .flatMap(i -> Observable.just(i).subscribeOn(scheduler).map(x -> {
                     System.out.println("Start " + Thread.currentThread().getName()
                        + " " + x + " concurrent " + count.incrementAndGet());
                     Thread.sleep(1000);
                     System.out.println("End " + Thread.currentThread().getName()
                        + " " + x + " concurrent " + count.decrementAndGet());

                     return x;
                  }),
                  30 // maxConcurrency
                ).ignoreElements().blockingGet();
    }
}

Toward the end of the run, only 1 inner Observable is executing at a time. At the beginning there are 30.

This concerns the flatMap operator with the concurrency argument. With a concurrency of Integer.MAX_VALUE and a fixed thread pool, everything is OK.
The problem occurs when more than one inner Observable completes within the same drainLoop pass.
I think the cause is here (io.reactivex.internal.operators.observable.ObservableFlatMap, line 448):

                        SimpleQueue<U> innerQueue = is.queue;
                        if (innerDone && (innerQueue == null || innerQueue.isEmpty())) {
                            removeInner(is);
                            if (checkTerminate()) {
                                return;
                            }
                            innerCompleted = true;
                        }

More than one inner can be removed in a single pass, but innerCompleted pulls in only one replacement (io.reactivex.internal.operators.observable.ObservableFlatMap, line 468).
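To illustrate the suspected mechanism, here is a toy model in plain Java. It is not RxJava code and makes no claim about how ObservableFlatMap is actually structured or how it will be fixed. The class FlatMapSlotModel, the method activeAfter and its parameters are all hypothetical names, and the model assumes the upstream always has more sources available. It only compares "one replacement per drain pass" with "one replacement per removed inner".

```java
// Toy model of the hypothesis above; NOT RxJava's implementation.
// All names here are hypothetical and exist only for illustration.
class FlatMapSlotModel {

    /**
     * Returns how many inner sources are active after the given number of
     * drain passes, assuming finishedPerPass inners complete in each pass
     * and the upstream never runs out of new sources.
     *
     * replaceAll = false: at most one replacement per pass (suspected bug).
     * replaceAll = true:  one replacement per removed inner.
     */
    static int activeAfter(int maxConcurrency, int finishedPerPass,
                           int passes, boolean replaceAll) {
        int active = maxConcurrency;
        for (int p = 0; p < passes; p++) {
            // Inners that completed and are removed in this pass.
            int removed = Math.min(finishedPerPass, active);
            active -= removed;
            // New inners subscribed to refill the freed slots.
            int replacements = replaceAll ? removed : Math.min(1, removed);
            active += replacements;
        }
        return active;
    }

    public static void main(String[] args) {
        // 30 slots, 2 inners finishing together in every pass.
        System.out.println("one per pass:    " + activeAfter(30, 2, 29, false));
        System.out.println("one per removed: " + activeAfter(30, 2, 29, true));
    }
}
```

Under these assumptions, the first variant loses one slot in every pass where two inners finish together and settles at a single active inner, which matches the output of the reproducer. The second variant stays at 30.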

akarnokd added this to the 2.2 backlog milestone Nov 1, 2018

akarnokd (Member) commented Nov 1, 2018

Thanks for reporting. This is indeed a bug in the operator which should try to sustain the concurrency level as best as it can. I'll post a fix shortly.
