Skip to content

Merge can now operate in horizontally unbounded mode. #3169

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Nov 10, 2015

Conversation

akarnokd
Copy link
Member

Resolves #3156

Note that since the default merge operation is unbounded, this change could lead to an excessive memory usage when flatMapping fast sources. Note that the pre 1.0.13 version did this albeit on a slighty slower path.

The change also affects the scalar optimization as well. Pre 1.0.13 implicitly limited the concurrency level to RxRingBuffer.SIZE when scalars were received. This version now fills the queue up to the concurrency level.

For 2.0, I suggest having a bounded behavior by default and require the developer to specify Integer.MAX_VALUE to go for the unbounded behavior so he/she knows about the consequences.

@benjchristensen benjchristensen modified the milestone: 1.0.x Aug 28, 2015
@akarnokd akarnokd closed this Sep 9, 2015
@akarnokd akarnokd deleted the MergeHorizontalUnbounded branch September 9, 2015 15:32
@akarnokd akarnokd restored the MergeHorizontalUnbounded branch September 9, 2015 15:32
@akarnokd akarnokd reopened this Sep 9, 2015
@benjchristensen
Copy link
Member

I'll have to allocate some time to review this later as it is somewhat detailed. Thank you for tackling this.

@benjchristensen
Copy link
Member

Pre 1.0.13 implicitly limited the concurrency level to RxRingBuffer.SIZE when scalars were received.

That was done by design, since scalars behave as onNext. And we can limit the number of Observables being merged if we have scalars queued up for delivery.

The reason we can't limit the number of async Observables is because we don't know if they will emit or not and thus risk deadlock.

@abersnaze abersnaze mentioned this pull request Oct 8, 2015
6 tasks
akarnokd added a commit to akarnokd/RxJava that referenced this pull request Nov 9, 2015
I forgot to add the choice because 2.x SpscArrayQueue doesn't use
Unsafe. I copied the SpscAtomicArrayQueue from ReactiveX#3169 and I hope it won't
conflict.
@stealthcode
Copy link

👍

@akarnokd
Copy link
Member Author

I'll rebase this.

@akarnokd akarnokd force-pushed the MergeHorizontalUnbounded branch from fb80b82 to e8beca7 Compare November 10, 2015 21:30
@akarnokd
Copy link
Member Author

Rebased, let's get this finally in.

akarnokd added a commit that referenced this pull request Nov 10, 2015
Merge can now operate in horizontally unbounded mode.
@akarnokd akarnokd merged commit 9bc1987 into ReactiveX:1.x Nov 10, 2015
@akarnokd akarnokd deleted the MergeHorizontalUnbounded branch November 10, 2015 21:39
@stealthcode
Copy link

Thanks

tejasgarde pushed a commit to tejasgarde/RxJava that referenced this pull request Apr 4, 2016
I forgot to add the choice because 2.x SpscArrayQueue doesn't use
Unsafe. I copied the SpscAtomicArrayQueue from ReactiveX#3169 and I hope it won't
conflict.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants