Skip to content

Outer groupBy doesn't obey backpressure #3425

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
abersnaze opened this issue Oct 9, 2015 · 2 comments
Closed

Outer groupBy doesn't obey backpressure #3425

abersnaze opened this issue Oct 9, 2015 · 2 comments
Labels
Milestone

Comments

@abersnaze
Copy link
Contributor

Someone here brought by some code that was throwing a MissingBackpressureException. Aaron and I narrowed it down to this snippet.

merge(range(0, 500).groupBy(i -> i % (RxRingBuffer.SIZE + 2)).observeOn(computation())).toBlocking().last()

it comes down observeOn requesting 128 Observables from groupBy and groupBy hard coded to request 1024 from range.

We fixed his problem changing the observeOn(computation()) to flatMap(grp -> grp.subscribeOn(computation()))2

@abersnaze abersnaze added the Bug label Oct 9, 2015
@abersnaze abersnaze added this to the 1.0.x milestone Oct 9, 2015
@akarnokd
Copy link
Member

akarnokd commented Oct 9, 2015

It fails with 2.x too. I'll have a look at this within 2.x.

@akarnokd
Copy link
Member

akarnokd commented Oct 9, 2015

I've looked into this with 2.x. The problem is that a request(N) may result in a new group beyond the requested amount by the main subscriber (i.e., the one that receives GroupedObservables). Since consuming groups requires value replenishment from main, one can't be sure what the next value will be or where it will go (into the same group, into another group or into a completely new group).

The solution is to buffer the GroupedObservables for the main subscriber and hand them out based on requests while letting the groups be consumed. Since we can't know the number of groups, this buffering has to be unbounded, similar to how each group has to be unbounded (due to the same reason as before: asking for replenishment for a group may result in a value for another group).

I don't fully understand 1.x GroupBy so I don't know how to fix it with minimal changes. The best I can do is to port back the 2.x GroupBy, however, that requires SpscLinkedArrayQueue from #3169.

akarnokd added a commit to akarnokd/RxJava that referenced this issue Oct 9, 2015
This is a backport of the 2.x GroupBy operator which solves ReactiveX#3425.

One unit test in OperatorRetryTest had to be altered a bit. I believe
the original code relied on a GroupBy behavior which caused the bug in
ReactiveX#3425.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

No branches or pull requests

2 participants