Skip to content

Javadoc: explain Flowable.groupBy should be consumed on all of its outputs #6289

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
akarnokd opened this issue Nov 5, 2018 · 1 comment
Closed

Comments

@akarnokd
Copy link
Member

akarnokd commented Nov 5, 2018

In order for Flowable.groupBy to remain bounded in memory, it uses a prefetch and consumption-based replenishment strategy: when a group has consumed an item, a new item is requested from the source of groupBy.

Since a new item may create a fresh group, the only way to move forward is to have that group consumed as well. For this, the GroupedFlowable itself has to reach some operator and get subscribed to.

The typical use is with flatMap that will subscribe to the groups and keeps consuming them, however, flatMap by default works in a limited concurrency mode. It means as long as the groups themselves don't complete, flatMap won't accept a newer group, and thus that group will not get consumed, resulting in a potential livelock. The solution is to have a larger max-concurrency as there are number of groups expected, potentially going unbounded with flatMap completely.

This is not an issue with Observable.groupBy because there is no backpressure and non-consumed groups can't hinder active groups.

I suggest extending the JavaDocs of all Flowable.groupBy about this property, something along the lines of:

<p>
Note that the {@link GroupedFlowable}s should be subscribed to as soon as possible, otherwise, 
the unconsumed groups may starve other groups due to the internal backpressure
coordination of the {@code groupBy} operator. Such hangs can be usually avoided by using 
{@link #flatMap} or {@link #concatMapEager} and overriding the default maximum concurrency 
value to be greater or equal to the expected number of groups, possibly using 
{@link Integer.MAX_VALUE} if the number of expected groups is unknown.

(Please when writing the {@link #}s out, pick the shortest overload of the respective method that also includes the maxConcurrency parameter. Your IDE's content assist should help you in this regard.)

@akarnokd
Copy link
Member Author

Ah, this has been done in #6308. Closing.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

1 participant