Unexpected difference between flatmap and concatmap when the stream has only one element #4206
Comments
By definition, if you drive it in an async flow, signals may skip ahead, yielding the inconsistency. Therefore, if you need strict ordering, use |
Are you talking about onNext from one source being emitted out of order with itself? Something like this:
|
Yes. |
That is surprising to me. @benjchristensen I thought that the ordering of onNext from one Observable was to be maintained. I don't see it anywhere in the general Rx contract, but I know we've talked about it verbally over the years. |
I'm still having a hard time understanding what happens under the hood. @akarnokd I'm not saying you're wrong, I just want to understand.
First of all, how can a single source be shuffled? Map is probably not responsible for that, so it must come from merge. As I understand MergeOperator, there are two ways to emit a scalar that came from a source: the fast path, from tryEmit, or the emit loop, whose role is to drain the values left by failed fast-path attempts. Because each source/inner subscriber has its own queue and because of exclusion ensured by the
Assuming now that flatmap may shuffle even the events emitted by one of the observables produced in map, I should be able to reproduce the problem without a zip (using
I think the javadoc is lacking here. Users of the API should be warned that Merge and the operators composed from it (flatmap, for instance) may shuffle the flow. I can propose an update to the doc if the behavior is confirmed.
Finally, I agree with @abersnaze that such behavior is surprising and unexpected. |
It's a property of the queue bypass interacting with the backpressure logic. This happens when the coordinator runs out of requests. When the source emits, the value goes to the queue. Then a request comes in (Thread A) and increments the counter, but at the same time the source emits again (Thread B). Now the bypass sees a non-zero requested count and emits the later value immediately. Then either it or the requestor thread enters the drain loop and emits the first value from the queue. |
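To make the interleaving above concrete, here is a minimal single-threaded sketch that replays the steps in the order described. It is a simplified model, not RxJava's actual merge code: the `Coordinator` class and the `tryEmit`, `addRequest`, and `drain` methods are hypothetical names chosen for illustration, and the two threads are simulated by calling the steps in the racing order.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

final class BypassReorderSketch {

    // Hypothetical, simplified stand-in for the merge coordinator.
    static final class Coordinator {
        long requested;                                  // outstanding downstream demand
        final Queue<Integer> queue = new ArrayDeque<>(); // values that missed the fast path
        final List<Integer> downstream = new ArrayList<>();

        // Fast path (queue bypass): emit directly when there is demand,
        // otherwise park the value in the queue.
        void tryEmit(int value) {
            if (requested > 0) {
                requested--;
                downstream.add(value);
            } else {
                queue.add(value);
            }
        }

        // Downstream asks for n more items.
        void addRequest(long n) {
            requested += n;
        }

        // Drain loop: emit queued values while demand remains.
        void drain() {
            while (requested > 0 && !queue.isEmpty()) {
                requested--;
                downstream.add(queue.poll());
            }
        }
    }

    // Replays the interleaving from the explanation above.
    static List<Integer> replayInterleaving() {
        Coordinator c = new Coordinator();
        c.tryEmit(1);     // no demand yet: 1 is queued
        c.addRequest(2);  // "Thread A": request arrives, counter is incremented
        c.tryEmit(2);     // "Thread B": sees demand and bypasses the queue
        c.drain();        // drain loop finally emits the queued 1
        return c.downstream;
    }

    public static void main(String[] args) {
        System.out.println(replayInterleaving()); // prints [2, 1]
    }
}
```

In this model the value 2 reaches downstream before 1 even though both come from the same source, which is the reordering reported in this issue.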
Ok, I get it. Thanks a lot for the explanation. Wouldn't it be interesting to try to drain the source's queue at the beginning of the scalar emission? Of course this would be a slower path, but still a faster one than the whole emitLoop. |
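A rough sketch of that suggestion, under the same simplifying assumptions: a single-threaded model with hypothetical names (`Coordinator`, `tryEmit`, `addRequest`), not the code that was actually merged. The fast path first emits anything already queued, and only then emits the new value if demand is left.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

final class DrainFirstSketch {

    // Hypothetical, simplified stand-in for the merge coordinator.
    static final class Coordinator {
        long requested;                                  // outstanding downstream demand
        final Queue<Integer> queue = new ArrayDeque<>(); // values that missed the fast path
        final List<Integer> downstream = new ArrayList<>();

        // Fast path that drains older queued values before emitting the new one.
        void tryEmit(int value) {
            while (requested > 0 && !queue.isEmpty()) {
                requested--;
                downstream.add(queue.poll());
            }
            if (requested > 0) {
                requested--;
                downstream.add(value);
            } else {
                queue.add(value); // no demand left: keep arrival order in the queue
            }
        }

        // Downstream asks for n more items.
        void addRequest(long n) {
            requested += n;
        }
    }

    // Same interleaving as described above: emit, request, emit.
    static List<Integer> replayInterleaving() {
        Coordinator c = new Coordinator();
        c.tryEmit(1);     // no demand yet: 1 is queued
        c.addRequest(2);  // request arrives
        c.tryEmit(2);     // drains 1 first, then emits 2
        return c.downstream;
    }

    public static void main(String[] args) {
        System.out.println(replayInterleaving()); // prints [1, 2]
    }
}
```

A real operator would still need to serialize the drain and the emission across threads, which this model does not attempt; it only shows why checking the queue before bypassing it preserves per-source order.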
That breaks the Rx contract. Ordering of a single stream should always be maintained. Interleaving via merge etc is between streams, not elements of the same stream. |
@benjchristensen then the operator The fix is trivial but adds a likely 10% overhead to the fast-path emissions. |
That operator is allowing a developer to consciously reorder events, so that's not breaking the contract, as it isn't the Observable itself changing order, but user code choosing to do so. Similar to how filter chooses to drop events. But filter can't reorder them for example. |
I agree completely that the reordering within an Observable behavior is not at all what I expect intuitively. If we do want to add the ability to arbitrarily change events, then we should definitely make it explicit rather than the default behavior. |
See #4209. |
Closing via #4209. Fix released in 1.1.8. |
Hello,
I am encountering an unexpected behavior in a piece of code I wrote. I've tried to narrow down a reproduction case in this gist.
To sum up :
What I observe :
initTestFlowWithOrderCheck instead of initTestFlowWithZip) that seems to discard this hypothesis
This is where my investigation stands. I may have missed something that is obvious to someone else, so I'm looking forward to your feedback.
Thx