diff --git a/src/main/java/rx/Single.java b/src/main/java/rx/Single.java index 782d5c833f..1deaf04774 100644 --- a/src/main/java/rx/Single.java +++ b/src/main/java/rx/Single.java @@ -930,6 +930,110 @@ public static Observable merge(Single t1, Single + * + *

+ * You can combine items emitted by multiple Singles so that they appear as a single Observable, by using + * the {@code merge} method. + *

+ *
Scheduler:
+ *
{@code merge} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param the common value type + * @param source + * an Observable of Singles to be merged + * @return an Observable that emits all of the items emitted by the source Singles + * @see ReactiveX operators documentation: Merge + */ + public static Observable merge(Observable> source) { + return source.lift(OperatorMergeSingle. instance(false)); + } + + /** + * Flattens any number of Singles into a single Observable, without any transformation. + *

+ * + *

+ * You can combine items emitted by multiple Singles so that they appear as a single Observable, by using + * the {@code merge} method. + *

+ *
Scheduler:
+ *
{@code merge} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param the common value type + * @param source + * an Observable of Singles to be merged + * @param maxConcurrent + * the maximum number of Singles that may be subscribed to concurrently + * @return an Observable that emits all of the items emitted by the source Singles + * @see ReactiveX operators documentation: Merge + */ + public static Observable merge(Observable> source, int maxConcurrent) { + return source.lift(OperatorMergeSingle. instance(false, maxConcurrent)); + } + + /** + * Flattens any number of Singles into a single Observable, in a way that allows an Observer to + * receive all successfully emitted items from all of the source Singles without being interrupted by + * an error notification from one of them. + *

+ * This behaves like {@link #merge(Observable)} except that if any of the merged Singles notify of an + * error via {@link SingleSubscriber#onError onError}, {@code mergeDelayError} will refrain from propagating that + * error notification until all of the merged Singles have finished emitting items. + *

+ * + *

+ * You can combine items emitted by multiple Singles so that they appear as a single Observable, by using + * the {@code merge} method. + *

+ *
Scheduler:
+ *
{@code merge} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param the common value type + * @param source + * an Observable of Singles to be merged + * @return an Observable that emits all of the items emitted by the source Singles + * @see ReactiveX operators documentation: Merge + */ + public static Observable mergeDelayError(Observable> source) { + return source.lift(OperatorMergeSingle. instance(true)); + } + + /** + * Flattens any number of Singles into a single Observable, in a way that allows an Observer to + * receive all successfully emitted items from all of the source Singles without being interrupted by + * an error notification from one of them. + *

+ * This behaves like {@link #merge(Observable)} except that if any of the merged Singles notify of an + * error via {@link SingleSubscriber#onError onError}, {@code mergeDelayError} will refrain from propagating that + * error notification until all of the merged Singles have finished emitting items. + *

+ * + *

+ * You can combine items emitted by multiple Singles so that they appear as a single Observable, by using + * the {@code merge} method. + *

+ *
Scheduler:
+ *
{@code merge} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param the common value type + * @param source + * an Observable of Singles to be merged + * @param maxConcurrent + * the maximum number of Singles that may be subscribed to concurrently + * @return an Observable that emits all of the items emitted by the source Singles + * @see ReactiveX operators documentation: Merge + */ + public static Observable mergeDelayError(Observable> source, int maxConcurrent) { + return source.lift(OperatorMergeSingle. instance(true, maxConcurrent)); + } + /** * Returns a Single that emits the results of a specified combiner function applied to two items emitted by * two other Singles. diff --git a/src/main/java/rx/internal/operators/OperatorMergeSingle.java b/src/main/java/rx/internal/operators/OperatorMergeSingle.java new file mode 100644 index 0000000000..bb2752239c --- /dev/null +++ b/src/main/java/rx/internal/operators/OperatorMergeSingle.java @@ -0,0 +1,858 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.internal.operators; + +import java.util.*; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicLong; + +import rx.*; +import rx.Observable; +import rx.Observable.Operator; +import rx.exceptions.*; +import rx.internal.util.*; +import rx.internal.util.atomic.*; +import rx.internal.util.unsafe.*; +import rx.subscriptions.CompositeSubscription; + +/** + * Flattens a list of {@link Observable}s into one {@code Observable}, without any transformation. + *

+ * + *

+ * You can combine the items emitted by multiple {@code Observable}s so that they act like a single {@code Observable}, by using the merge operation. + *

+ * The {@code instance(true)} call behaves like {@link OperatorMerge} except that if any of the merged Observables notify of + * an error via {@code onError}, {@code mergeDelayError} will refrain from propagating that error + * notification until all of the merged Observables have finished emitting items. + *

+ * + *

+ * Even if multiple merged Observables send {@code onError} notifications, {@code mergeDelayError} will + * only invoke the {@code onError} method of its Observers once. + *

+ * This operation allows an Observer to receive all successfully emitted items from all of the + * source Observables without being interrupted by an error notification from one of them. + *

+ * Note: If this is used on an Observable that never completes, it will never call {@code onError} and will effectively swallow errors. + + * @param + * the type of the items emitted by both the source and merged {@code Observable}s + */ +public final class OperatorMergeSingle implements Operator> { + final boolean delayErrors; + final int maxConcurrent; + + /** Lazy initialization via inner-class holder. */ + static final class HolderNoDelay { + /** A singleton instance. */ + static final OperatorMergeSingle INSTANCE = new OperatorMergeSingle(false, Integer.MAX_VALUE); + } + /** Lazy initialization via inner-class holder. */ + static final class HolderDelayErrors { + /** A singleton instance. */ + static final OperatorMergeSingle INSTANCE = new OperatorMergeSingle(true, Integer.MAX_VALUE); + } + /** + * @param the common value type + * @param delayErrors should the merge delay errors? + * @return a singleton instance of this stateless operator. + */ + @SuppressWarnings("unchecked") + public static OperatorMergeSingle instance(boolean delayErrors) { + if (delayErrors) { + return (OperatorMergeSingle)HolderDelayErrors.INSTANCE; + } + return (OperatorMergeSingle)HolderNoDelay.INSTANCE; + } + /** + * Creates a new instance of the operator with the given delayError and maxConcurrency settings. + * @param the value type + * @param delayErrors if true, errors are delayed till all sources terminate, if false the first error will + * be emitted and all sequences unsubscribed + * @param maxConcurrent the maximum number of concurrent subscriptions or Integer.MAX_VALUE for unlimited + * @return the Operator instance with the given settings + */ + public static OperatorMergeSingle instance(boolean delayErrors, int maxConcurrent) { + if (maxConcurrent <= 0) { + throw new IllegalArgumentException("maxConcurrent > 0 required but it was " + maxConcurrent); + } + if (maxConcurrent == Integer.MAX_VALUE) { + return instance(delayErrors); + } + return new OperatorMergeSingle(delayErrors, maxConcurrent); + } + + OperatorMergeSingle(boolean delayErrors, int maxConcurrent) { + this.delayErrors = delayErrors; + this.maxConcurrent = maxConcurrent; + } + + @Override + public Subscriber> call(final Subscriber child) { + MergeSubscriber subscriber = new MergeSubscriber(child, delayErrors, maxConcurrent); + MergeProducer producer = new MergeProducer(subscriber); + subscriber.producer = producer; + + child.add(subscriber); + child.setProducer(producer); + + return subscriber; + } + + static final class MergeProducer extends AtomicLong implements Producer { + /** */ + private static final long serialVersionUID = -1214379189873595503L; + + final MergeSubscriber subscriber; + + public MergeProducer(MergeSubscriber subscriber) { + this.subscriber = subscriber; + } + + @Override + public void request(long n) { + if (n > 0) { + if (get() == Long.MAX_VALUE) { + return; + } + BackpressureUtils.getAndAddRequest(this, n); + subscriber.emit(); + } else + if (n < 0) { + throw new IllegalArgumentException("n >= 0 required"); + } + } + public long produced(int n) { + return addAndGet(-n); + } + } + + /** + * The subscriber that observes Observables. + * @param the value type + */ + static final class MergeSubscriber extends Subscriber> { + final Subscriber child; + final boolean delayErrors; + final int maxConcurrent; + + MergeProducer producer; + + volatile Queue queue; + + /** Tracks the active subscriptions to sources. */ + volatile CompositeSubscription subscriptions; + /** Due to the emission loop, we need to store errors somewhere if !delayErrors. */ + volatile ConcurrentLinkedQueue errors; + + volatile boolean done; + + /** Guarded by this. */ + boolean emitting; + /** Guarded by this. */ + boolean missed; + + final Object innerGuard; + /** Copy-on-write array, guarded by innerGuard. */ + volatile InnerSubscriber[] innerSubscribers; + + /** Used to generate unique InnerSubscriber IDs. Modified from onNext only. */ + long uniqueId; + + /** Which was the last InnerSubscriber that emitted? Accessed if emitting == true. */ + long lastId; + /** What was its index in the innerSubscribers array? Accessed if emitting == true. */ + int lastIndex; + + /** An empty array to avoid creating new empty arrays in removeInner. */ + static final InnerSubscriber[] EMPTY = new InnerSubscriber[0]; + + final int scalarEmissionLimit; + + int scalarEmissionCount; + + public MergeSubscriber(Subscriber child, boolean delayErrors, int maxConcurrent) { + this.child = child; + this.delayErrors = delayErrors; + this.maxConcurrent = maxConcurrent; + this.innerGuard = new Object(); + this.innerSubscribers = EMPTY; + if (maxConcurrent == Integer.MAX_VALUE) { + scalarEmissionLimit = Integer.MAX_VALUE; + request(Long.MAX_VALUE); + } else { + scalarEmissionLimit = Math.max(1, maxConcurrent >> 1); + request(maxConcurrent); + } + } + + Queue getOrCreateErrorQueue() { + ConcurrentLinkedQueue q = errors; + if (q == null) { + synchronized (this) { + q = errors; + if (q == null) { + q = new ConcurrentLinkedQueue(); + errors = q; + } + } + } + return q; + } + CompositeSubscription getOrCreateComposite() { + CompositeSubscription c = subscriptions; + if (c == null) { + boolean shouldAdd = false; + synchronized (this) { + c = subscriptions; + if (c == null) { + c = new CompositeSubscription(); + subscriptions = c; + shouldAdd = true; + } + } + if (shouldAdd) { + add(c); + } + } + return c; + } + + @Override + public void onNext(Single t) { + if (t == null) { + return; + } + if (t instanceof ScalarSynchronousSingle) { + tryEmit(((ScalarSynchronousSingle)t).get()); + } else { + InnerSubscriber inner = new InnerSubscriber(this, uniqueId++); + addInner(inner); + t.unsafeSubscribe(inner); + emit(); + } + } + + private void reportError() { + List list = new ArrayList(errors); + if (list.size() == 1) { + child.onError(list.get(0)); + } else { + child.onError(new CompositeException(list)); + } + } + + @Override + public void onError(Throwable e) { + getOrCreateErrorQueue().offer(e); + done = true; + emit(); + } + @Override + public void onCompleted() { + done = true; + emit(); + } + + void addInner(InnerSubscriber inner) { + getOrCreateComposite().add(inner); + synchronized (innerGuard) { + InnerSubscriber[] a = innerSubscribers; + int n = a.length; + InnerSubscriber[] b = new InnerSubscriber[n + 1]; + System.arraycopy(a, 0, b, 0, n); + b[n] = inner; + innerSubscribers = b; + } + } + void removeInner(InnerSubscriber inner) { + RxRingBuffer q = inner.queue; + if (q != null) { + q.release(); + } + // subscription is non-null here because the very first addInner will create it before + // this can be called + subscriptions.remove(inner); + synchronized (innerGuard) { + InnerSubscriber[] a = innerSubscribers; + int n = a.length; + int j = -1; + // locate the inner + for (int i = 0; i < n; i++) { + if (inner.equals(a[i])) { + j = i; + break; + } + } + if (j < 0) { + return; + } + if (n == 1) { + innerSubscribers = EMPTY; + return; + } + InnerSubscriber[] b = new InnerSubscriber[n - 1]; + System.arraycopy(a, 0, b, 0, j); + System.arraycopy(a, j + 1, b, j, n - j - 1); + innerSubscribers = b; + } + } + + /** + * Tries to emit the value directly to the child if + * no concurrent emission is happening at the moment. + *

+ * Since the scalar-value queue optimization applies + * to both the main source and the inner subscribers, + * we handle things in a shared manner. + * + * @param subscriber the subscriber to one of the inner Observables running + * @param value the value that inner Observable produced + */ + void tryEmit(InnerSubscriber subscriber, T value) { + boolean success = false; + long r = producer.get(); + if (r != 0L) { + synchronized (this) { + // if nobody is emitting and child has available requests + r = producer.get(); + if (!emitting && r != 0L) { + emitting = true; + success = true; + } + } + } + if (success) { + RxRingBuffer subscriberQueue = subscriber.queue; + if (subscriberQueue == null || subscriberQueue.isEmpty()) { + emitScalar(subscriber, value, r); + } else { + queueScalar(subscriber, value); + emitLoop(); + } + } else { + queueScalar(subscriber, value); + emit(); + } + } + + protected void queueScalar(InnerSubscriber subscriber, T value) { + /* + * If the attempt to make a fast-path emission failed + * due to lack of requests or an ongoing emission, + * enqueue the value and try the slow emission path. + */ + RxRingBuffer q = subscriber.queue; + if (q == null) { + q = RxRingBuffer.getSpscInstance(); + subscriber.add(q); + subscriber.queue = q; + } + try { + q.onNext(NotificationLite.next(value)); + } catch (MissingBackpressureException ex) { + subscriber.unsubscribe(); + subscriber.onError(ex); + } catch (IllegalStateException ex) { + if (!subscriber.isUnsubscribed()) { + subscriber.unsubscribe(); + subscriber.onError(ex); + } + } + } + + protected void emitScalar(InnerSubscriber subscriber, T value, long r) { + boolean skipFinal = false; + try { + try { + child.onNext(value); + } catch (Throwable t) { + if (!delayErrors) { + Exceptions.throwIfFatal(t); + skipFinal = true; + subscriber.unsubscribe(); + subscriber.onError(t); + return; + } + getOrCreateErrorQueue().offer(t); + } + if (r != Long.MAX_VALUE) { + producer.produced(1); + } + subscriber.requestMore(1); + // check if some state changed while emitting + synchronized (this) { + skipFinal = true; + if (!missed) { + emitting = false; + return; + } + missed = false; + } + } finally { + if (!skipFinal) { + synchronized (this) { + emitting = false; + } + } + } + /* + * In the synchronized block below request(1) we check + * if there was a concurrent emission attempt and if there was, + * we stay in emission mode and enter the emission loop + * which will take care all the queued up state and + * emission possibilities. + */ + emitLoop(); + } + + public void requestMore(long n) { + request(n); + } + + /** + * Tries to emit the value directly to the child if + * no concurrent emission is happening at the moment. + *

+ * Since the scalar-value queue optimization applies + * to both the main source and the inner subscribers, + * we handle things in a shared manner. + * + * @param value the scalar value the main Observable emitted through {@code just()} + */ + void tryEmit(T value) { + boolean success = false; + long r = producer.get(); + if (r != 0L) { + synchronized (this) { + // if nobody is emitting and child has available requests + r = producer.get(); + if (!emitting && r != 0L) { + emitting = true; + success = true; + } + } + } + if (success) { + Queue mainQueue = queue; + if (mainQueue == null || mainQueue.isEmpty()) { + emitScalar(value, r); + } else { + queueScalar(value); + emitLoop(); + } + } else { + queueScalar(value); + emit(); + } + } + + protected void queueScalar(T value) { + /* + * If the attempt to make a fast-path emission failed + * due to lack of requests or an ongoing emission, + * enqueue the value and try the slow emission path. + */ + Queue q = this.queue; + if (q == null) { + int mc = maxConcurrent; + if (mc == Integer.MAX_VALUE) { + q = new SpscUnboundedAtomicArrayQueue(RxRingBuffer.SIZE); + } else { + if (Pow2.isPowerOfTwo(mc)) { + if (UnsafeAccess.isUnsafeAvailable()) { + q = new SpscArrayQueue(mc); + } else { + q = new SpscAtomicArrayQueue(mc); + } + } else { + q = new SpscExactAtomicArrayQueue(mc); + } + } + this.queue = q; + } + if (!q.offer(NotificationLite.next(value))) { + unsubscribe(); + onError(OnErrorThrowable.addValueAsLastCause(new MissingBackpressureException(), value)); + } + } + + protected void emitScalar(T value, long r) { + boolean skipFinal = false; + try { + try { + child.onNext(value); + } catch (Throwable t) { + if (!delayErrors) { + Exceptions.throwIfFatal(t); + skipFinal = true; + this.unsubscribe(); + this.onError(t); + return; + } + getOrCreateErrorQueue().offer(t); + } + if (r != Long.MAX_VALUE) { + producer.produced(1); + } + + int produced = scalarEmissionCount + 1; + if (produced == scalarEmissionLimit) { + scalarEmissionCount = 0; + this.requestMore(produced); + } else { + scalarEmissionCount = produced; + } + + // check if some state changed while emitting + synchronized (this) { + skipFinal = true; + if (!missed) { + emitting = false; + return; + } + missed = false; + } + } finally { + if (!skipFinal) { + synchronized (this) { + emitting = false; + } + } + } + /* + * In the synchronized block below request(1) we check + * if there was a concurrent emission attempt and if there was, + * we stay in emission mode and enter the emission loop + * which will take care all the queued up state and + * emission possibilities. + */ + emitLoop(); + } + + void emit() { + synchronized (this) { + if (emitting) { + missed = true; + return; + } + emitting = true; + } + emitLoop(); + } + /** + * The standard emission loop serializing events and requests. + */ + void emitLoop() { + boolean skipFinal = false; + try { + final Subscriber child = this.child; + for (;;) { + // eagerly check if child unsubscribed or we reached a terminal state. + if (checkTerminate()) { + skipFinal = true; + return; + } + Queue svq = queue; + + long r = producer.get(); + boolean unbounded = r == Long.MAX_VALUE; + + // count the number of 'completed' sources to replenish them in batches + int replenishMain = 0; + + // try emitting as many scalars as possible + if (svq != null) { + for (;;) { + int scalarEmission = 0; + Object o = null; + while (r > 0) { + o = svq.poll(); + // eagerly check if child unsubscribed or we reached a terminal state. + if (checkTerminate()) { + skipFinal = true; + return; + } + if (o == null) { + break; + } + T v = NotificationLite.getValue(o); + // if child throws, report bounce it back immediately + try { + child.onNext(v); + } catch (Throwable t) { + if (!delayErrors) { + Exceptions.throwIfFatal(t); + skipFinal = true; + unsubscribe(); + child.onError(t); + return; + } + getOrCreateErrorQueue().offer(t); + } + replenishMain++; + scalarEmission++; + r--; + } + if (scalarEmission > 0) { + if (unbounded) { + r = Long.MAX_VALUE; + } else { + r = producer.produced(scalarEmission); + } + } + if (r == 0L || o == null) { + break; + } + } + } + + /* + * We need to read done before innerSubscribers because innerSubscribers are added + * before done is set to true. If it were the other way around, we could read an empty + * innerSubscribers, get paused and then read a done flag but an async producer + * might have added more subscribers between the two. + */ + boolean d = done; + // re-read svq because it could have been created + // asynchronously just before done was set to true. + svq = queue; + // read the current set of inner subscribers + InnerSubscriber[] inner = innerSubscribers; + int n = inner.length; + + // check if upstream is done, there are no scalar values + // and no active inner subscriptions + if (d && (svq == null || svq.isEmpty()) && n == 0) { + Queue e = errors; + if (e == null || e.isEmpty()) { + child.onCompleted(); + } else { + reportError(); + } + skipFinal = true; + return; + } + + boolean innerCompleted = false; + if (n > 0) { + // let's continue the round-robin emission from last location + long startId = lastId; + int index = lastIndex; + + // in case there were changes in the array or the index + // no longer points to the inner with the cached id + if (n <= index || inner[index].id != startId) { + if (n <= index) { + index = 0; + } + // try locating the inner with the cached index + int j = index; + for (int i = 0; i < n; i++) { + if (inner[j].id == startId) { + break; + } + // wrap around in round-robin fashion + j++; + if (j == n) { + j = 0; + } + } + // if we found it again, j will point to it + // otherwise, we continue with the replacement at j + index = j; + lastIndex = j; + lastId = inner[j].id; + } + + int j = index; + // loop through all sources once to avoid delaying any new sources too much + for (int i = 0; i < n; i++) { + // eagerly check if child unsubscribed or we reached a terminal state. + if (checkTerminate()) { + skipFinal = true; + return; + } + @SuppressWarnings("unchecked") + InnerSubscriber is = (InnerSubscriber)inner[j]; + + Object o = null; + for (;;) { + int produced = 0; + while (r > 0) { + // eagerly check if child unsubscribed or we reached a terminal state. + if (checkTerminate()) { + skipFinal = true; + return; + } + RxRingBuffer q = is.queue; + if (q == null) { + break; + } + o = q.poll(); + if (o == null) { + break; + } + T v = NotificationLite.getValue(o); + // if child throws, report bounce it back immediately + try { + child.onNext(v); + } catch (Throwable t) { + skipFinal = true; + Exceptions.throwIfFatal(t); + try { + child.onError(t); + } finally { + unsubscribe(); + } + return; + } + r--; + produced++; + } + if (produced > 0) { + if (!unbounded) { + r = producer.produced(produced); + } else { + r = Long.MAX_VALUE; + } + is.requestMore(produced); + } + // if we run out of requests or queued values, break + if (r == 0 || o == null) { + break; + } + } + boolean innerDone = is.done; + RxRingBuffer innerQueue = is.queue; + if (innerDone && (innerQueue == null || innerQueue.isEmpty())) { + removeInner(is); + if (checkTerminate()) { + skipFinal = true; + return; + } + replenishMain++; + innerCompleted = true; + } + // if we run out of requests, don't try the other sources + if (r == 0) { + break; + } + + // wrap around in round-robin fashion + j++; + if (j == n) { + j = 0; + } + } + // if we run out of requests or just completed a round, save the index and id + lastIndex = j; + lastId = inner[j].id; + } + + if (replenishMain > 0) { + request(replenishMain); + } + // if one or more inner completed, loop again to see if we can terminate the whole stream + if (innerCompleted) { + continue; + } + // in case there were updates to the state, we loop again + synchronized (this) { + if (!missed) { + skipFinal = true; + emitting = false; + break; + } + missed = false; + } + } + } finally { + if (!skipFinal) { + synchronized (this) { + emitting = false; + } + } + } + } + + /** + * Check if the operator reached some terminal state: child unsubscribed, + * an error was reported and we don't delay errors. + * @return true if the child unsubscribed or there are errors available and merge doesn't delay errors. + */ + boolean checkTerminate() { + if (child.isUnsubscribed()) { + return true; + } + Queue e = errors; + if (!delayErrors && (e != null && !e.isEmpty())) { + try { + reportError(); + } finally { + unsubscribe(); + } + return true; + } + return false; + } + } + static final class InnerSubscriber extends Subscriber { + final MergeSubscriber parent; + final long id; + volatile boolean done; + volatile RxRingBuffer queue; + int outstanding; + static final int LIMIT = RxRingBuffer.SIZE / 4; + + public InnerSubscriber(MergeSubscriber parent, long id) { + this.parent = parent; + this.id = id; + } + @Override + public void onStart() { + outstanding = RxRingBuffer.SIZE; + request(RxRingBuffer.SIZE); + } + @Override + public void onNext(T t) { + parent.tryEmit(this, t); + } + @Override + public void onError(Throwable e) { + done = true; + parent.getOrCreateErrorQueue().offer(e); + parent.emit(); + } + @Override + public void onCompleted() { + done = true; + parent.emit(); + } + public void requestMore(long n) { + int r = outstanding - (int)n; + if (r > LIMIT) { + outstanding = r; + return; + } + outstanding = RxRingBuffer.SIZE; + int k = RxRingBuffer.SIZE - r; + if (k > 0) { + request(k); + } + } + }} \ No newline at end of file diff --git a/src/test/java/rx/internal/operators/OperatorMergeSingleTest.java b/src/test/java/rx/internal/operators/OperatorMergeSingleTest.java new file mode 100644 index 0000000000..c7d6294c23 --- /dev/null +++ b/src/test/java/rx/internal/operators/OperatorMergeSingleTest.java @@ -0,0 +1,286 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.internal.operators; + +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.*; + +import java.util.concurrent.*; + +import org.junit.*; +import org.mockito.*; + +import rx.*; +import rx.Observable; +import rx.Observer; + +public class OperatorMergeSingleTest { + + @Mock + Observer stringObserver; + + @Before + public void before() { + MockitoAnnotations.initMocks(this); + } + + @Test + public void testMergeOfSyncSync() { + final Single o1 = Single.create(new TestSynchronousSingle()); + final Single o2 = Single.create(new TestSynchronousSingle()); + + Observable> observableOfObservables = Observable.just(o1, o2); + Observable m = Single.merge(observableOfObservables); + m.subscribe(stringObserver); + + verify(stringObserver, never()).onError(any(Throwable.class)); + verify(stringObserver, times(1)).onCompleted(); + verify(stringObserver, times(2)).onNext("hello"); + } + + @Test + public void testMergeOfSyncSErr() { + final Single o1 = Single.create(new TestSynchronousSingle()); + final Single o2 = Single.create(new TestSynchronousErrorSingle()); + + Observable> observableOfObservables = Observable.just(o1, o2); + Observable m = Single.merge(observableOfObservables); + m.subscribe(stringObserver); + + verify(stringObserver, times(1)).onError(any(Throwable.class)); + verify(stringObserver, times(0)).onCompleted(); + verify(stringObserver, times(1)).onNext("hello"); + } + + @Test + public void testMergeOfAsyncSErr() throws Exception { + CyclicBarrier b = new CyclicBarrier(2); + CountDownLatch l = new CountDownLatch(1); + final Single o1 = Single.create(new TestAsynchronousSingle(b, l)); + final Single o2 = Single.create(new TestSynchronousErrorSingle()); + + Observable> observableOfObservables = Observable.just(o1, o2); + Observable m = Single.merge(observableOfObservables); + m.subscribe(stringObserver); + + verify(stringObserver, times(1)).onError(any(Throwable.class)); + verify(stringObserver, times(0)).onCompleted(); + verify(stringObserver, times(0)).onNext("hello"); + + // unblock the async + b.await(); + l.await(); + + // nothing should have changed. + verify(stringObserver, times(1)).onError(any(Throwable.class)); + verify(stringObserver, times(0)).onCompleted(); + verify(stringObserver, times(0)).onNext("hello"); + } + + @Test + public void testMergeOfAsyncAsync() throws Exception { + CyclicBarrier b = new CyclicBarrier(3); + CountDownLatch l = new CountDownLatch(2); + final Single o1 = Single.create(new TestAsynchronousSingle(b, l)); + final Single o2 = Single.create(new TestAsynchronousSingle(b, l)); + + Observable> observableOfObservables = Observable.just(o1, o2); + Observable m = Single.merge(observableOfObservables); + m.subscribe(stringObserver); + + verify(stringObserver, times(0)).onError(any(Throwable.class)); + verify(stringObserver, times(0)).onCompleted(); + verify(stringObserver, times(0)).onNext("hello"); + + b.await(); + l.await(); + + verify(stringObserver, times(0)).onError(any(Throwable.class)); + verify(stringObserver, times(1)).onCompleted(); + verify(stringObserver, times(2)).onNext("hello"); + } + + @Test + public void testMergeOfAsyncAErr() throws Exception { + CyclicBarrier bs = new CyclicBarrier(2); + CyclicBarrier be = new CyclicBarrier(2); + CountDownLatch ls = new CountDownLatch(1); + CountDownLatch le = new CountDownLatch(1); + final Single o1 = Single.create(new TestAsynchronousSingle(bs, ls)); + final Single o2 = Single.create(new TestAsynchronousErrorSingle(be, le)); + + Observable> observableOfObservables = Observable.just(o1, o2); + Observable m = Single.merge(observableOfObservables); + m.subscribe(stringObserver); + + verify(stringObserver, times(0)).onError(any(Throwable.class)); + verify(stringObserver, times(0)).onCompleted(); + verify(stringObserver, times(0)).onNext("hello"); + + // unblock the error + be.await(); + le.await(); + + verify(stringObserver, times(1)).onError(any(Throwable.class)); + verify(stringObserver, times(0)).onCompleted(); + verify(stringObserver, times(0)).onNext("hello"); + + // unblock the success + bs.await(); + ls.await(); + + verify(stringObserver, times(1)).onError(any(Throwable.class)); + verify(stringObserver, times(0)).onCompleted(); + verify(stringObserver, times(0)).onNext("hello"); + } + + @Test + public void testMergeOfAErroAsync() throws Exception { + CyclicBarrier bs = new CyclicBarrier(2); + CyclicBarrier be = new CyclicBarrier(2); + CountDownLatch ls = new CountDownLatch(1); + CountDownLatch le = new CountDownLatch(1); + final Single o1 = Single.create(new TestAsynchronousSingle(bs, ls)); + final Single o2 = Single.create(new TestAsynchronousErrorSingle(be, le)); + + Observable> observableOfObservables = Observable.just(o1, o2); + Observable m = Single.merge(observableOfObservables); + m.subscribe(stringObserver); + + verify(stringObserver, times(0)).onError(any(Throwable.class)); + verify(stringObserver, times(0)).onCompleted(); + verify(stringObserver, times(0)).onNext("hello"); + + // unblock the success + bs.await(); + ls.await(); + + verify(stringObserver, times(0)).onError(any(Throwable.class)); + verify(stringObserver, times(0)).onCompleted(); + verify(stringObserver, times(1)).onNext("hello"); + + // unblock the error + be.await(); + le.await(); + + verify(stringObserver, times(1)).onError(any(Throwable.class)); + verify(stringObserver, times(0)).onCompleted(); + verify(stringObserver, times(1)).onNext("hello"); + } + + @Test + public void testMergeOfSyncNull() throws InterruptedException { + final Single o1 = Single.create(new TestSynchronousSingle()); + + Observable> observableOfObservables = Observable.just(o1, null); + Observable m = Single.merge(observableOfObservables); + m.subscribe(stringObserver); + + verify(stringObserver, times(0)).onError(any(Throwable.class)); + verify(stringObserver, times(1)).onCompleted(); + verify(stringObserver, times(1)).onNext("hello"); + } + + @Test + public void testMergeOfJustJust() { + final Single o1 = Single.just("hello"); + final Single o2 = Single.just("hello"); + + Observable> observableOfObservables = Observable.just(o1, o2); + Observable m = Single.merge(observableOfObservables); + m.subscribe(stringObserver); + + verify(stringObserver, times(0)).onError(any(Throwable.class)); + verify(stringObserver, times(1)).onCompleted(); + verify(stringObserver, times(2)).onNext("hello"); + } + + private static class TestSynchronousSingle implements Single.OnSubscribe { + @Override + public void call(SingleSubscriber observer) { + observer.onSuccess("hello"); + } + } + + private static class TestAsynchronousSingle implements Single.OnSubscribe { + Thread t; + final CountDownLatch sent; + final CyclicBarrier barrier; + + public TestAsynchronousSingle(CyclicBarrier b, CountDownLatch l) { + this.barrier = b; + sent = l; + } + + @Override + public void call(final SingleSubscriber observer) { + t = new Thread(new Runnable() { + @Override + public void run() { + try { + barrier.await(); + } catch (Exception e) { + observer.onError(e); + } + try { + observer.onSuccess("hello"); + sent.countDown(); + // I can't use a countDownLatch to prove we are actually sending 'onNext' + // since it will block if synchronized and I'll deadlock + } catch (Exception e) { + observer.onError(e); + } + } + }); + t.start(); + } + } + + private static class TestSynchronousErrorSingle implements Single.OnSubscribe { + @Override + public void call(SingleSubscriber observer) { + observer.onError(new NullPointerException()); + } + } + + private static class TestAsynchronousErrorSingle implements Single.OnSubscribe { + Thread t; + final CountDownLatch onErrorSent; + final CyclicBarrier barrier; + + public TestAsynchronousErrorSingle(CyclicBarrier b, CountDownLatch l) { + barrier = b; + onErrorSent = l; + } + + @Override + public void call(final SingleSubscriber observer) { + t = new Thread(new Runnable() { + @Override + public void run() { + try { + barrier.await(); + } catch (Exception e) { + observer.onError(e); + } + observer.onError(new NullPointerException()); + onErrorSent.countDown(); + } + }); + t.start(); + } + } +}