diff --git a/src/main/java/rx/Observable.java b/src/main/java/rx/Observable.java index 7754f87c43..6e28c5d4fe 100644 --- a/src/main/java/rx/Observable.java +++ b/src/main/java/rx/Observable.java @@ -810,6 +810,31 @@ public static Observable combineLatest(List(sources, combineFunction)); } + /** + * Combines a collection of source Observables by emitting an item that aggregates the latest values of each of + * the source Observables each time an item is received from any of the source Observables, where this + * aggregation is defined by a specified function. + *
+ *
Scheduler:
+ *
{@code combineLatest} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param + * the common base type of source values + * @param + * the result type + * @param sources + * the collection of source Observables + * @param combineFunction + * the aggregation function used to combine the items emitted by the source Observables + * @return an Observable that emits items that are the result of combining the items emitted by the source + * Observables by means of the given aggregation function + * @see ReactiveX operators documentation: CombineLatest + */ + public static Observable combineLatest(Iterable> sources, FuncN combineFunction) { + return create(new OnSubscribeCombineLatest(sources, combineFunction)); + } + /** * Returns an Observable that emits the items emitted by each of the Observables emitted by the source * Observable, one after the other, without interleaving them. diff --git a/src/main/java/rx/internal/operators/OnSubscribeCombineLatest.java b/src/main/java/rx/internal/operators/OnSubscribeCombineLatest.java index 5df99b2585..152a0831b0 100644 --- a/src/main/java/rx/internal/operators/OnSubscribeCombineLatest.java +++ b/src/main/java/rx/internal/operators/OnSubscribeCombineLatest.java @@ -1,317 +1,409 @@ /** - * Copyright 2014 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * + * Copyright 2015 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. */ + package rx.internal.operators; -import java.util.BitSet; -import java.util.List; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; +import java.util.*; +import java.util.concurrent.atomic.*; +import rx.*; import rx.Observable; import rx.Observable.OnSubscribe; -import rx.exceptions.*; -import rx.Producer; -import rx.Subscriber; +import rx.exceptions.CompositeException; import rx.functions.FuncN; import rx.internal.util.RxRingBuffer; +import rx.internal.util.atomic.SpscLinkedArrayQueue; +import rx.plugins.RxJavaPlugins; -/** - * Returns an Observable that combines the emissions of multiple source observables. Once each - * source Observable has emitted at least one item, combineLatest emits an item whenever any of - * the source Observables emits an item, by combining the latest emissions from each source - * Observable with a specified function. - *

- * - * - * @param - * the common basetype of the source values - * @param - * the result type of the combinator function - */ public final class OnSubscribeCombineLatest implements OnSubscribe { - final List> sources; - final FuncN combinator; - - public OnSubscribeCombineLatest(List> sources, FuncN combinator) { + final Observable[] sources; + final Iterable> sourcesIterable; + final FuncN combiner; + final int bufferSize; + final boolean delayError; + + public OnSubscribeCombineLatest(Iterable> sourcesIterable, + FuncN combiner) { + this(null, sourcesIterable, combiner, RxRingBuffer.SIZE, false); + } + + public OnSubscribeCombineLatest(Observable[] sources, + Iterable> sourcesIterable, + FuncN combiner, int bufferSize, + boolean delayError) { this.sources = sources; - this.combinator = combinator; - if (sources.size() > RxRingBuffer.SIZE) { - // For design simplicity this is limited to RxRingBuffer.SIZE. If more are really needed we'll need to - // adjust the design of how RxRingBuffer is used in the implementation below. - throw new IllegalArgumentException("More than RxRingBuffer.SIZE sources to combineLatest is not supported."); - } + this.sourcesIterable = sourcesIterable; + this.combiner = combiner; + this.bufferSize = bufferSize; + this.delayError = delayError; } + @Override - public void call(final Subscriber child) { - if (sources.isEmpty()) { - child.onCompleted(); - return; - } - if (sources.size() == 1) { - child.setProducer(new SingleSourceProducer(child, sources.get(0), combinator)); + @SuppressWarnings({ "unchecked", "rawtypes" }) + public void call(Subscriber s) { + Observable[] sources = this.sources; + int count = 0; + if (sources == null) { + if (sourcesIterable instanceof List) { + // unchecked & raw: javac type inference problem otherwise + List list = (List)sourcesIterable; + sources = (Observable[])list.toArray(new Observable[list.size()]); + count = sources.length; + } else { + sources = new Observable[8]; + for (Observable p : sourcesIterable) { + if (count == sources.length) { + Observable[] b = new Observable[count + (count >> 2)]; + System.arraycopy(sources, 0, b, 0, count); + sources = b; + } + sources[count++] = p; + } + } } else { - child.setProducer(new MultiSourceProducer(child, sources, combinator)); + count = sources.length; } - + + if (count == 0) { + s.onCompleted(); + return; + } + + LatestCoordinator lc = new LatestCoordinator(s, combiner, count, bufferSize, delayError); + lc.subscribe(sources); } - - /* - * benjchristensen => This implementation uses a buffer enqueue/drain pattern. It could be optimized to have a fast-path to - * skip the buffer and emit directly when no conflict, but that is quite complicated and I don't have the time to attempt it right now. - */ - final static class MultiSourceProducer implements Producer { - private final AtomicBoolean started = new AtomicBoolean(); - private final AtomicLong requested = new AtomicLong(); - private final List> sources; - private final Subscriber child; - private final FuncN combinator; - private final MultiSourceRequestableSubscriber[] subscribers; - - /* following are guarded by WIP */ - private final RxRingBuffer buffer = RxRingBuffer.getSpmcInstance(); - private final Object[] collectedValues; - private final BitSet haveValues; - private volatile int haveValuesCount; // does this need to be volatile or is WIP sufficient? - private final BitSet completion; - private volatile int completionCount; // does this need to be volatile or is WIP sufficient? - - private final AtomicLong counter = new AtomicLong(); - + + static final class LatestCoordinator extends AtomicInteger implements Producer, Subscription { + /** */ + private static final long serialVersionUID = 8567835998786448817L; + final Subscriber actual; + final FuncN combiner; + final int count; + final CombinerSubscriber[] subscribers; + final int bufferSize; + final Object[] latest; + final SpscLinkedArrayQueue queue; + final boolean delayError; + + volatile boolean cancelled; + + volatile boolean done; + + final AtomicLong requested; + + final AtomicReference error; + + int active; + int complete; + + /** Indicates the particular source hasn't emitted any value yet. */ + static final Object MISSING = new Object(); + @SuppressWarnings("unchecked") - public MultiSourceProducer(final Subscriber child, final List> sources, FuncN combinator) { - this.sources = sources; - this.child = child; - this.combinator = combinator; - - int n = sources.size(); - this.subscribers = new MultiSourceRequestableSubscriber[n]; - this.collectedValues = new Object[n]; - this.haveValues = new BitSet(n); - this.completion = new BitSet(n); + public LatestCoordinator(Subscriber actual, + FuncN combiner, + int count, int bufferSize, boolean delayError) { + this.actual = actual; + this.combiner = combiner; + this.count = count; + this.bufferSize = bufferSize; + this.delayError = delayError; + this.latest = new Object[count]; + Arrays.fill(latest, MISSING); + this.subscribers = new CombinerSubscriber[count]; + this.queue = new SpscLinkedArrayQueue(bufferSize); + this.requested = new AtomicLong(); + this.error = new AtomicReference(); } - + + public void subscribe(Observable[] sources) { + Subscriber[] as = subscribers; + int len = as.length; + for (int i = 0; i < len; i++) { + as[i] = new CombinerSubscriber(this, i); + } + lazySet(0); // release array contents + actual.add(this); + actual.setProducer(this); + for (int i = 0; i < len; i++) { + if (cancelled) { + return; + } + sources[i].subscribe(as[i]); + } + } + @Override public void request(long n) { - BackpressureUtils.getAndAddRequest(requested, n); - if (!started.get() && started.compareAndSet(false, true)) { - /* - * NOTE: this logic will ONLY work if we don't have more sources than the size of the buffer. - * - * We would likely need to make an RxRingBuffer that can be sized to [numSources * n] instead - * of the current global default size it has. - */ - int sizePerSubscriber = RxRingBuffer.SIZE / sources.size(); - int leftOver = RxRingBuffer.SIZE % sources.size(); - for (int i = 0; i < sources.size(); i++) { - Observable o = sources.get(i); - int toRequest = sizePerSubscriber; - if (i == sources.size() - 1) { - toRequest += leftOver; - } - MultiSourceRequestableSubscriber s = new MultiSourceRequestableSubscriber(i, toRequest, child, this); - subscribers[i] = s; - o.unsafeSubscribe(s); + if (n < 0) { + throw new IllegalArgumentException("n >= required but it was " + n); + } + if (n != 0) { + BackpressureUtils.getAndAddRequest(requested, n); + drain(); + } + } + + @Override + public void unsubscribe() { + if (!cancelled) { + cancelled = true; + + if (getAndIncrement() == 0) { + cancel(queue); } } - tick(); } - + + @Override + public boolean isUnsubscribed() { + return cancelled; + } + + void cancel(Queue q) { + q.clear(); + for (CombinerSubscriber s : subscribers) { + s.unsubscribe(); + } + } + /** - * This will only allow one thread at a time to do the work, but ensures via `counter` increment/decrement - * that there is always once who acts on each `tick`. Same concept as used in OperationObserveOn. + * Combine the given notification value from the indexth source with the existing known + * latest values. + * @param value the notification to combine, null indicates the source terminated normally + * @param index the index of the source subscriber */ - void tick() { - AtomicLong localCounter = this.counter; - if (localCounter.getAndIncrement() == 0) { - int emitted = 0; - do { - // we only emit if requested > 0 - if (requested.get() > 0) { - Object o = buffer.poll(); - if (o != null) { - if (buffer.isCompleted(o)) { - child.onCompleted(); - } else { - buffer.accept(o, child); - emitted++; - requested.decrementAndGet(); - } - } - } - } while (localCounter.decrementAndGet() > 0); - if (emitted > 0) { - for (MultiSourceRequestableSubscriber s : subscribers) { - s.requestUpTo(emitted); + void combine(Object value, int index) { + CombinerSubscriber combinerSubscriber = subscribers[index]; + + int activeCount; + int completedCount; + int sourceCount; + boolean empty; + boolean allSourcesFinished; + synchronized (this) { + sourceCount = latest.length; + Object o = latest[index]; + activeCount = active; + if (o == MISSING) { + active = ++activeCount; + } + completedCount = complete; + if (value == null) { + complete = ++completedCount; + } else { + latest[index] = combinerSubscriber.nl.getValue(value); + } + allSourcesFinished = activeCount == sourceCount; + // see if either all sources completed + empty = completedCount == sourceCount + || (value == null && o == MISSING); // or this source completed without any value + if (!empty) { + if (value != null && allSourcesFinished) { + queue.offer(combinerSubscriber, latest.clone()); + } else + if (value == null && error.get() != null) { + done = true; // if this source completed without a value } + } else { + done = true; } } + if (!allSourcesFinished && value != null) { + combinerSubscriber.requestMore(1); + return; + } + drain(); } - - public void onCompleted(int index, boolean hadValue) { - if (!hadValue) { - child.onCompleted(); + void drain() { + if (getAndIncrement() != 0) { return; } - boolean done = false; - synchronized (this) { - if (!completion.get(index)) { - completion.set(index); - completionCount++; - done = completionCount == collectedValues.length; + + final Queue q = queue; + final Subscriber a = actual; + final boolean delayError = this.delayError; + final AtomicLong localRequested = this.requested; + + int missed = 1; + for (;;) { + + if (checkTerminated(done, q.isEmpty(), a, q, delayError)) { + return; } - } - if (done) { - buffer.onCompleted(); - tick(); - } - } + + long requestAmount = localRequested.get(); + boolean unbounded = requestAmount == Long.MAX_VALUE; + long emitted = 0L; + + while (requestAmount != 0L) { + + boolean d = done; + @SuppressWarnings("unchecked") + CombinerSubscriber cs = (CombinerSubscriber)q.peek(); + boolean empty = cs == null; + + if (checkTerminated(d, empty, a, q, delayError)) { + return; + } + + if (empty) { + break; + } - /** - * @return boolean true if propagated value - */ - public boolean onNext(int index, T t) { - synchronized (this) { - if (!haveValues.get(index)) { - haveValues.set(index); - haveValuesCount++; + q.poll(); + Object[] array = (Object[])q.poll(); + + if (array == null) { + cancelled = true; + cancel(q); + a.onError(new IllegalStateException("Broken queue?! Sender received but not the array.")); + return; + } + + R v; + try { + v = combiner.call(array); + } catch (Throwable ex) { + cancelled = true; + cancel(q); + a.onError(ex); + return; + } + + a.onNext(v); + + cs.requestMore(1); + + requestAmount--; + emitted--; + } + + if (emitted != 0L) { + if (!unbounded) { + localRequested.addAndGet(emitted); + } } - collectedValues[index] = t; - if (haveValuesCount != collectedValues.length) { - // haven't received value from each source yet so won't emit - return false; + + missed = addAndGet(-missed); + if (missed == 0) { + break; + } + } + } + + + boolean checkTerminated(boolean mainDone, boolean queueEmpty, Subscriber childSubscriber, Queue q, boolean delayError) { + if (cancelled) { + cancel(q); + return true; + } + if (mainDone) { + if (delayError) { + if (queueEmpty) { + Throwable e = error.get(); + if (e != null) { + childSubscriber.onError(e); + } else { + childSubscriber.onCompleted(); + } + return true; + } } else { - try { - buffer.onNext(combinator.call(collectedValues)); - } catch (MissingBackpressureException e) { - onError(e); - } catch (Throwable e) { - Exceptions.throwOrReport(e, child); + Throwable e = error.get(); + if (e != null) { + cancel(q); + childSubscriber.onError(e); + return true; + } else + if (queueEmpty) { + childSubscriber.onCompleted(); + return true; } } } - tick(); - return true; + return false; } - - public void onError(Throwable e) { - child.onError(e); + + void onError(Throwable e) { + AtomicReference localError = this.error; + for (;;) { + Throwable curr = localError.get(); + Throwable next; + if (curr != null) { + if (curr instanceof CompositeException) { + CompositeException ce = (CompositeException) curr; + List es = new ArrayList(ce.getExceptions()); + es.add(e); + next = new CompositeException(es); + } else { + next = new CompositeException(Arrays.asList(curr, e)); + } + } else { + next = e; + } + if (localError.compareAndSet(curr, next)) { + return; + } + } } } - - final static class MultiSourceRequestableSubscriber extends Subscriber { - - final MultiSourceProducer producer; + + static final class CombinerSubscriber extends Subscriber { + final LatestCoordinator parent; final int index; - final AtomicLong emitted = new AtomicLong(); - boolean hasValue = false; - - public MultiSourceRequestableSubscriber(int index, int initial, Subscriber child, MultiSourceProducer producer) { - super(child); + final NotificationLite nl; + + boolean done; + + public CombinerSubscriber(LatestCoordinator parent, int index) { + this.parent = parent; this.index = index; - this.producer = producer; - request(initial); - } - - public void requestUpTo(long n) { - do { - long r = emitted.get(); - long u = Math.min(r, n); - if (emitted.compareAndSet(r, r - u)) { - request(u); - break; - } - } while (true); + this.nl = NotificationLite.instance(); + request(parent.bufferSize); } - - @Override - public void onCompleted() { - producer.onCompleted(index, hasValue); - } - - @Override - public void onError(Throwable e) { - producer.onError(e); - } - + @Override public void onNext(T t) { - hasValue = true; - emitted.incrementAndGet(); - boolean emitted = producer.onNext(index, t); - if (!emitted) { - request(1); + if (done) { + return; } + parent.combine(nl.next(t), index); } - - } - - final static class SingleSourceProducer implements Producer { - final AtomicBoolean started = new AtomicBoolean(); - final Observable source; - final Subscriber child; - final FuncN combinator; - final SingleSourceRequestableSubscriber subscriber; - - public SingleSourceProducer(final Subscriber child, Observable source, FuncN combinator) { - this.source = source; - this.child = child; - this.combinator = combinator; - this.subscriber = new SingleSourceRequestableSubscriber(child, combinator); - } - + @Override - public void request(final long n) { - subscriber.requestMore(n); - if (started.compareAndSet(false, true)) { - source.unsafeSubscribe(subscriber); + public void onError(Throwable t) { + if (done) { + RxJavaPlugins.getInstance().getErrorHandler().handleError(t); + return; } - + parent.onError(t); + done = true; + parent.combine(null, index); } - - } - - final static class SingleSourceRequestableSubscriber extends Subscriber { - - private final Subscriber child; - private final FuncN combinator; - - SingleSourceRequestableSubscriber(Subscriber child, FuncN combinator) { - super(child); - this.child = child; - this.combinator = combinator; + + @Override + public void onCompleted() { + if (done) { + return; + } + done = true; + parent.combine(null, index); } - + public void requestMore(long n) { request(n); } - - @Override - public void onNext(T t) { - child.onNext(combinator.call(t)); - } - - @Override - public void onError(Throwable e) { - child.onError(e); - } - - @Override - public void onCompleted() { - child.onCompleted(); - } } } diff --git a/src/test/java/rx/internal/operators/OnSubscribeCombineLatestTest.java b/src/test/java/rx/internal/operators/OnSubscribeCombineLatestTest.java index c28606cae0..a2b8b32763 100644 --- a/src/test/java/rx/internal/operators/OnSubscribeCombineLatestTest.java +++ b/src/test/java/rx/internal/operators/OnSubscribeCombineLatestTest.java @@ -16,40 +16,20 @@ package rx.internal.operators; import static org.junit.Assert.*; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyString; -import static org.mockito.Mockito.inOrder; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.List; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; +import static org.mockito.Matchers.*; +import static org.mockito.Mockito.*; + +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; import org.junit.Test; -import org.mockito.InOrder; -import org.mockito.Matchers; +import org.mockito.*; -import rx.Notification; +import rx.*; import rx.Observable; import rx.Observer; -import rx.Subscriber; -import rx.functions.Action1; -import rx.functions.Func2; -import rx.functions.Func3; -import rx.functions.Func4; -import rx.functions.Func5; -import rx.functions.Func6; -import rx.functions.Func7; -import rx.functions.Func8; -import rx.functions.Func9; -import rx.functions.FuncN; +import rx.functions.*; import rx.internal.util.RxRingBuffer; import rx.observers.TestSubscriber; import rx.schedulers.Schedulers; @@ -851,6 +831,7 @@ public Long call(Long t1, Integer t2) { @Test(timeout=10000) public void testCombineLatestRequestOverflow() throws InterruptedException { + @SuppressWarnings("unchecked") List> sources = Arrays.asList(Observable.from(Arrays.asList(1,2,3,4)), Observable.from(Arrays.asList(5,6,7,8))); Observable o = Observable.combineLatest(sources,new FuncN() { @Override @@ -884,4 +865,93 @@ public void onNext(Integer t) { assertTrue(latch.await(10, TimeUnit.SECONDS)); } + @Test + public void testCombineMany() { + int n = RxRingBuffer.SIZE * 3; + + List> sources = new ArrayList>(); + + StringBuilder expected = new StringBuilder(n * 2); + + for (int i = 0; i < n; i++) { + sources.add(Observable.just(i)); + expected.append(i); + } + + TestSubscriber ts = TestSubscriber.create(); + + Observable.combineLatest(sources, new FuncN() { + @Override + public String call(Object... args) { + StringBuilder b = new StringBuilder(); + for (Object o : args) { + b.append(o); + } + return b.toString(); + } + }).subscribe(ts); + + ts.assertNoErrors(); + ts.assertValue(expected.toString()); + ts.assertCompleted(); + } + + @Test + public void testCombineManyNulls() { + int n = RxRingBuffer.SIZE * 3; + + Observable source = Observable.just((Integer)null); + + List> sources = new ArrayList>(); + + for (int i = 0; i < n; i++) { + sources.add(source); + } + + TestSubscriber ts = TestSubscriber.create(); + + Observable.combineLatest(sources, new FuncN() { + @Override + public Integer call(Object... args) { + int sum = 0; + for (Object o : args) { + if (o == null) { + sum ++; + } + } + return sum; + } + }).subscribe(ts); + + ts.assertValue(n); + ts.assertNoErrors(); + ts.assertCompleted(); + } + + @Test + public void testNonFatalExceptionThrownByCombinatorForSingleSourceIsNotReportedByUpstreamOperator() { + final AtomicBoolean errorOccurred = new AtomicBoolean(false); + TestSubscriber ts = TestSubscriber.create(1); + Observable source = Observable.just(1) + // if haven't caught exception in combineLatest operator then would incorrectly + // be picked up by this call to doOnError + .doOnError(new Action1() { + @Override + public void call(Throwable t) { + errorOccurred.set(true); + } + }); + Observable + .combineLatest(Collections.singletonList(source), THROW_NON_FATAL) + .subscribe(ts); + assertFalse(errorOccurred.get()); + } + + private static final FuncN THROW_NON_FATAL = new FuncN() { + @Override + public Integer call(Object... args) { + throw new RuntimeException(); + } + + }; }