diff --git a/src/main/java/rx/Observable.java b/src/main/java/rx/Observable.java index c576ea59f9..d3f21929b3 100644 --- a/src/main/java/rx/Observable.java +++ b/src/main/java/rx/Observable.java @@ -1693,7 +1693,11 @@ public final static Observable merge(IterableReactiveX operators documentation: Merge */ + @SuppressWarnings({"unchecked", "rawtypes"}) public final static Observable merge(Observable> source) { + if (source.getClass() == ScalarSynchronousObservable.class) { + return ((ScalarSynchronousObservable)source).scalarFlatMap((Func1)UtilityFunctions.identity()); + } return source.lift(OperatorMerge.instance(false)); } @@ -1721,8 +1725,13 @@ public final static Observable merge(ObservableReactiveX operators documentation: Merge */ + @Experimental + @SuppressWarnings({"unchecked", "rawtypes"}) public final static Observable merge(Observable> source, int maxConcurrent) { - return source.lift(new OperatorMergeMaxConcurrent(maxConcurrent)); + if (source.getClass() == ScalarSynchronousObservable.class) { + return ((ScalarSynchronousObservable)source).scalarFlatMap((Func1)UtilityFunctions.identity()); + } + return source.lift(OperatorMerge.instance(false, maxConcurrent)); } /** @@ -1993,7 +2002,31 @@ public final static Observable merge(Observable t1, Observab public final static Observable merge(Observable[] sequences) { return merge(from(sequences)); } - + + /** + * Flattens an Array of Observables into one Observable, without any transformation, while limiting the + * number of concurrent subscriptions to these Observables. + *

+ * + *

+ * You can combine items emitted by multiple Observables so that they appear as a single Observable, by + * using the {@code merge} method. + *

+ *
Scheduler:
+ *
{@code merge} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param sequences + * the Array of Observables + * @param maxConcurrent + * the maximum number of Observables that may be subscribed to concurrently + * @return an Observable that emits all of the items emitted by the Observables in the Array + * @see ReactiveX operators documentation: Merge + */ + @Experimental + public final static Observable merge(Observable[] sequences, int maxConcurrent) { + return merge(from(sequences), maxConcurrent); + } /** * Flattens an Observable that emits Observables into one Observable, in a way that allows an Observer to * receive all successfully emitted items from all of the source Observables without being interrupted by @@ -2021,6 +2054,37 @@ public final static Observable merge(Observable[] sequences) public final static Observable mergeDelayError(Observable> source) { return source.lift(OperatorMerge.instance(true)); } + /** + * Flattens an Observable that emits Observables into one Observable, in a way that allows an Observer to + * receive all successfully emitted items from all of the source Observables without being interrupted by + * an error notification from one of them, while limiting the + * number of concurrent subscriptions to these Observables. + *

+ * This behaves like {@link #merge(Observable)} except that if any of the merged Observables notify of an + * error via {@link Observer#onError onError}, {@code mergeDelayError} will refrain from propagating that + * error notification until all of the merged Observables have finished emitting items. + *

+ * + *

+ * Even if multiple merged Observables send {@code onError} notifications, {@code mergeDelayError} will only + * invoke the {@code onError} method of its Observers once. + *

+ *
Scheduler:
+ *
{@code mergeDelayError} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param source + * an Observable that emits Observables + * @param maxConcurrent + * the maximum number of Observables that may be subscribed to concurrently + * @return an Observable that emits all of the items emitted by the Observables emitted by the + * {@code source} Observable + * @see ReactiveX operators documentation: Merge + */ + @Experimental + public final static Observable mergeDelayError(Observable> source, int maxConcurrent) { + return source.lift(OperatorMerge.instance(true, maxConcurrent)); + } /** * Flattens two Observables into one Observable, in a way that allows an Observer to receive all @@ -4618,6 +4682,9 @@ public final Observable firstOrDefault(T defaultValue, Func1ReactiveX operators documentation: FlatMap */ public final Observable flatMap(Func1> func) { + if (getClass() == ScalarSynchronousObservable.class) { + return ((ScalarSynchronousObservable)this).scalarFlatMap(func); + } return merge(map(func)); } @@ -4646,6 +4713,9 @@ public final Observable flatMap(Func1 Observable flatMap(Func1> func, int maxConcurrent) { + if (getClass() == ScalarSynchronousObservable.class) { + return ((ScalarSynchronousObservable)this).scalarFlatMap(func); + } return merge(map(func), maxConcurrent); } diff --git a/src/main/java/rx/internal/operators/OperatorAll.java b/src/main/java/rx/internal/operators/OperatorAll.java index 3f78eeff88..96f0429d01 100644 --- a/src/main/java/rx/internal/operators/OperatorAll.java +++ b/src/main/java/rx/internal/operators/OperatorAll.java @@ -77,4 +77,4 @@ public void onCompleted() { child.setProducer(producer); return s; } -} +} \ No newline at end of file diff --git a/src/main/java/rx/internal/operators/OperatorMapNotification.java b/src/main/java/rx/internal/operators/OperatorMapNotification.java index 4a2fd03a36..be363663fb 100644 --- a/src/main/java/rx/internal/operators/OperatorMapNotification.java +++ b/src/main/java/rx/internal/operators/OperatorMapNotification.java @@ -15,11 +15,20 @@ */ package rx.internal.operators; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicLong; + import rx.Observable.Operator; +import rx.Producer; import rx.Subscriber; +import rx.Subscription; +import rx.exceptions.MissingBackpressureException; import rx.exceptions.OnErrorThrowable; import rx.functions.Func0; import rx.functions.Func1; +import rx.internal.util.unsafe.SpscArrayQueue; +import rx.internal.util.unsafe.UnsafeAccess; /** * Applies a function of your choosing to every item emitted by an {@code Observable}, and emits the results of @@ -41,13 +50,18 @@ public OperatorMapNotification(Func1 onNext, Func1 call(final Subscriber o) { - return new Subscriber(o) { - + Subscriber subscriber = new Subscriber() { + SingleEmitter emitter; + @Override + public void setProducer(Producer producer) { + emitter = new SingleEmitter(o, producer, this); + o.setProducer(emitter); + } + @Override public void onCompleted() { try { - o.onNext(onCompleted.call()); - o.onCompleted(); + emitter.offerAndComplete(onCompleted.call()); } catch (Throwable e) { o.onError(e); } @@ -56,8 +70,7 @@ public void onCompleted() { @Override public void onError(Throwable e) { try { - o.onNext(onError.call(e)); - o.onCompleted(); + emitter.offerAndComplete(onError.call(e)); } catch (Throwable e2) { o.onError(e); } @@ -66,13 +79,159 @@ public void onError(Throwable e) { @Override public void onNext(T t) { try { - o.onNext(onNext.call(t)); + emitter.offer(onNext.call(t)); } catch (Throwable e) { o.onError(OnErrorThrowable.addValueAsLastCause(e, t)); } } }; + o.add(subscriber); + return subscriber; } - -} + static final class SingleEmitter extends AtomicLong implements Producer, Subscription { + /** */ + private static final long serialVersionUID = -249869671366010660L; + final NotificationLite nl; + final Subscriber child; + final Producer producer; + final Subscription cancel; + final Queue queue; + volatile boolean complete; + /** Guarded by this. */ + boolean emitting; + /** Guarded by this. */ + boolean missed; + + public SingleEmitter(Subscriber child, Producer producer, Subscription cancel) { + this.child = child; + this.producer = producer; + this.cancel = cancel; + this.queue = UnsafeAccess.isUnsafeAvailable() + ? new SpscArrayQueue(2) + : new ConcurrentLinkedQueue(); + + this.nl = NotificationLite.instance(); + } + @Override + public void request(long n) { + for (;;) { + long r = get(); + if (r < 0) { + return; + } + long u = r + n; + if (u < 0) { + u = Long.MAX_VALUE; + } + if (compareAndSet(r, u)) { + producer.request(n); + drain(); + return; + } + } + } + + void produced(long n) { + for (;;) { + long r = get(); + if (r < 0) { + return; + } + long u = r - n; + if (u < 0) { + throw new IllegalStateException("More produced (" + n + ") than requested (" + r + ")"); + } + if (compareAndSet(r, u)) { + return; + } + } + } + + public void offer(T value) { + if (!queue.offer(value)) { + child.onError(new MissingBackpressureException()); + unsubscribe(); + } else { + drain(); + } + } + public void offerAndComplete(T value) { + if (!this.queue.offer(value)) { + child.onError(new MissingBackpressureException()); + unsubscribe(); + } else { + this.complete = true; + drain(); + } + } + + void drain() { + synchronized (this) { + if (emitting) { + missed = true; + return; + } + emitting = true; + missed = false; + } + boolean skipFinal = false; + try { + for (;;) { + + long r = get(); + boolean c = complete; + boolean empty = queue.isEmpty(); + + if (c && empty) { + child.onCompleted(); + skipFinal = true; + return; + } else + if (r > 0) { + Object v = queue.poll(); + if (v != null) { + child.onNext(nl.getValue(v)); + produced(1); + } else + if (c) { + child.onCompleted(); + skipFinal = true; + return; + } + } + + synchronized (this) { + if (!missed) { + skipFinal = true; + emitting = false; + return; + } + missed = false; + } + } + } finally { + if (!skipFinal) { + synchronized (this) { + emitting = false; + } + } + } + } + + @Override + public boolean isUnsubscribed() { + return get() < 0; + } + @Override + public void unsubscribe() { + long r = get(); + if (r != Long.MIN_VALUE) { + r = getAndSet(Long.MIN_VALUE); + if (r != Long.MIN_VALUE) { + cancel.unsubscribe(); + } + } + } + } +} \ No newline at end of file diff --git a/src/main/java/rx/internal/operators/OperatorMerge.java b/src/main/java/rx/internal/operators/OperatorMerge.java index 2da1844ca9..98cb548391 100644 --- a/src/main/java/rx/internal/operators/OperatorMerge.java +++ b/src/main/java/rx/internal/operators/OperatorMerge.java @@ -15,15 +15,16 @@ */ package rx.internal.operators; -import java.util.Queue; +import java.util.*; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.*; import rx.*; import rx.Observable.Operator; +import rx.Observable; import rx.exceptions.*; -import rx.functions.Func1; import rx.internal.util.*; +import rx.subscriptions.CompositeSubscription; /** * Flattens a list of {@link Observable}s into one {@code Observable}, without any transformation. @@ -49,16 +50,16 @@ * @param * the type of the items emitted by both the source and merged {@code Observable}s */ -public class OperatorMerge implements Operator> { +public final class OperatorMerge implements Operator> { /** Lazy initialization via inner-class holder. */ private static final class HolderNoDelay { /** A singleton instance. */ - static final OperatorMerge INSTANCE = new OperatorMerge(false); + static final OperatorMerge INSTANCE = new OperatorMerge(false, Integer.MAX_VALUE); } /** Lazy initialization via inner-class holder. */ private static final class HolderDelayErrors { /** A singleton instance. */ - static final OperatorMerge INSTANCE = new OperatorMerge(true); + static final OperatorMerge INSTANCE = new OperatorMerge(true, Integer.MAX_VALUE); } /** * @param delayErrors should the merge delay errors? @@ -71,716 +72,754 @@ public static OperatorMerge instance(boolean delayErrors) { } return (OperatorMerge)HolderNoDelay.INSTANCE; } - /* - * benjchristensen => This class is complex and I'm not a fan of it despite writing it. I want to give some background - * as to why for anyone who wants to try and help improve it. - * - * One of my first implementations that added backpressure support (Producer.request) was fairly elegant and used a simple - * queue draining approach. It was simple to understand as all onNext were added to their queues, then a single winner - * would drain the queues, similar to observeOn. It killed the Netflix API when I canaried it. There were two problems: - * (1) performance and (2) object allocation overhead causing massive GC pressure. Remember that merge is one of the most - * used operators (mostly due to flatmap) and is therefore critical to and a limiter of performance in any application. - * - * All subsequent work on this class and the various fast-paths and branches within it have been to achieve the needed functionality - * while reducing or eliminating object allocation and keeping performance acceptable. - * - * This has meant adopting strategies such as: - * - * - ring buffers instead of growable queues - * - object pooling - * - skipping request logic when downstream does not need backpressure - * - ScalarValueQueue for optimizing synchronous single-value Observables - * - adopting data structures that use Unsafe (and gating them based on environment so non-Oracle JVMs still work) - * - * It has definitely increased the complexity and maintenance cost of this class, but the performance gains have been significant. - * - * The biggest cost of the increased complexity is concurrency bugs and reasoning through what's going on. - * - * I'd love to have contributions that improve this class, but keep in mind the performance and GC pressure. - * The benchmarks I use are in the JMH OperatorMergePerf class. GC memory pressure is tested using Java Flight Recorder - * to track object allocation. + /** + * Creates a new instance of the operator with the given delayError and maxConcurrency settings. + * @param delayErrors + * @param maxConcurrent the maximum number of concurrent subscriptions or Integer.MAX_VALUE for unlimited + * @return */ - - private OperatorMerge() { - this.delayErrors = false; + public static OperatorMerge instance(boolean delayErrors, int maxConcurrent) { + if (maxConcurrent == Integer.MAX_VALUE) { + return instance(delayErrors); + } + return new OperatorMerge(delayErrors, maxConcurrent); } - private OperatorMerge(boolean delayErrors) { + final boolean delayErrors; + final int maxConcurrent; + + private OperatorMerge(boolean delayErrors, int maxConcurrent) { this.delayErrors = delayErrors; + this.maxConcurrent = maxConcurrent; } - private final boolean delayErrors; - @Override public Subscriber> call(final Subscriber child) { - return new MergeSubscriber(child, delayErrors); - + MergeSubscriber subscriber = new MergeSubscriber(child, delayErrors, maxConcurrent); + MergeProducer producer = new MergeProducer(subscriber); + subscriber.producer = producer; + + child.add(subscriber); + child.setProducer(producer); + + return subscriber; } - private static final class MergeSubscriber extends Subscriber> { - final NotificationLite on = NotificationLite.instance(); - final Subscriber actual; - private final MergeProducer mergeProducer; - private int wip; - private boolean completed; - private final boolean delayErrors; - private ConcurrentLinkedQueue exceptions; - - private volatile SubscriptionIndexedRingBuffer> childrenSubscribers; - - private volatile RxRingBuffer scalarValueQueue = null; - - /* protected by lock on MergeSubscriber instance */ - private int missedEmitting = 0; - private boolean emitLock = false; - - /** - * Using synchronized(this) for `emitLock` instead of ReentrantLock or AtomicInteger is faster when there is no contention. - * - *
 {@code
-         * Using ReentrantLock:
-         * r.o.OperatorMergePerf.merge1SyncStreamOfN           1000  thrpt         5    44185.294     1295.565    ops/s
-         * 
-         * Using synchronized(this):
-         * r.o.OperatorMergePerf.merge1SyncStreamOfN           1000  thrpt         5    79715.981     3704.486    ops/s
-         * 
-         * Still slower though than allowing concurrency:
-         * r.o.OperatorMergePerf.merge1SyncStreamOfN           1000  thrpt         5   149331.046     4851.290    ops/s
-         * } 
- */ - - public MergeSubscriber(Subscriber actual, boolean delayErrors) { - super(actual); - this.actual = actual; - this.mergeProducer = new MergeProducer(this); - this.delayErrors = delayErrors; - // decoupled the subscription chain because we need to decouple and control backpressure - actual.add(this); - actual.setProducer(mergeProducer); - } + static final class MergeProducer extends AtomicLong implements Producer { + /** */ + private static final long serialVersionUID = -1214379189873595503L; - @Override - public void onStart() { - // we request backpressure so we can handle long-running Observables that are enqueueing, such as flatMap use cases - // we decouple the Producer chain while keeping the Subscription chain together (perf benefit) via super(actual) - request(RxRingBuffer.SIZE); + final MergeSubscriber subscriber; + + public MergeProducer(MergeSubscriber subscriber) { + this.subscriber = subscriber; } - - /* - * This is expected to be executed sequentially as per the Rx contract or it will not work. - */ + @Override - public void onNext(Observable t) { - if (t instanceof ScalarSynchronousObservable) { - ScalarSynchronousObservable t2 = (ScalarSynchronousObservable)t; - handleScalarSynchronousObservable(t2); - } else { - if (t == null || isUnsubscribed()) { + public void request(long n) { + if (n > 0) { + if (get() == Long.MAX_VALUE) { return; } + BackpressureUtils.getAndAddRequest(this, n); + subscriber.emit(); + } else + if (n < 0) { + throw new IllegalArgumentException("n >= 0 required"); + } + } + public long produced(int n) { + return addAndGet(-n); + } + } + + /** + * The subscriber that observes Observables. + * @param the value type + */ + static final class MergeSubscriber extends Subscriber> { + final Subscriber child; + final boolean delayErrors; + final int maxConcurrent; + + MergeProducer producer; + + volatile RxRingBuffer queue; + + /** Tracks the active subscriptions to sources. */ + volatile CompositeSubscription subscriptions; + /** Due to the emission loop, we need to store errors somewhere if !delayErrors. */ + volatile ConcurrentLinkedQueue errors; + + final NotificationLite nl; + + volatile boolean done; + + /** Guarded by this. */ + boolean emitting; + /** Guarded by this. */ + boolean missed; + + final Object innerGuard; + /** Copy-on-write array, guarded by innerGuard. */ + volatile InnerSubscriber[] innerSubscribers; + + /** Used to generate unique InnerSubscriber IDs. Modified from onNext only. */ + long uniqueId; + + /** Which was the last InnerSubscriber that emitted? Accessed if emitting == true. */ + long lastId; + /** What was its index in the innerSubscribers array? Accessed if emitting == true. */ + int lastIndex; + + /** An empty array to avoid creating new empty arrays in removeInner. */ + static final InnerSubscriber[] EMPTY = new InnerSubscriber[0]; + + public MergeSubscriber(Subscriber child, boolean delayErrors, int maxConcurrent) { + this.child = child; + this.delayErrors = delayErrors; + this.maxConcurrent = maxConcurrent; + this.nl = NotificationLite.instance(); + this.innerGuard = new Object(); + this.innerSubscribers = EMPTY; + long r = Math.min(maxConcurrent, RxRingBuffer.SIZE); + request(r); + } + + Queue getOrCreateErrorQueue() { + ConcurrentLinkedQueue q = errors; + if (q == null) { synchronized (this) { - // synchronized here because `wip` can be concurrently changed by children Observables - wip++; + q = errors; + if (q == null) { + q = new ConcurrentLinkedQueue(); + errors = q; + } } - handleNewSource(t); } + return q; } - - private void handleNewSource(Observable t) { - if (childrenSubscribers == null) { - // lazily create this only if we receive Observables we need to subscribe to - childrenSubscribers = new SubscriptionIndexedRingBuffer>(); - add(childrenSubscribers); + CompositeSubscription getOrCreateComposite() { + CompositeSubscription c = subscriptions; + if (c == null) { + boolean shouldAdd = false; + synchronized (this) { + c = subscriptions; + if (c == null) { + c = new CompositeSubscription(); + subscriptions = c; + shouldAdd = true; + } + } + if (shouldAdd) { + add(c); + } } - MergeProducer producerIfNeeded = null; - // if we have received a request then we need to respect it, otherwise we fast-path - if (mergeProducer.requested != Long.MAX_VALUE) { - /** - *
 {@code
-                 * With this optimization:
-                 * 
-                 * r.o.OperatorMergePerf.merge1SyncStreamOfN      1000  thrpt         5    57100.080     4686.331    ops/s
-                 * r.o.OperatorMergePerf.merge1SyncStreamOfN   1000000  thrpt         5       60.875        1.622    ops/s
-                 *  
-                 * Without this optimization:
-                 * 
-                 * r.o.OperatorMergePerf.merge1SyncStreamOfN      1000  thrpt         5    29863.945     1858.002    ops/s
-                 * r.o.OperatorMergePerf.merge1SyncStreamOfN   1000000  thrpt         5       30.516        1.087    ops/s
-                 * } 
- */ - producerIfNeeded = mergeProducer; + return c; + } + + @Override + public void onNext(Observable t) { + if (t == null) { + return; } - InnerSubscriber i = new InnerSubscriber(this, producerIfNeeded); - i.sindex = childrenSubscribers.add(i); - t.unsafeSubscribe(i); - if (!isUnsubscribed()) { - request(1); + if (t instanceof ScalarSynchronousObservable) { + tryEmit(((ScalarSynchronousObservable)t).get()); + } else { + InnerSubscriber inner = new InnerSubscriber(this, uniqueId++); + addInner(inner); + t.unsafeSubscribe(inner); + emit(); } } - - private void handleScalarSynchronousObservable(ScalarSynchronousObservable t) { - // fast-path for scalar, synchronous values such as Observable.from(int) - /** - * Without this optimization: - * - *
 {@code
-             * Benchmark                                          (size)   Mode   Samples        Score  Score error    Units
-             * r.o.OperatorMergePerf.oneStreamOfNthatMergesIn1         1  thrpt         5  2,418,452.409   130572.665    ops/s
-             * r.o.OperatorMergePerf.oneStreamOfNthatMergesIn1      1000  thrpt         5     5,690.456       94.958    ops/s
-             * r.o.OperatorMergePerf.merge1SyncStreamOfN         1000000  thrpt         5          takes too long
-             * 
-             * With this optimization:
-             * 
-             * r.o.OperatorMergePerf.merge1SyncStreamOfN               1  thrpt         5  5,475,300.198   156741.334    ops/s
-             * r.o.OperatorMergePerf.merge1SyncStreamOfN            1000  thrpt         5    68,932.278     1311.023    ops/s
-             * r.o.OperatorMergePerf.merge1SyncStreamOfN         1000000  thrpt         5       64.405        0.611    ops/s
-             * } 
- * - */ - if (mergeProducer.requested == Long.MAX_VALUE) { - handleScalarSynchronousObservableWithoutRequestLimits(t); + + private void reportError() { + List list = new ArrayList(errors); + if (list.size() == 1) { + child.onError(list.get(0)); } else { - handleScalarSynchronousObservableWithRequestLimits(t); + child.onError(new CompositeException(list)); } } - - private void handleScalarSynchronousObservableWithoutRequestLimits(ScalarSynchronousObservable t) { - T value = t.get(); - if (getEmitLock()) { - boolean moreToDrain; - try { - actual.onNext(value); - } finally { - moreToDrain = releaseEmitLock(); - } - if (moreToDrain) { - drainQueuesIfNeeded(); - } - request(1); - return; - } else { - try { - getOrCreateScalarValueQueue().onNext(value); - } catch (MissingBackpressureException e) { - onError(e); - } - return; + + @Override + public void onError(Throwable e) { + getOrCreateErrorQueue().offer(e); + done = true; + emit(); + } + @Override + public void onCompleted() { + done = true; + emit(); + } + + void addInner(InnerSubscriber inner) { + getOrCreateComposite().add(inner); + synchronized (innerGuard) { + InnerSubscriber[] a = innerSubscribers; + int n = a.length; + InnerSubscriber[] b = new InnerSubscriber[n + 1]; + System.arraycopy(a, 0, b, 0, n); + b[n] = inner; + innerSubscribers = b; } } - - private void handleScalarSynchronousObservableWithRequestLimits(ScalarSynchronousObservable t) { - if (getEmitLock()) { - boolean emitted = false; - boolean moreToDrain; - boolean isReturn = false; - try { - long r = mergeProducer.requested; - if (r > 0) { - emitted = true; - actual.onNext(t.get()); - MergeProducer.REQUESTED.decrementAndGet(mergeProducer); - // we handle this Observable without ever incrementing the wip or touching other machinery so just return here - isReturn = true; + void removeInner(InnerSubscriber inner) { + RxRingBuffer q = inner.queue; + if (q != null) { + q.release(); + } + // subscription is non-null here because the very first addInner will create it before + // this can be called + subscriptions.remove(inner); + synchronized (innerGuard) { + InnerSubscriber[] a = innerSubscribers; + int n = a.length; + int j = -1; + // locate the inner + for (int i = 0; i < n; i++) { + if (inner.equals(a[i])) { + j = i; + break; } - } finally { - moreToDrain = releaseEmitLock(); } - if (moreToDrain) { - drainQueuesIfNeeded(); - } - if (emitted) { - request(1); + if (j < 0) { + return; } - if (isReturn) { + if (n == 1) { + innerSubscribers = EMPTY; return; } - } - - // if we didn't return above we need to enqueue - // enqueue the values for later delivery - try { - getOrCreateScalarValueQueue().onNext(t.get()); - } catch (MissingBackpressureException e) { - onError(e); + InnerSubscriber[] b = new InnerSubscriber[n - 1]; + System.arraycopy(a, 0, b, 0, j); + System.arraycopy(a, j + 1, b, j, n - j - 1); + innerSubscribers = b; } } - - private RxRingBuffer getOrCreateScalarValueQueue() { - RxRingBuffer svq = scalarValueQueue; - if (svq == null) { - svq = RxRingBuffer.getSpscInstance(); - scalarValueQueue = svq; + + /** + * Tries to emit the value directly to the child if + * no concurrent emission is happening at the moment. + *

+ * Since the scalar-value queue optimization applies + * to both the main source and the inner subscribers, + * we handle things in a shared manner. + * + * @param subscriber + * @param value + */ + void tryEmit(InnerSubscriber subscriber, T value) { + boolean success = false; + long r = producer.get(); + if (r != 0L) { + synchronized (this) { + // if nobody is emitting and child has available requests + if (!emitting) { + emitting = true; + success = true; + } + } } - return svq; - } - - private synchronized boolean releaseEmitLock() { - emitLock = false; - if (missedEmitting == 0) { - return false; + if (success) { + emitScalar(subscriber, value, r); } else { - return true; + queueScalar(subscriber, value); } } - private synchronized boolean getEmitLock() { - if (emitLock) { - missedEmitting++; - return false; - } else { - emitLock = true; - missedEmitting = 0; - return true; + protected void queueScalar(InnerSubscriber subscriber, T value) { + /* + * If the attempt to make a fast-path emission failed + * due to lack of requests or an ongoing emission, + * enqueue the value and try the slow emission path. + */ + RxRingBuffer q = subscriber.queue; + if (q == null) { + q = RxRingBuffer.getSpscInstance(); + subscriber.add(q); + subscriber.queue = q; + } + try { + q.onNext(nl.next(value)); + } catch (MissingBackpressureException ex) { + subscriber.unsubscribe(); + subscriber.onError(ex); + return; + } catch (IllegalStateException ex) { + if (!subscriber.isUnsubscribed()) { + subscriber.unsubscribe(); + subscriber.onError(ex); + } + return; } + emit(); } - private boolean drainQueuesIfNeeded() { - while (true) { - if (getEmitLock()) { - int emitted = 0; - boolean moreToDrain; - try { - emitted = drainScalarValueQueue(); - drainChildrenQueues(); - } finally { - moreToDrain = releaseEmitLock(); + protected void emitScalar(InnerSubscriber subscriber, T value, long r) { + boolean skipFinal = false; + try { + try { + child.onNext(value); + } catch (Throwable t) { + if (!delayErrors) { + Exceptions.throwIfFatal(t); + skipFinal = true; + subscriber.unsubscribe(); + subscriber.onError(t); + return; } - // request outside of lock - if (emitted > 0) { - request(emitted); + getOrCreateErrorQueue().offer(t); + } + if (r != Long.MAX_VALUE) { + producer.produced(1); + } + subscriber.requestMore(1); + // check if some state changed while emitting + synchronized (this) { + skipFinal = true; + if (!missed) { + emitting = false; + return; } - if (!moreToDrain) { - return true; + missed = false; + } + } finally { + if (!skipFinal) { + synchronized (this) { + emitting = false; } - // otherwise we'll loop and get whatever was added - } else { - return false; } } + /* + * In the synchronized block below request(1) we check + * if there was a concurrent emission attempt and if there was, + * we stay in emission mode and enter the emission loop + * which will take care all the queued up state and + * emission possibilities. + */ + emitLoop(); } - int lastDrainedIndex = 0; - - /** - * ONLY call when holding the EmitLock. - */ - private void drainChildrenQueues() { - if (childrenSubscribers != null) { - lastDrainedIndex = childrenSubscribers.forEach(DRAIN_ACTION, lastDrainedIndex); - } + public void requestMore(long n) { + request(n); } - + /** - * ONLY call when holding the EmitLock. + * Tries to emit the value directly to the child if + * no concurrent emission is happening at the moment. + *

+ * Since the scalar-value queue optimization applies + * to both the main source and the inner subscribers, + * we handle things in a shared manner. + * + * @param subscriber + * @param value */ - private int drainScalarValueQueue() { - RxRingBuffer svq = scalarValueQueue; - if (svq != null) { - long r = mergeProducer.requested; - int emittedWhileDraining = 0; - if (r < 0) { - // drain it all - Object o = null; - while ((o = svq.poll()) != null) { - on.accept(actual, o); - emittedWhileDraining++; - } - } else if (r > 0) { - // drain what was requested - long toEmit = r; - for (int i = 0; i < toEmit; i++) { - Object o = svq.poll(); - if (o == null) { - break; - } else { - on.accept(actual, o); - emittedWhileDraining++; - } + void tryEmit(T value) { + boolean success = false; + long r = producer.get(); + if (r != 0L) { + synchronized (this) { + // if nobody is emitting and child has available requests + if (!emitting) { + emitting = true; + success = true; } - // decrement the number we emitted from outstanding requests - MergeProducer.REQUESTED.getAndAdd(mergeProducer, -emittedWhileDraining); } - return emittedWhileDraining; } - return 0; + if (success) { + emitScalar(value, r); + } else { + queueScalar(value); + } } - final Func1, Boolean> DRAIN_ACTION = new Func1, Boolean>() { - - @Override - public Boolean call(InnerSubscriber s) { - if (s.q != null) { - long r = mergeProducer.requested; - int emitted = s.drainQueue(); - if (emitted > 0) { - s.requestMore(emitted); - } - if (emitted == r) { - // we emitted as many as were requested so stop the forEach loop - return Boolean.FALSE; - } - } - return Boolean.TRUE; + protected void queueScalar(T value) { + /* + * If the attempt to make a fast-path emission failed + * due to lack of requests or an ongoing emission, + * enqueue the value and try the slow emission path. + */ + RxRingBuffer q = this.queue; + if (q == null) { + q = RxRingBuffer.getSpscInstance(); + this.add(q); + this.queue = q; } - - }; - - @Override - public void onError(Throwable e) { - if (!completed) { - completed = true; - innerError(e, true); + try { + q.onNext(nl.next(value)); + } catch (MissingBackpressureException ex) { + this.unsubscribe(); + this.onError(ex); + return; + } catch (IllegalStateException ex) { + if (!this.isUnsubscribed()) { + this.unsubscribe(); + this.onError(ex); + } + return; } + emit(); } - - private void innerError(Throwable e, boolean parent) { - if (delayErrors) { - synchronized (this) { - if (exceptions == null) { - exceptions = new ConcurrentLinkedQueue(); + + protected void emitScalar(T value, long r) { + boolean skipFinal = false; + try { + try { + child.onNext(value); + } catch (Throwable t) { + if (!delayErrors) { + Exceptions.throwIfFatal(t); + skipFinal = true; + this.unsubscribe(); + this.onError(t); + return; } + getOrCreateErrorQueue().offer(t); } - exceptions.add(e); - boolean sendOnComplete = false; + if (r != Long.MAX_VALUE) { + producer.produced(1); + } + this.requestMore(1); + // check if some state changed while emitting synchronized (this) { - if (!parent) { - wip--; - } - if ((wip == 0 && completed) || (wip < 0)) { - sendOnComplete = true; + skipFinal = true; + if (!missed) { + emitting = false; + return; } + missed = false; } - if (sendOnComplete) { - drainAndComplete(); - } - } else { - actual.onError(e); - } - } - - @Override - public void onCompleted() { - boolean c = false; - synchronized (this) { - completed = true; - if (wip == 0) { - c = true; + } finally { + if (!skipFinal) { + synchronized (this) { + emitting = false; + } } } - if (c) { - // complete outside of lock - drainAndComplete(); - } + /* + * In the synchronized block below request(1) we check + * if there was a concurrent emission attempt and if there was, + * we stay in emission mode and enter the emission loop + * which will take care all the queued up state and + * emission possibilities. + */ + emitLoop(); } - - void completeInner(InnerSubscriber s) { - boolean sendOnComplete = false; + + void emit() { synchronized (this) { - wip--; - if (wip == 0 && completed) { - sendOnComplete = true; + if (emitting) { + missed = true; + return; } + emitting = true; } - childrenSubscribers.remove(s.sindex); - if (sendOnComplete) { - drainAndComplete(); - } + emitLoop(); } - - private void drainAndComplete() { - boolean moreToDrain = true; - while (moreToDrain) { - synchronized (this) { - missedEmitting = 0; - } - drainScalarValueQueue(); - drainChildrenQueues(); - synchronized (this) { - moreToDrain = missedEmitting > 0; - } - } - RxRingBuffer svq = scalarValueQueue; - if (svq == null || svq.isEmpty()) { - if (delayErrors) { - Queue es = null; - synchronized (this) { - es = exceptions; + /** + * The standard emission loop serializing events and requests. + */ + void emitLoop() { + boolean skipFinal = false; + try { + final Subscriber child = this.child; + for (;;) { + // eagerly check if child unsubscribed or we reached a terminal state. + if (checkTerminate()) { + skipFinal = true; + return; } - if (es != null) { - if (es.isEmpty()) { - actual.onCompleted(); - } else if (es.size() == 1) { - actual.onError(es.poll()); + RxRingBuffer svq = queue; + + long r = producer.get(); + boolean unbounded = r == Long.MAX_VALUE; + + // count the number of 'completed' sources to replenish them in batches + int replenishMain = 0; + + // try emitting as many scalars as possible + if (svq != null) { + for (;;) { + int scalarEmission = 0; + Object o = null; + while (r > 0) { + o = svq.poll(); + // eagerly check if child unsubscribed or we reached a terminal state. + if (checkTerminate()) { + skipFinal = true; + return; + } + if (o == null) { + break; + } + T v = nl.getValue(o); + // if child throws, report bounce it back immediately + try { + child.onNext(v); + } catch (Throwable t) { + if (!delayErrors) { + Exceptions.throwIfFatal(t); + skipFinal = true; + unsubscribe(); + child.onError(t); + return; + } + getOrCreateErrorQueue().offer(t); + } + replenishMain++; + scalarEmission++; + r--; + } + if (scalarEmission > 0) { + if (unbounded) { + r = Long.MAX_VALUE; + } else { + r = producer.produced(scalarEmission); + } + } + if (r == 0L || o == null) { + break; + } + } + } + + /* + * We need to read done before innerSubscribers because innerSubcribers are added + * before done is set to true. If it were the other way around, we could read an empty + * innerSubscribers, get paused and then read a done flag but an async producer + * might have added more subscribers between the two. + */ + boolean d = done; + // re-read svq because it could have been created + // asynchronously just before done was set to true. + svq = queue; + // read the current set of inner subscribers + InnerSubscriber[] inner = innerSubscribers; + int n = inner.length; + + // check if upstream is done, there are no scalar values + // and no active inner subscriptions + if (d && (svq == null || svq.isEmpty()) && n == 0) { + Queue e = errors; + if (e == null || e.isEmpty()) { + child.onCompleted(); } else { - actual.onError(new CompositeException(es)); + reportError(); + } + if (svq != null) { + svq.release(); } - } else { - actual.onCompleted(); + skipFinal = true; + return; + } + + boolean innerCompleted = false; + if (n > 0) { + // let's continue the round-robin emission from last location + long startId = lastId; + int index = lastIndex; + + // in case there were changes in the array or the index + // no longer points to the inner with the cached id + if (n <= index || inner[index].id != startId) { + if (n <= index) { + index = 0; + } + // try locating the inner with the cached index + int j = index; + for (int i = 0; i < n; i++) { + if (inner[j].id == startId) { + break; + } + // wrap around in round-robin fashion + j++; + if (j == n) { + j = 0; + } + } + // if we found it again, j will point to it + // otherwise, we continue with the replacement at j + index = j; + lastIndex = j; + lastId = inner[j].id; + } + + int j = index; + // loop through all sources once to avoid delaying any new sources too much + for (int i = 0; i < n; i++) { + // eagerly check if child unsubscribed or we reached a terminal state. + if (checkTerminate()) { + skipFinal = true; + return; + } + @SuppressWarnings("unchecked") + InnerSubscriber is = (InnerSubscriber)inner[j]; + + Object o = null; + for (;;) { + int produced = 0; + while (r > 0) { + // eagerly check if child unsubscribed or we reached a terminal state. + if (checkTerminate()) { + skipFinal = true; + return; + } + RxRingBuffer q = is.queue; + if (q == null) { + break; + } + o = q.poll(); + if (o == null) { + break; + } + T v = nl.getValue(o); + // if child throws, report bounce it back immediately + try { + child.onNext(v); + } catch (Throwable t) { + skipFinal = true; + Exceptions.throwIfFatal(t); + try { + child.onError(t); + } finally { + unsubscribe(); + } + return; + } + r--; + produced++; + } + if (produced > 0) { + if (!unbounded) { + r = producer.produced(produced); + } else { + r = Long.MAX_VALUE; + } + is.requestMore(produced); + } + // if we run out of requests or queued values, break + if (r == 0 || o == null) { + break; + } + } + boolean innerDone = is.done; + RxRingBuffer innerQueue = is.queue; + if (innerDone && (innerQueue == null || innerQueue.isEmpty())) { + removeInner(is); + if (checkTerminate()) { + skipFinal = true; + return; + } + replenishMain++; + innerCompleted = true; + } + // if we run out of requests, don't try the other sources + if (r == 0) { + break; + } + + // wrap around in round-robin fashion + j++; + if (j == n) { + j = 0; + } + } + // if we run out of requests or just completed a round, save the index and id + lastIndex = j; + lastId = inner[j].id; + } + + if (replenishMain > 0) { + request(replenishMain); + } + // if one or more inner completed, loop again to see if we can terminate the whole stream + if (innerCompleted) { + continue; + } + // in case there were updates to the state, we loop again + synchronized (this) { + if (!missed) { + skipFinal = true; + emitting = false; + break; + } + missed = false; + } + } + } finally { + if (!skipFinal) { + synchronized (this) { + emitting = false; } - } else { - actual.onCompleted(); } } } - - } - - private static final class MergeProducer implements Producer { - - private final MergeSubscriber ms; - - public MergeProducer(MergeSubscriber ms) { - this.ms = ms; - } - - private volatile long requested = 0; - @SuppressWarnings("rawtypes") - static final AtomicLongFieldUpdater REQUESTED = AtomicLongFieldUpdater.newUpdater(MergeProducer.class, "requested"); - - @Override - public void request(long n) { - if (requested == Long.MAX_VALUE) { - return; + + /** + * Check if the operator reached some terminal state: child unsubscribed, + * an error was reported and we don't delay errors. + * @return true if the child unsubscribed or there are errors available and merge doesn't delay errors. + */ + boolean checkTerminate() { + if (child.isUnsubscribed()) { + return true; } - if (n == Long.MAX_VALUE) { - requested = Long.MAX_VALUE; - } else { - BackpressureUtils.getAndAddRequest(REQUESTED, this, n); - if (ms.drainQueuesIfNeeded()) { - boolean sendComplete = false; - synchronized (ms) { - if (ms.wip == 0 && ms.scalarValueQueue != null && ms.scalarValueQueue.isEmpty()) { - sendComplete = true; - } - } - if (sendComplete) { - ms.drainAndComplete(); - } + Queue e = errors; + if (!delayErrors && (e != null && !e.isEmpty())) { + try { + reportError(); + } finally { + unsubscribe(); } + return true; } + return false; } - } - - private static final class InnerSubscriber extends Subscriber { - public int sindex; - final MergeSubscriber parentSubscriber; - final MergeProducer producer; - /** Make sure the inner termination events are delivered only once. */ - @SuppressWarnings("unused") - volatile int terminated; - @SuppressWarnings("rawtypes") - static final AtomicIntegerFieldUpdater ONCE_TERMINATED = AtomicIntegerFieldUpdater.newUpdater(InnerSubscriber.class, "terminated"); - - private final RxRingBuffer q = RxRingBuffer.getSpscInstance(); - - public InnerSubscriber(MergeSubscriber parent, MergeProducer producer) { - this.parentSubscriber = parent; - this.producer = producer; - add(q); - request(q.capacity()); + static final class InnerSubscriber extends Subscriber { + final MergeSubscriber parent; + final long id; + volatile boolean done; + volatile RxRingBuffer queue; + int outstanding; + static final int limit = RxRingBuffer.SIZE / 4; + + public InnerSubscriber(MergeSubscriber parent, long id) { + this.parent = parent; + this.id = id; + } + @Override + public void onStart() { + outstanding = RxRingBuffer.SIZE; + request(RxRingBuffer.SIZE); } - @Override public void onNext(T t) { - emit(t, false); + parent.tryEmit(this, t); } - @Override public void onError(Throwable e) { - // it doesn't go through queues, it immediately onErrors and tears everything down - if (ONCE_TERMINATED.compareAndSet(this, 0, 1)) { - parentSubscriber.innerError(e, false); - } + done = true; + parent.getOrCreateErrorQueue().offer(e); + parent.emit(); } - @Override public void onCompleted() { - if (ONCE_TERMINATED.compareAndSet(this, 0, 1)) { - emit(null, true); - } + done = true; + parent.emit(); } - public void requestMore(long n) { - request(n); - } - - private void emit(T t, boolean complete) { - boolean drain = false; - boolean enqueue = true; - /** - * This optimization to skip the queue is messy ... but it makes a big difference in performance when merging a single stream - * with many values, or many intermittent streams without contention. It doesn't make much of a difference if there is contention. - * - * Below are some of the relevant benchmarks to show the difference. - * - *

 {@code
-             * With this fast-path:
-             * 
-             * Benchmark                                          (size)   Mode   Samples        Score  Score error    Units
-             * r.o.OperatorMergePerf.merge1SyncStreamOfN               1  thrpt         5  5344143.680   393484.592    ops/s
-             * r.o.OperatorMergePerf.merge1SyncStreamOfN            1000  thrpt         5    83582.662     4293.755    ops/s +++
-             * r.o.OperatorMergePerf.merge1SyncStreamOfN         1000000  thrpt         5       73.889        4.477    ops/s +++
-             * 
-             * r.o.OperatorMergePerf.mergeNSyncStreamsOfN              1  thrpt         5  5799265.333   199205.296    ops/s +
-             * r.o.OperatorMergePerf.mergeNSyncStreamsOfN           1000  thrpt         5       62.655        2.521    ops/s +++
-             * 
-             * r.o.OperatorMergePerf.mergeTwoAsyncStreamsOfN           1  thrpt         5    76925.616     4909.174    ops/s
-             * r.o.OperatorMergePerf.mergeTwoAsyncStreamsOfN        1000  thrpt         5     3634.977      242.469    ops/s
-             * 
-             * Without:
-             * 
-             * Benchmark                                          (size)   Mode   Samples        Score  Score error    Units
-             * r.o.OperatorMergePerf.merge1SyncStreamOfN               1  thrpt         5  5099295.678   159539.842    ops/s
-             * r.o.OperatorMergePerf.merge1SyncStreamOfN            1000  thrpt         5    18196.671    10053.298    ops/s
-             * r.o.OperatorMergePerf.merge1SyncStreamOfN         1000000  thrpt         5       19.184        1.028    ops/s
-             * 
-             * r.o.OperatorMergePerf.mergeNSyncStreamsOfN              1  thrpt         5  5591612.719   591821.763    ops/s
-             * r.o.OperatorMergePerf.mergeNSyncStreamsOfN           1000  thrpt         5       21.018        3.251    ops/s
-             * 
-             * r.o.OperatorMergePerf.mergeTwoAsyncStreamsOfN           1  thrpt         5    72692.073    18395.031    ops/s
-             * r.o.OperatorMergePerf.mergeTwoAsyncStreamsOfN        1000  thrpt         5     4379.093      386.368    ops/s
-             * } 
- * - * It looks like it may cause a slowdown in highly contended cases (like 'mergeTwoAsyncStreamsOfN' above) as instead of just - * putting in the queue, it attempts to get the lock. We are optimizing for the non-contended case. - */ - if (parentSubscriber.getEmitLock()) { - long emitted = 0; - enqueue = false; - try { - // drain the queue if there is anything in it before emitting the current value - emitted += drainQueue(); - // } - if (producer == null) { - // no backpressure requested - if (complete) { - parentSubscriber.completeInner(this); - } else { - try { - parentSubscriber.actual.onNext(t); - } catch (Throwable e) { - // special error handling due to complexity of merge - onError(OnErrorThrowable.addValueAsLastCause(e, t)); - } - emitted++; - } - } else { - // this needs to check q.count() as draining above may not have drained the full queue - // perf tests show this to be okay, though different queue implementations could perform poorly with this - if (producer.requested > 0 && q.count() == 0) { - if (complete) { - parentSubscriber.completeInner(this); - } else { - try { - parentSubscriber.actual.onNext(t); - } catch (Throwable e) { - // special error handling due to complexity of merge - onError(OnErrorThrowable.addValueAsLastCause(e, t)); - } - emitted++; - MergeProducer.REQUESTED.decrementAndGet(producer); - } - } else { - // no requests available, so enqueue it - enqueue = true; - } - } - } finally { - drain = parentSubscriber.releaseEmitLock(); - } - // request upstream what we just emitted - if(emitted > 0) { - request(emitted); - } - } - if (enqueue) { - enqueue(t, complete); - drain = true; - } - if (drain) { - /** - * This extra check for whether to call drain is ugly, but it helps: - *
 {@code
-                 * Without:
-                 * r.o.OperatorMergePerf.mergeNSyncStreamsOfN     1000  thrpt         5       61.812        1.455    ops/s
-                 * 
-                 * With:
-                 * r.o.OperatorMergePerf.mergeNSyncStreamsOfN     1000  thrpt         5       78.795        1.766    ops/s
-                 * } 
- */ - parentSubscriber.drainQueuesIfNeeded(); - } - } - - private void enqueue(T t, boolean complete) { - try { - if (complete) { - q.onCompleted(); - } else { - q.onNext(t); - } - } catch (MissingBackpressureException e) { - onError(e); - } - } - - private int drainRequested() { - int emitted = 0; - // drain what was requested - long toEmit = producer.requested; - Object o; - for (int i = 0; i < toEmit; i++) { - o = q.poll(); - if (o == null) { - // no more items - break; - } else if (q.isCompleted(o)) { - parentSubscriber.completeInner(this); - } else { - try { - if (!q.accept(o, parentSubscriber.actual)) { - emitted++; - } - } catch (Throwable e) { - // special error handling due to complexity of merge - onError(OnErrorThrowable.addValueAsLastCause(e, o)); - } - } - } - - // decrement the number we emitted from outstanding requests - MergeProducer.REQUESTED.getAndAdd(producer, -emitted); - return emitted; - } - - private int drainAll() { - int emitted = 0; - // drain it all - Object o; - while ((o = q.poll()) != null) { - if (q.isCompleted(o)) { - parentSubscriber.completeInner(this); - } else { - try { - if (!q.accept(o, parentSubscriber.actual)) { - emitted++; - } - } catch (Throwable e) { - // special error handling due to complexity of merge - onError(OnErrorThrowable.addValueAsLastCause(e, o)); - } - } + int r = outstanding - (int)n; + if (r > limit) { + outstanding = r; + return; } - return emitted; - } - - private int drainQueue() { - if (producer != null) { - return drainRequested(); - } else { - return drainAll(); + outstanding = RxRingBuffer.SIZE; + int k = RxRingBuffer.SIZE - r; + if (k > 0) { + request(k); } } - } -} \ No newline at end of file + }} \ No newline at end of file diff --git a/src/main/java/rx/internal/operators/OperatorMergeMaxConcurrent.java b/src/main/java/rx/internal/operators/OperatorMergeMaxConcurrent.java deleted file mode 100644 index 9f28f3199e..0000000000 --- a/src/main/java/rx/internal/operators/OperatorMergeMaxConcurrent.java +++ /dev/null @@ -1,350 +0,0 @@ -/** - * Copyright 2014 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ -package rx.internal.operators; - -import java.util.*; -import java.util.concurrent.atomic.*; - -import rx.*; -import rx.Observable.Operator; -import rx.Observable; -import rx.exceptions.MissingBackpressureException; -import rx.internal.util.RxRingBuffer; -import rx.observers.SerializedSubscriber; -import rx.subscriptions.CompositeSubscription; - -/** - * Flattens a list of Observables into one Observable sequence, without any transformation. - *

- * - *

- * You can combine the items emitted by multiple Observables so that they act like a single - * Observable, by using the merge operation. - * - * @param the emitted value type - */ -public final class OperatorMergeMaxConcurrent implements Operator> { - final int maxConcurrency; - - public OperatorMergeMaxConcurrent(int maxConcurrency) { - this.maxConcurrency = maxConcurrency; - } - - @Override - public Subscriber> call(Subscriber child) { - final SerializedSubscriber s = new SerializedSubscriber(child); - final CompositeSubscription csub = new CompositeSubscription(); - child.add(csub); - - SourceSubscriber ssub = new SourceSubscriber(maxConcurrency, s, csub); - child.setProducer(new MergeMaxConcurrentProducer(ssub)); - - return ssub; - } - /** Routes the requests from downstream to the sourcesubscriber. */ - static final class MergeMaxConcurrentProducer implements Producer { - final SourceSubscriber ssub; - public MergeMaxConcurrentProducer(SourceSubscriber ssub) { - this.ssub = ssub; - } - @Override - public void request(long n) { - ssub.downstreamRequest(n); - } - } - static final class SourceSubscriber extends Subscriber> { - final NotificationLite nl = NotificationLite.instance(); - final int maxConcurrency; - final Subscriber s; - final CompositeSubscription csub; - final Object guard; - - volatile int wip; - @SuppressWarnings("rawtypes") - static final AtomicIntegerFieldUpdater WIP - = AtomicIntegerFieldUpdater.newUpdater(SourceSubscriber.class, "wip"); - volatile int sourceIndex; - @SuppressWarnings("rawtypes") - static final AtomicIntegerFieldUpdater SOURCE_INDEX - = AtomicIntegerFieldUpdater.newUpdater(SourceSubscriber.class, "sourceIndex"); - - /** Guarded by guard. */ - int active; - /** Guarded by guard. */ - final Queue> queue; - - /** Indicates the emitting phase. Guarded by this. */ - boolean emitting; - /** Counts the missed emitting calls. Guarded by this. */ - int missedEmitting; - /** The last buffer index in the round-robin drain scheme. Accessed while emitting == true. */ - int lastIndex; - - /** Guarded by itself. */ - final List subscribers; - - volatile long requested; - @SuppressWarnings("rawtypes") - static final AtomicLongFieldUpdater REQUESTED - = AtomicLongFieldUpdater.newUpdater(SourceSubscriber.class, "requested"); - - - public SourceSubscriber(int maxConcurrency, Subscriber s, CompositeSubscription csub) { - super(s); - this.maxConcurrency = maxConcurrency; - this.s = s; - this.csub = csub; - this.guard = new Object(); - this.queue = new ArrayDeque>(maxConcurrency); - this.subscribers = Collections.synchronizedList(new ArrayList()); - this.wip = 1; - } - - @Override - public void onStart() { - request(maxConcurrency); - } - - @Override - public void onNext(Observable t) { - synchronized (guard) { - queue.add(t); - } - subscribeNext(); - } - - void subscribeNext() { - Observable t; - synchronized (guard) { - t = queue.peek(); - if (t == null || active >= maxConcurrency) { - return; - } - active++; - queue.poll(); - } - - MergeItemSubscriber itemSub = new MergeItemSubscriber(SOURCE_INDEX.getAndIncrement(this)); - subscribers.add(itemSub); - - csub.add(itemSub); - - WIP.incrementAndGet(this); - - t.unsafeSubscribe(itemSub); - - request(1); - } - - @Override - public void onError(Throwable e) { - Object[] active; - synchronized (subscribers) { - active = subscribers.toArray(); - subscribers.clear(); - } - - try { - s.onError(e); - - unsubscribe(); - } finally { - for (Object o : active) { - @SuppressWarnings("unchecked") - MergeItemSubscriber a = (MergeItemSubscriber)o; - a.release(); - } - } - - } - - @Override - public void onCompleted() { - WIP.decrementAndGet(this); - drain(); - } - - protected void downstreamRequest(long n) { - for (;;) { - long r = requested; - long u; - if (r != Long.MAX_VALUE && n == Long.MAX_VALUE) { - u = Long.MAX_VALUE; - } else - if (r + n < 0) { - u = Long.MAX_VALUE; - } else { - u = r + n; - } - if (REQUESTED.compareAndSet(this, r, u)) { - break; - } - } - drain(); - } - - protected void drain() { - synchronized (this) { - if (emitting) { - missedEmitting++; - return; - } - emitting = true; - missedEmitting = 0; - } - final List.MergeItemSubscriber> subs = subscribers; - final Subscriber child = s; - Object[] active = new Object[subs.size()]; - do { - long r; - - outer: - while ((r = requested) > 0) { - int idx = lastIndex; - synchronized (subs) { - if (subs.size() == active.length) { - active = subs.toArray(active); - } else { - active = subs.toArray(); - } - } - - int resumeIndex = 0; - int j = 0; - for (Object o : active) { - @SuppressWarnings("unchecked") - MergeItemSubscriber e = (MergeItemSubscriber)o; - if (e.index == idx) { - resumeIndex = j; - break; - } - j++; - } - int sumConsumed = 0; - for (int i = 0; i < active.length; i++) { - j = (i + resumeIndex) % active.length; - - @SuppressWarnings("unchecked") - final MergeItemSubscriber e = (MergeItemSubscriber)active[j]; - final RxRingBuffer b = e.buffer; - lastIndex = e.index; - - if (!e.once && b.peek() == null) { - subs.remove(e); - - synchronized (guard) { - this.active--; - } - csub.remove(e); - - e.release(); - - subscribeNext(); - - WIP.decrementAndGet(this); - - continue outer; - } - - int consumed = 0; - Object v; - while (r > 0 && (v = b.poll()) != null) { - nl.accept(child, v); - if (child.isUnsubscribed()) { - return; - } - r--; - consumed++; - } - if (consumed > 0) { - sumConsumed += consumed; - REQUESTED.addAndGet(this, -consumed); - e.requestMore(consumed); - } - if (r == 0) { - break outer; - } - } - if (sumConsumed == 0) { - break; - } - } - - if (active.length == 0) { - if (wip == 0) { - child.onCompleted(); - return; - } - } - synchronized (this) { - if (missedEmitting == 0) { - emitting = false; - break; - } - missedEmitting = 0; - } - } while (true); - } - final class MergeItemSubscriber extends Subscriber { - volatile boolean once = true; - final int index; - final RxRingBuffer buffer; - - public MergeItemSubscriber(int index) { - buffer = RxRingBuffer.getSpmcInstance(); - this.index = index; - } - - @Override - public void onStart() { - request(RxRingBuffer.SIZE); - } - - @Override - public void onNext(T t) { - try { - buffer.onNext(t); - } catch (MissingBackpressureException ex) { - onError(ex); - return; - } - - drain(); - } - - @Override - public void onError(Throwable e) { - SourceSubscriber.this.onError(e); - } - - @Override - public void onCompleted() { - if (once) { - once = false; - drain(); - } - } - /** Request more from upstream. */ - void requestMore(long n) { - request(n); - } - void release() { - // NO-OP for now - buffer.release(); - } - } - } -} diff --git a/src/main/java/rx/internal/operators/OperatorPublish.java b/src/main/java/rx/internal/operators/OperatorPublish.java index c6739927ee..492cd8f261 100644 --- a/src/main/java/rx/internal/operators/OperatorPublish.java +++ b/src/main/java/rx/internal/operators/OperatorPublish.java @@ -759,4 +759,4 @@ public void unsubscribe() { } } } -} \ No newline at end of file +} diff --git a/src/main/java/rx/internal/util/ScalarSynchronousObservable.java b/src/main/java/rx/internal/util/ScalarSynchronousObservable.java index c350c895c4..145a67096e 100644 --- a/src/main/java/rx/internal/util/ScalarSynchronousObservable.java +++ b/src/main/java/rx/internal/util/ScalarSynchronousObservable.java @@ -15,9 +15,12 @@ */ package rx.internal.util; -import rx.*; +import rx.Observable; +import rx.Scheduler; import rx.Scheduler.Worker; +import rx.Subscriber; import rx.functions.Action0; +import rx.functions.Func1; import rx.internal.schedulers.EventLoopsScheduler; public final class ScalarSynchronousObservable extends Observable { @@ -117,4 +120,32 @@ public void call() { subscriber.onCompleted(); } } + + public Observable scalarFlatMap(final Func1> func) { + return create(new OnSubscribe() { + @Override + public void call(final Subscriber child) { + Observable o = func.call(t); + if (o.getClass() == ScalarSynchronousObservable.class) { + child.onNext(((ScalarSynchronousObservable)o).t); + child.onCompleted(); + } else { + o.unsafeSubscribe(new Subscriber(child) { + @Override + public void onNext(R v) { + child.onNext(v); + } + @Override + public void onError(Throwable e) { + child.onError(e); + } + @Override + public void onCompleted() { + child.onCompleted(); + } + }); + } + } + }); + } } diff --git a/src/perf/java/rx/operators/OperatorFlatMapPerf.java b/src/perf/java/rx/operators/OperatorFlatMapPerf.java index c20cff67f1..2913911a0f 100644 --- a/src/perf/java/rx/operators/OperatorFlatMapPerf.java +++ b/src/perf/java/rx/operators/OperatorFlatMapPerf.java @@ -17,8 +17,8 @@ import java.util.concurrent.TimeUnit; -import org.openjdk.jmh.annotations.BenchmarkMode; import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; import org.openjdk.jmh.annotations.Mode; import org.openjdk.jmh.annotations.OutputTimeUnit; import org.openjdk.jmh.annotations.Param; diff --git a/src/test/java/rx/internal/operators/OperatorFlatMapTest.java b/src/test/java/rx/internal/operators/OperatorFlatMapTest.java index a4635f1512..bb5127665c 100644 --- a/src/test/java/rx/internal/operators/OperatorFlatMapTest.java +++ b/src/test/java/rx/internal/operators/OperatorFlatMapTest.java @@ -17,6 +17,7 @@ import static org.mockito.Matchers.any; import static org.mockito.Mockito.*; +import static org.junit.Assert.*; import java.util.*; import java.util.concurrent.TimeUnit; @@ -384,6 +385,17 @@ public Integer call(Integer t1, Integer t2) { System.out.println("--> testFlatMapSelectorMaxConcurrent: " + ts.getOnNextEvents()); Assert.assertTrue(expected.containsAll(ts.getOnNextEvents())); } + + @Test + public void testFlatMapTransformsMaxConcurrentNormalLoop() { + for (int i = 0; i < 1000; i++) { + if (i % 100 == 0) { + System.out.println("testFlatMapTransformsMaxConcurrentNormalLoop => " + i); + } + testFlatMapTransformsMaxConcurrentNormal(); + } + } + @Test public void testFlatMapTransformsMaxConcurrentNormal() { final int m = 2; @@ -416,4 +428,120 @@ public void testFlatMapTransformsMaxConcurrentNormal() { verify(o, never()).onNext(5); verify(o, never()).onError(any(Throwable.class)); } + + @Ignore // don't care for any reordering + @Test(timeout = 10000) + public void flatMapRangeAsyncLoop() { + for (int i = 0; i < 2000; i++) { + if (i % 10 == 0) { + System.out.println("flatMapRangeAsyncLoop > " + i); + } + TestSubscriber ts = new TestSubscriber(); + Observable.range(0, 1000) + .flatMap(new Func1>() { + @Override + public Observable call(Integer t) { + return Observable.just(t); + } + }) + .observeOn(Schedulers.computation()) + .subscribe(ts); + + ts.awaitTerminalEvent(2500, TimeUnit.MILLISECONDS); + if (ts.getOnCompletedEvents().isEmpty()) { + System.out.println(ts.getOnNextEvents().size()); + } + ts.assertTerminalEvent(); + ts.assertNoErrors(); + List list = ts.getOnNextEvents(); + assertEquals(1000, list.size()); + boolean f = false; + for (int j = 0; j < list.size(); j++) { + if (list.get(j) != j) { + System.out.println(j + " " + list.get(j)); + f = true; + } + } + if (f) { + Assert.fail("Results are out of order!"); + } + } + } + @Test(timeout = 30000) + public void flatMapRangeMixedAsyncLoop() { + for (int i = 0; i < 2000; i++) { + if (i % 10 == 0) { + System.out.println("flatMapRangeAsyncLoop > " + i); + } + TestSubscriber ts = new TestSubscriber(); + Observable.range(0, 1000) + .flatMap(new Func1>() { + final Random rnd = new Random(); + @Override + public Observable call(Integer t) { + Observable r = Observable.just(t); + if (rnd.nextBoolean()) { + r = r.asObservable(); + } + return r; + } + }) + .observeOn(Schedulers.computation()) + .subscribe(ts); + + ts.awaitTerminalEvent(2500, TimeUnit.MILLISECONDS); + if (ts.getOnCompletedEvents().isEmpty()) { + System.out.println(ts.getOnNextEvents().size()); + } + ts.assertTerminalEvent(); + ts.assertNoErrors(); + List list = ts.getOnNextEvents(); + if (list.size() < 1000) { + Set set = new HashSet(list); + for (int j = 0; j < 1000; j++) { + if (!set.contains(j)) { + System.out.println(j + " missing"); + } + } + } + assertEquals(1000, list.size()); + } + } + + @Test + public void flatMapIntPassthruAsync() { + for (int i = 0;i < 1000; i++) { + TestSubscriber ts = new TestSubscriber(); + + Observable.range(1, 1000).flatMap(new Func1>() { + @Override + public Observable call(Integer t) { + return Observable.just(1).subscribeOn(Schedulers.computation()); + } + }).subscribe(ts); + + ts.awaitTerminalEvent(5, TimeUnit.SECONDS); + ts.assertNoErrors(); + ts.assertCompleted(); + ts.assertValueCount(1000); + } + } + @Test + public void flatMapTwoNestedSync() { + for (final int n : new int[] { 1, 1000, 1000000 }) { + TestSubscriber ts = new TestSubscriber(); + + Observable.just(1, 2).flatMap(new Func1>() { + @Override + public Observable call(Integer t) { + return Observable.range(1, n); + } + }).subscribe(ts); + + System.out.println("flatMapTwoNestedSync >> @ " + n); + ts.assertNoErrors(); + ts.assertCompleted(); + ts.assertValueCount(n * 2); + } + } } diff --git a/src/test/java/rx/internal/operators/OperatorMergeDelayErrorTest.java b/src/test/java/rx/internal/operators/OperatorMergeDelayErrorTest.java index 85eb84b6e9..db086d6632 100644 --- a/src/test/java/rx/internal/operators/OperatorMergeDelayErrorTest.java +++ b/src/test/java/rx/internal/operators/OperatorMergeDelayErrorTest.java @@ -15,6 +15,17 @@ */ package rx.internal.operators; +import static org.junit.Assert.*; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyInt; +import static org.mockito.Mockito.*; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + import org.junit.Before; import org.junit.Test; import org.mockito.InOrder; @@ -27,19 +38,9 @@ import rx.Subscriber; import rx.exceptions.CompositeException; import rx.exceptions.TestException; +import rx.functions.Action1; import rx.observers.TestSubscriber; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; - -import static org.junit.Assert.*; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyInt; -import static org.mockito.Mockito.*; - public class OperatorMergeDelayErrorTest { @Mock @@ -66,7 +67,9 @@ public void testErrorDelayed1() { verify(stringObserver, times(1)).onNext("four"); verify(stringObserver, times(0)).onNext("five"); // despite not expecting it ... we don't do anything to prevent it if the source Observable keeps sending after onError - verify(stringObserver, times(1)).onNext("six"); + // inner observable errors are considered terminal for that source +// verify(stringObserver, times(1)).onNext("six"); + // inner observable errors are considered terminal for that source } @Test @@ -87,7 +90,8 @@ public void testErrorDelayed2() { verify(stringObserver, times(1)).onNext("four"); verify(stringObserver, times(0)).onNext("five"); // despite not expecting it ... we don't do anything to prevent it if the source Observable keeps sending after onError - verify(stringObserver, times(1)).onNext("six"); + // inner observable errors are considered terminal for that source +// verify(stringObserver, times(1)).onNext("six"); verify(stringObserver, times(1)).onNext("seven"); verify(stringObserver, times(1)).onNext("eight"); verify(stringObserver, times(1)).onNext("nine"); @@ -188,7 +192,8 @@ public void testCompositeErrorDelayed1() { verify(stringObserver, times(1)).onNext("four"); verify(stringObserver, times(0)).onNext("five"); // despite not expecting it ... we don't do anything to prevent it if the source Observable keeps sending after onError - verify(stringObserver, times(1)).onNext("six"); + // inner observable errors are considered terminal for that source +// verify(stringObserver, times(1)).onNext("six"); } @Test @@ -287,7 +292,7 @@ public void testMergeArrayWithThreading() { verify(stringObserver, times(1)).onCompleted(); } - @Test(timeout=1000L) + @Test(timeout = 1000L) public void testSynchronousError() { final Observable> o1 = Observable.error(new RuntimeException("unit test")); @@ -472,6 +477,10 @@ public void onCompleted() { }); + /* + * If the child onNext throws, why would we keep accepting values from + * other sources? + */ inOrder.verify(o).onNext(2); inOrder.verify(o, never()).onNext(0); inOrder.verify(o, never()).onNext(1); @@ -546,4 +555,26 @@ public void run() { t.start(); } } + @Test + public void testDelayErrorMaxConcurrent() { + final List requests = new ArrayList(); + Observable source = Observable.mergeDelayError(Observable.just( + Observable.just(1).asObservable(), + Observable.error(new TestException())).doOnRequest(new Action1() { + @Override + public void call(Long t1) { + requests.add(t1); + } + }), 1); + + TestSubscriber ts = new TestSubscriber(); + + source.subscribe(ts); + + ts.assertReceivedOnNext(Arrays.asList(1)); + ts.assertTerminalEvent(); + assertEquals(1, ts.getOnErrorEvents().size()); + assertTrue(ts.getOnErrorEvents().get(0) instanceof TestException); + assertEquals(Arrays.asList(1L, 1L, 1L), requests); + } } \ No newline at end of file diff --git a/src/test/java/rx/internal/operators/OperatorMergeMaxConcurrentTest.java b/src/test/java/rx/internal/operators/OperatorMergeMaxConcurrentTest.java index 52c7ee21f2..9cd65de8d0 100644 --- a/src/test/java/rx/internal/operators/OperatorMergeMaxConcurrentTest.java +++ b/src/test/java/rx/internal/operators/OperatorMergeMaxConcurrentTest.java @@ -190,7 +190,7 @@ public void testSimpleOneLess() { ts.assertReceivedOnNext(result); } } - @Test(timeout = 10000) + @Test(timeout = 20000) public void testSimpleAsyncLoop() { for (int i = 0; i < 200; i++) { testSimpleAsync(); diff --git a/src/test/java/rx/internal/operators/OperatorMergeTest.java b/src/test/java/rx/internal/operators/OperatorMergeTest.java index 7d785b4088..9732611e44 100644 --- a/src/test/java/rx/internal/operators/OperatorMergeTest.java +++ b/src/test/java/rx/internal/operators/OperatorMergeTest.java @@ -16,46 +16,26 @@ package rx.internal.operators; import static java.util.Arrays.asList; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; +import static org.junit.Assert.*; import static org.mockito.Matchers.any; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.Iterator; -import java.util.List; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; - -import org.junit.Before; -import org.junit.Test; -import org.mockito.Mock; -import org.mockito.MockitoAnnotations; - -import rx.Notification; -import rx.Observable; +import static org.mockito.Mockito.*; + +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + +import org.junit.*; +import org.mockito.*; + +import rx.*; import rx.Observable.OnSubscribe; -import rx.Observer; -import rx.Scheduler; import rx.Scheduler.Worker; -import rx.Subscriber; -import rx.Subscription; -import rx.functions.Action0; -import rx.functions.Action1; -import rx.functions.Func1; +import rx.Observable; +import rx.Observer; +import rx.functions.*; import rx.internal.util.RxRingBuffer; import rx.observers.TestSubscriber; -import rx.schedulers.Schedulers; -import rx.schedulers.TestScheduler; +import rx.schedulers.*; import rx.subscriptions.Subscriptions; public class OperatorMergeTest { @@ -494,7 +474,7 @@ public void call() { }); } - @Test(timeout = 10000) + @Test//(timeout = 10000) public void testConcurrency() { Observable o = Observable.range(1, 10000).subscribeOn(Schedulers.newThread()); @@ -503,7 +483,8 @@ public void testConcurrency() { TestSubscriber ts = new TestSubscriber(); merge.subscribe(ts); - ts.awaitTerminalEvent(); + ts.awaitTerminalEvent(3, TimeUnit.SECONDS); + ts.assertTerminalEvent(); ts.assertNoErrors(); assertEquals(1, ts.getOnCompletedEvents().size()); List onNextEvents = ts.getOnNextEvents(); @@ -672,7 +653,7 @@ public void onNext(Integer t) { * * This requires merge to also obey the Product.request values coming from it's child subscriber. */ - @Test + @Test(timeout = 10000) public void testBackpressureDownstreamWithConcurrentStreams() throws InterruptedException { final AtomicInteger generated1 = new AtomicInteger(); Observable o1 = createInfiniteObservable(generated1).subscribeOn(Schedulers.computation()); @@ -1055,8 +1036,9 @@ public void shouldNotCompleteIfThereArePendingScalarSynchronousEmissionsWhenTheL assertEquals(Collections.>emptyList(), subscriber.getOnCompletedEvents()); subscriber.requestMore(1); subscriber.assertReceivedOnNext(asList(1L)); - assertEquals(Collections.>emptyList(), subscriber.getOnCompletedEvents()); - subscriber.requestMore(1); +// TODO: it should be acceptable to get a completion event without requests +// assertEquals(Collections.>emptyList(), subscriber.getOnCompletedEvents()); +// subscriber.requestMore(1); subscriber.assertTerminalEvent(); } @@ -1240,4 +1222,85 @@ public void call(Integer s) { } }; } + + Func1> toScalar = new Func1>() { + @Override + public Observable call(Integer t) { + return Observable.just(t); + } + }; + Func1> toHiddenScalar = new Func1>() { + @Override + public Observable call(Integer t) { + return Observable.just(t).asObservable(); + } + }; + + void runMerge(Func1> func, TestSubscriber ts) { + List list = new ArrayList(); + for (int i = 0; i < 1000; i++) { + list.add(i); + } + Observable source = Observable.from(list); + source.flatMap(func).subscribe(ts); + + if (ts.getOnNextEvents().size() != 1000) { + System.out.println(ts.getOnNextEvents()); + } + + ts.assertTerminalEvent(); + ts.assertNoErrors(); + ts.assertReceivedOnNext(list); + } + + @Test + public void testFastMergeFullScalar() { + runMerge(toScalar, new TestSubscriber()); + } + @Test + public void testFastMergeHiddenScalar() { + runMerge(toHiddenScalar, new TestSubscriber()); + } + @Test + public void testSlowMergeFullScalar() { + for (final int req : new int[] { 16, 32, 64, 128, 256 }) { + TestSubscriber ts = new TestSubscriber() { + int remaining = req; + @Override + public void onStart() { + request(req); + } + @Override + public void onNext(Integer t) { + super.onNext(t); + if (--remaining == 0) { + remaining = req; + request(req); + } + } + }; + runMerge(toScalar, ts); + } + } + @Test + public void testSlowMergeHiddenScalar() { + for (final int req : new int[] { 16, 32, 64, 128, 256 }) { + TestSubscriber ts = new TestSubscriber() { + int remaining = req; + @Override + public void onStart() { + request(req); + } + @Override + public void onNext(Integer t) { + super.onNext(t); + if (--remaining == 0) { + remaining = req; + request(req); + } + } + }; + runMerge(toHiddenScalar, ts); + } + } }