Skip to content

Merge and MergeMaxConcurrent unified and rewritten #2928

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jul 14, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 72 additions & 2 deletions src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -1693,7 +1693,11 @@ public final static <T> Observable<T> merge(Iterable<? extends Observable<? exte
* {@code source} Observable
* @see <a href="http://reactivex.io/documentation/operators/merge.html">ReactiveX operators documentation: Merge</a>
*/
@SuppressWarnings({"unchecked", "rawtypes"})
public final static <T> Observable<T> merge(Observable<? extends Observable<? extends T>> source) {
if (source.getClass() == ScalarSynchronousObservable.class) {
return ((ScalarSynchronousObservable<T>)source).scalarFlatMap((Func1)UtilityFunctions.identity());
}
return source.lift(OperatorMerge.<T>instance(false));
}

Expand Down Expand Up @@ -1721,8 +1725,13 @@ public final static <T> Observable<T> merge(Observable<? extends Observable<? ex
* if {@code maxConcurrent} is less than or equal to 0
* @see <a href="http://reactivex.io/documentation/operators/merge.html">ReactiveX operators documentation: Merge</a>
*/
@Experimental
@SuppressWarnings({"unchecked", "rawtypes"})
public final static <T> Observable<T> merge(Observable<? extends Observable<? extends T>> source, int maxConcurrent) {
return source.lift(new OperatorMergeMaxConcurrent<T>(maxConcurrent));
if (source.getClass() == ScalarSynchronousObservable.class) {
return ((ScalarSynchronousObservable<T>)source).scalarFlatMap((Func1)UtilityFunctions.identity());
}
return source.lift(OperatorMerge.<T>instance(false, maxConcurrent));
}

/**
Expand Down Expand Up @@ -1993,7 +2002,31 @@ public final static <T> Observable<T> merge(Observable<? extends T> t1, Observab
public final static <T> Observable<T> merge(Observable<? extends T>[] sequences) {
return merge(from(sequences));
}


/**
* Flattens an Array of Observables into one Observable, without any transformation, while limiting the
* number of concurrent subscriptions to these Observables.
* <p>
* <img width="640" height="370" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/merge.io.png" alt="">
* <p>
* You can combine items emitted by multiple Observables so that they appear as a single Observable, by
* using the {@code merge} method.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code merge} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param sequences
* the Array of Observables
* @param maxConcurrent
* the maximum number of Observables that may be subscribed to concurrently
* @return an Observable that emits all of the items emitted by the Observables in the Array
* @see <a href="http://reactivex.io/documentation/operators/merge.html">ReactiveX operators documentation: Merge</a>
*/
@Experimental
public final static <T> Observable<T> merge(Observable<? extends T>[] sequences, int maxConcurrent) {
return merge(from(sequences), maxConcurrent);
}
/**
* Flattens an Observable that emits Observables into one Observable, in a way that allows an Observer to
* receive all successfully emitted items from all of the source Observables without being interrupted by
Expand Down Expand Up @@ -2021,6 +2054,37 @@ public final static <T> Observable<T> merge(Observable<? extends T>[] sequences)
public final static <T> Observable<T> mergeDelayError(Observable<? extends Observable<? extends T>> source) {
return source.lift(OperatorMerge.<T>instance(true));
}
/**
* Flattens an Observable that emits Observables into one Observable, in a way that allows an Observer to
* receive all successfully emitted items from all of the source Observables without being interrupted by
* an error notification from one of them, while limiting the
* number of concurrent subscriptions to these Observables.
* <p>
* This behaves like {@link #merge(Observable)} except that if any of the merged Observables notify of an
* error via {@link Observer#onError onError}, {@code mergeDelayError} will refrain from propagating that
* error notification until all of the merged Observables have finished emitting items.
* <p>
* <img width="640" height="380" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/mergeDelayError.png" alt="">
* <p>
* Even if multiple merged Observables send {@code onError} notifications, {@code mergeDelayError} will only
* invoke the {@code onError} method of its Observers once.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code mergeDelayError} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param source
* an Observable that emits Observables
* @param maxConcurrent
* the maximum number of Observables that may be subscribed to concurrently
* @return an Observable that emits all of the items emitted by the Observables emitted by the
* {@code source} Observable
* @see <a href="http://reactivex.io/documentation/operators/merge.html">ReactiveX operators documentation: Merge</a>
*/
@Experimental
public final static <T> Observable<T> mergeDelayError(Observable<? extends Observable<? extends T>> source, int maxConcurrent) {
return source.lift(OperatorMerge.<T>instance(true, maxConcurrent));
}

/**
* Flattens two Observables into one Observable, in a way that allows an Observer to receive all
Expand Down Expand Up @@ -4618,6 +4682,9 @@ public final Observable<T> firstOrDefault(T defaultValue, Func1<? super T, Boole
* @see <a href="http://reactivex.io/documentation/operators/flatmap.html">ReactiveX operators documentation: FlatMap</a>
*/
public final <R> Observable<R> flatMap(Func1<? super T, ? extends Observable<? extends R>> func) {
if (getClass() == ScalarSynchronousObservable.class) {
return ((ScalarSynchronousObservable<T>)this).scalarFlatMap(func);
}
return merge(map(func));
}

Expand Down Expand Up @@ -4646,6 +4713,9 @@ public final <R> Observable<R> flatMap(Func1<? super T, ? extends Observable<? e
*/
@Beta
public final <R> Observable<R> flatMap(Func1<? super T, ? extends Observable<? extends R>> func, int maxConcurrent) {
if (getClass() == ScalarSynchronousObservable.class) {
return ((ScalarSynchronousObservable<T>)this).scalarFlatMap(func);
}
return merge(map(func), maxConcurrent);
}

Expand Down
2 changes: 1 addition & 1 deletion src/main/java/rx/internal/operators/OperatorAll.java
Original file line number Diff line number Diff line change
Expand Up @@ -77,4 +77,4 @@ public void onCompleted() {
child.setProducer(producer);
return s;
}
}
}
177 changes: 168 additions & 9 deletions src/main/java/rx/internal/operators/OperatorMapNotification.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,20 @@
*/
package rx.internal.operators;

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicLong;

import rx.Observable.Operator;
import rx.Producer;
import rx.Subscriber;
import rx.Subscription;
import rx.exceptions.MissingBackpressureException;
import rx.exceptions.OnErrorThrowable;
import rx.functions.Func0;
import rx.functions.Func1;
import rx.internal.util.unsafe.SpscArrayQueue;
import rx.internal.util.unsafe.UnsafeAccess;

/**
* Applies a function of your choosing to every item emitted by an {@code Observable}, and emits the results of
Expand All @@ -41,13 +50,18 @@ public OperatorMapNotification(Func1<? super T, ? extends R> onNext, Func1<? sup

@Override
public Subscriber<? super T> call(final Subscriber<? super R> o) {
return new Subscriber<T>(o) {

Subscriber<T> subscriber = new Subscriber<T>() {
SingleEmitter<R> emitter;
@Override
public void setProducer(Producer producer) {
emitter = new SingleEmitter<R>(o, producer, this);
o.setProducer(emitter);
}

@Override
public void onCompleted() {
try {
o.onNext(onCompleted.call());
o.onCompleted();
emitter.offerAndComplete(onCompleted.call());
} catch (Throwable e) {
o.onError(e);
}
Expand All @@ -56,8 +70,7 @@ public void onCompleted() {
@Override
public void onError(Throwable e) {
try {
o.onNext(onError.call(e));
o.onCompleted();
emitter.offerAndComplete(onError.call(e));
} catch (Throwable e2) {
o.onError(e);
}
Expand All @@ -66,13 +79,159 @@ public void onError(Throwable e) {
@Override
public void onNext(T t) {
try {
o.onNext(onNext.call(t));
emitter.offer(onNext.call(t));
} catch (Throwable e) {
o.onError(OnErrorThrowable.addValueAsLastCause(e, t));
}
}

};
o.add(subscriber);
return subscriber;
}

}
static final class SingleEmitter<T> extends AtomicLong implements Producer, Subscription {
/** */
private static final long serialVersionUID = -249869671366010660L;
final NotificationLite<T> nl;
final Subscriber<? super T> child;
final Producer producer;
final Subscription cancel;
final Queue<Object> queue;
volatile boolean complete;
/** Guarded by this. */
boolean emitting;
/** Guarded by this. */
boolean missed;

public SingleEmitter(Subscriber<? super T> child, Producer producer, Subscription cancel) {
this.child = child;
this.producer = producer;
this.cancel = cancel;
this.queue = UnsafeAccess.isUnsafeAvailable()
? new SpscArrayQueue<Object>(2)
: new ConcurrentLinkedQueue<Object>();

this.nl = NotificationLite.instance();
}
@Override
public void request(long n) {
for (;;) {
long r = get();
if (r < 0) {
return;
}
long u = r + n;
if (u < 0) {
u = Long.MAX_VALUE;
}
if (compareAndSet(r, u)) {
producer.request(n);
drain();
return;
}
}
}

void produced(long n) {
for (;;) {
long r = get();
if (r < 0) {
return;
}
long u = r - n;
if (u < 0) {
throw new IllegalStateException("More produced (" + n + ") than requested (" + r + ")");
}
if (compareAndSet(r, u)) {
return;
}
}
}

public void offer(T value) {
if (!queue.offer(value)) {
child.onError(new MissingBackpressureException());
unsubscribe();
} else {
drain();
}
}
public void offerAndComplete(T value) {
if (!this.queue.offer(value)) {
child.onError(new MissingBackpressureException());
unsubscribe();
} else {
this.complete = true;
drain();
}
}

void drain() {
synchronized (this) {
if (emitting) {
missed = true;
return;
}
emitting = true;
missed = false;
}
boolean skipFinal = false;
try {
for (;;) {

long r = get();
boolean c = complete;
boolean empty = queue.isEmpty();

if (c && empty) {
child.onCompleted();
skipFinal = true;
return;
} else
if (r > 0) {
Object v = queue.poll();
if (v != null) {
child.onNext(nl.getValue(v));
produced(1);
} else
if (c) {
child.onCompleted();
skipFinal = true;
return;
}
}

synchronized (this) {
if (!missed) {
skipFinal = true;
emitting = false;
return;
}
missed = false;
}
}
} finally {
if (!skipFinal) {
synchronized (this) {
emitting = false;
}
}
}
}

@Override
public boolean isUnsubscribed() {
return get() < 0;
}
@Override
public void unsubscribe() {
long r = get();
if (r != Long.MIN_VALUE) {
r = getAndSet(Long.MIN_VALUE);
if (r != Long.MIN_VALUE) {
cancel.unsubscribe();
}
}
}
}
}
Loading