-
Notifications
You must be signed in to change notification settings - Fork 7.6k
1.x: combineLatest can now combine arbitrary number of sources #3507
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
d04fb0b
to
c726458
Compare
if (sources == null) { | ||
if (sourcesIterable instanceof List) { | ||
// unchecked & raw: javac type inference problem otherwise | ||
sources = (Observable[])(((List)sourcesIterable).toArray(EMPTY)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why not toArray(new Observable[list.size()])
?
c726458
to
bd61bf5
Compare
@artem-zinnatullin I didn't bother but now I'm more sensitive to Android reflection issues. |
Rebasing... |
bd61bf5
to
584489a
Compare
... rebased. (Did I mention rebase doesn't really work with the latest Eclipse; there is no continue/abort menu option and the branch gets stuck and needs to be manually deleted as switching is also disabled). |
You can use IntelliJ IDEA community edition, it's free and awesome. On Wed, Nov 11, 2015, 13:30 David Karnok [email protected] wrote:
@artem_zin |
I've tried it a few times and it is too alien for me. I can't find features and its internal compiler plays tricks with me sometimes. |
we need this one, pls merge |
584489a
to
33845dc
Compare
public void testCombineMany() { | ||
int n = RxRingBuffer.SIZE * 3; | ||
|
||
Observable<Integer> source = Observable.just(1); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@akarnokd this may play very bad "joke" with us because operator just needs to combine any result (first, second, etc) correct amount of times to pass the test!
Can you please create unique observables as a source? Probably even better with String
s to prevent effects from mathematical operations (equality of sum without fixed order in this case)
👍 from me (test needs a fix), @zsxwing can you please take a look? |
33845dc
to
dd409e2
Compare
Closing out, some unknown merge conflicts appeared, maybe due to age. |
See #3661 |
This PR is a backport of the 2.x
combineLatest
withnull
support and let's the programmer combine more than 128 sources at once. Note that 2.x supports manual sizing of the prefetch buffer and allows delaying the exceptions to the last moment; I kept these in case such features are required by 1.x some day.The original had this 128 limit due to using a single and non-parametric RxRingBuffer to store values from sources. In addition, the original algorithm divided downstream requests among the sources with some strange logic in order to keep them running and not overflow the buffer.
This PR uses an unbounded-linked SPSC queue which can now hold as many elements as there are sources times the default ring buffer size (this is not preallocated and grows/shrinks as necessary).
The algorithm stores the current latest array and the source subscriber in the queue in pairs so when that particular row is emitted to downstream, that source subscriber gets the request() replenishment call that generated the row.