Skip to content

2.x: Fix Observable.flatMap to sustain concurrency level #6283

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged

Conversation

akarnokd
Copy link
Member

@akarnokd akarnokd commented Nov 1, 2018

If the Observable.flatMap operator run in limited concurrency mode and multiple sources completed at the same time while the operator was in its drain loop, the operator only started with one new inner source instead of trying to run replace all the completed inner sources with new ones.

The solution is to count the completed sources and replenish them in a loop.

(The Flowable variant works properly because it uses backpressure and inner source counting already to replenish those completed inner sources. The Observable doesn't have backpressure so it has to emulate it via the work-in-progress counting and the secondary queue for available inner sources.)

Fixes: #6282

@akarnokd akarnokd added this to the 2.2 backlog milestone Nov 1, 2018
@codecov
Copy link

codecov bot commented Nov 1, 2018

Codecov Report

Merging #6283 into 2.x will increase coverage by 0.02%.
The diff coverage is 100%.

Impacted file tree graph

@@             Coverage Diff              @@
##                2.x    #6283      +/-   ##
============================================
+ Coverage     98.25%   98.28%   +0.02%     
- Complexity     6259     6261       +2     
============================================
  Files           667      667              
  Lines         44887    44888       +1     
  Branches       6213     6214       +1     
============================================
+ Hits          44104    44117      +13     
+ Misses          247      239       -8     
+ Partials        536      532       -4
Impacted Files Coverage Δ Complexity Δ
...ternal/operators/observable/ObservableFlatMap.java 88.99% <100%> (+0.03%) 3 <0> (ø) ⬇️
.../operators/observable/ObservableFlatMapSingle.java 88.8% <0%> (-5.98%) 2% <0%> (ø)
...activex/internal/observers/QueueDrainObserver.java 97.43% <0%> (-2.57%) 21% <0%> (-1%)
...java/io/reactivex/processors/PublishProcessor.java 98.19% <0%> (-1.81%) 42% <0%> (-1%)
...tivex/internal/operators/single/SingleTimeout.java 98.33% <0%> (-1.67%) 2% <0%> (ø)
...perators/single/SingleFlatMapIterableFlowable.java 96.66% <0%> (-1.67%) 2% <0%> (ø)
...ernal/operators/flowable/FlowableFromIterable.java 95.18% <0%> (-1.61%) 5% <0%> (ø)
.../operators/mixed/FlowableConcatMapCompletable.java 99.14% <0%> (-0.86%) 2% <0%> (ø)
...x/internal/operators/observable/ObservableZip.java 99.29% <0%> (-0.71%) 6% <0%> (ø)
...ex/internal/operators/flowable/FlowableCreate.java 97.09% <0%> (-0.65%) 6% <0%> (ø)
... and 15 more

Continue to review full report at Codecov.

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update c3cfb5a...0138376. Read the comment docs.

@akarnokd akarnokd merged commit 3281b02 into ReactiveX:2.x Nov 5, 2018
@akarnokd akarnokd deleted the ObservableFlatMapMaxConcurrencyFix branch November 5, 2018 10:34
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants