Skip to content

Commit 36845db

Browse files
Merge pull request #2928 from akarnokd/OperatorMergeFullRewrite
Merge and MergeMaxConcurrent unified and rewritten
2 parents 44945dc + 0420193 commit 36845db

12 files changed

+1211
-1040
lines changed

src/main/java/rx/Observable.java

Lines changed: 72 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1718,7 +1718,11 @@ public final static <T> Observable<T> merge(Iterable<? extends Observable<? exte
17181718
* {@code source} Observable
17191719
* @see <a href="http://reactivex.io/documentation/operators/merge.html">ReactiveX operators documentation: Merge</a>
17201720
*/
1721+
@SuppressWarnings({"unchecked", "rawtypes"})
17211722
public final static <T> Observable<T> merge(Observable<? extends Observable<? extends T>> source) {
1723+
if (source.getClass() == ScalarSynchronousObservable.class) {
1724+
return ((ScalarSynchronousObservable<T>)source).scalarFlatMap((Func1)UtilityFunctions.identity());
1725+
}
17221726
return source.lift(OperatorMerge.<T>instance(false));
17231727
}
17241728

@@ -1746,8 +1750,13 @@ public final static <T> Observable<T> merge(Observable<? extends Observable<? ex
17461750
* if {@code maxConcurrent} is less than or equal to 0
17471751
* @see <a href="http://reactivex.io/documentation/operators/merge.html">ReactiveX operators documentation: Merge</a>
17481752
*/
1753+
@Experimental
1754+
@SuppressWarnings({"unchecked", "rawtypes"})
17491755
public final static <T> Observable<T> merge(Observable<? extends Observable<? extends T>> source, int maxConcurrent) {
1750-
return source.lift(new OperatorMergeMaxConcurrent<T>(maxConcurrent));
1756+
if (source.getClass() == ScalarSynchronousObservable.class) {
1757+
return ((ScalarSynchronousObservable<T>)source).scalarFlatMap((Func1)UtilityFunctions.identity());
1758+
}
1759+
return source.lift(OperatorMerge.<T>instance(false, maxConcurrent));
17511760
}
17521761

17531762
/**
@@ -2018,7 +2027,31 @@ public final static <T> Observable<T> merge(Observable<? extends T> t1, Observab
20182027
public final static <T> Observable<T> merge(Observable<? extends T>[] sequences) {
20192028
return merge(from(sequences));
20202029
}
2021-
2030+
2031+
/**
2032+
* Flattens an Array of Observables into one Observable, without any transformation, while limiting the
2033+
* number of concurrent subscriptions to these Observables.
2034+
* <p>
2035+
* <img width="640" height="370" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/merge.io.png" alt="">
2036+
* <p>
2037+
* You can combine items emitted by multiple Observables so that they appear as a single Observable, by
2038+
* using the {@code merge} method.
2039+
* <dl>
2040+
* <dt><b>Scheduler:</b></dt>
2041+
* <dd>{@code merge} does not operate by default on a particular {@link Scheduler}.</dd>
2042+
* </dl>
2043+
*
2044+
* @param sequences
2045+
* the Array of Observables
2046+
* @param maxConcurrent
2047+
* the maximum number of Observables that may be subscribed to concurrently
2048+
* @return an Observable that emits all of the items emitted by the Observables in the Array
2049+
* @see <a href="http://reactivex.io/documentation/operators/merge.html">ReactiveX operators documentation: Merge</a>
2050+
*/
2051+
@Experimental
2052+
public final static <T> Observable<T> merge(Observable<? extends T>[] sequences, int maxConcurrent) {
2053+
return merge(from(sequences), maxConcurrent);
2054+
}
20222055
/**
20232056
* Flattens an Observable that emits Observables into one Observable, in a way that allows an Observer to
20242057
* receive all successfully emitted items from all of the source Observables without being interrupted by
@@ -2046,6 +2079,37 @@ public final static <T> Observable<T> merge(Observable<? extends T>[] sequences)
20462079
public final static <T> Observable<T> mergeDelayError(Observable<? extends Observable<? extends T>> source) {
20472080
return source.lift(OperatorMerge.<T>instance(true));
20482081
}
2082+
/**
2083+
* Flattens an Observable that emits Observables into one Observable, in a way that allows an Observer to
2084+
* receive all successfully emitted items from all of the source Observables without being interrupted by
2085+
* an error notification from one of them, while limiting the
2086+
* number of concurrent subscriptions to these Observables.
2087+
* <p>
2088+
* This behaves like {@link #merge(Observable)} except that if any of the merged Observables notify of an
2089+
* error via {@link Observer#onError onError}, {@code mergeDelayError} will refrain from propagating that
2090+
* error notification until all of the merged Observables have finished emitting items.
2091+
* <p>
2092+
* <img width="640" height="380" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/mergeDelayError.png" alt="">
2093+
* <p>
2094+
* Even if multiple merged Observables send {@code onError} notifications, {@code mergeDelayError} will only
2095+
* invoke the {@code onError} method of its Observers once.
2096+
* <dl>
2097+
* <dt><b>Scheduler:</b></dt>
2098+
* <dd>{@code mergeDelayError} does not operate by default on a particular {@link Scheduler}.</dd>
2099+
* </dl>
2100+
*
2101+
* @param source
2102+
* an Observable that emits Observables
2103+
* @param maxConcurrent
2104+
* the maximum number of Observables that may be subscribed to concurrently
2105+
* @return an Observable that emits all of the items emitted by the Observables emitted by the
2106+
* {@code source} Observable
2107+
* @see <a href="http://reactivex.io/documentation/operators/merge.html">ReactiveX operators documentation: Merge</a>
2108+
*/
2109+
@Experimental
2110+
public final static <T> Observable<T> mergeDelayError(Observable<? extends Observable<? extends T>> source, int maxConcurrent) {
2111+
return source.lift(OperatorMerge.<T>instance(true, maxConcurrent));
2112+
}
20492113

20502114
/**
20512115
* Flattens two Observables into one Observable, in a way that allows an Observer to receive all
@@ -4649,6 +4713,9 @@ public final Observable<T> firstOrDefault(T defaultValue, Func1<? super T, Boole
46494713
* @see <a href="http://reactivex.io/documentation/operators/flatmap.html">ReactiveX operators documentation: FlatMap</a>
46504714
*/
46514715
public final <R> Observable<R> flatMap(Func1<? super T, ? extends Observable<? extends R>> func) {
4716+
if (getClass() == ScalarSynchronousObservable.class) {
4717+
return ((ScalarSynchronousObservable<T>)this).scalarFlatMap(func);
4718+
}
46524719
return merge(map(func));
46534720
}
46544721

@@ -4677,6 +4744,9 @@ public final <R> Observable<R> flatMap(Func1<? super T, ? extends Observable<? e
46774744
*/
46784745
@Beta
46794746
public final <R> Observable<R> flatMap(Func1<? super T, ? extends Observable<? extends R>> func, int maxConcurrent) {
4747+
if (getClass() == ScalarSynchronousObservable.class) {
4748+
return ((ScalarSynchronousObservable<T>)this).scalarFlatMap(func);
4749+
}
46804750
return merge(map(func), maxConcurrent);
46814751
}
46824752

src/main/java/rx/internal/operators/OperatorAll.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,4 +77,4 @@ public void onCompleted() {
7777
child.setProducer(producer);
7878
return s;
7979
}
80-
}
80+
}

src/main/java/rx/internal/operators/OperatorMapNotification.java

Lines changed: 168 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,20 @@
1515
*/
1616
package rx.internal.operators;
1717

18+
import java.util.Queue;
19+
import java.util.concurrent.ConcurrentLinkedQueue;
20+
import java.util.concurrent.atomic.AtomicLong;
21+
1822
import rx.Observable.Operator;
23+
import rx.Producer;
1924
import rx.Subscriber;
25+
import rx.Subscription;
26+
import rx.exceptions.MissingBackpressureException;
2027
import rx.exceptions.OnErrorThrowable;
2128
import rx.functions.Func0;
2229
import rx.functions.Func1;
30+
import rx.internal.util.unsafe.SpscArrayQueue;
31+
import rx.internal.util.unsafe.UnsafeAccess;
2332

2433
/**
2534
* Applies a function of your choosing to every item emitted by an {@code Observable}, and emits the results of
@@ -41,13 +50,18 @@ public OperatorMapNotification(Func1<? super T, ? extends R> onNext, Func1<? sup
4150

4251
@Override
4352
public Subscriber<? super T> call(final Subscriber<? super R> o) {
44-
return new Subscriber<T>(o) {
45-
53+
Subscriber<T> subscriber = new Subscriber<T>() {
54+
SingleEmitter<R> emitter;
55+
@Override
56+
public void setProducer(Producer producer) {
57+
emitter = new SingleEmitter<R>(o, producer, this);
58+
o.setProducer(emitter);
59+
}
60+
4661
@Override
4762
public void onCompleted() {
4863
try {
49-
o.onNext(onCompleted.call());
50-
o.onCompleted();
64+
emitter.offerAndComplete(onCompleted.call());
5165
} catch (Throwable e) {
5266
o.onError(e);
5367
}
@@ -56,8 +70,7 @@ public void onCompleted() {
5670
@Override
5771
public void onError(Throwable e) {
5872
try {
59-
o.onNext(onError.call(e));
60-
o.onCompleted();
73+
emitter.offerAndComplete(onError.call(e));
6174
} catch (Throwable e2) {
6275
o.onError(e);
6376
}
@@ -66,13 +79,159 @@ public void onError(Throwable e) {
6679
@Override
6780
public void onNext(T t) {
6881
try {
69-
o.onNext(onNext.call(t));
82+
emitter.offer(onNext.call(t));
7083
} catch (Throwable e) {
7184
o.onError(OnErrorThrowable.addValueAsLastCause(e, t));
7285
}
7386
}
7487

7588
};
89+
o.add(subscriber);
90+
return subscriber;
7691
}
77-
78-
}
92+
static final class SingleEmitter<T> extends AtomicLong implements Producer, Subscription {
93+
/** */
94+
private static final long serialVersionUID = -249869671366010660L;
95+
final NotificationLite<T> nl;
96+
final Subscriber<? super T> child;
97+
final Producer producer;
98+
final Subscription cancel;
99+
final Queue<Object> queue;
100+
volatile boolean complete;
101+
/** Guarded by this. */
102+
boolean emitting;
103+
/** Guarded by this. */
104+
boolean missed;
105+
106+
public SingleEmitter(Subscriber<? super T> child, Producer producer, Subscription cancel) {
107+
this.child = child;
108+
this.producer = producer;
109+
this.cancel = cancel;
110+
this.queue = UnsafeAccess.isUnsafeAvailable()
111+
? new SpscArrayQueue<Object>(2)
112+
: new ConcurrentLinkedQueue<Object>();
113+
114+
this.nl = NotificationLite.instance();
115+
}
116+
@Override
117+
public void request(long n) {
118+
for (;;) {
119+
long r = get();
120+
if (r < 0) {
121+
return;
122+
}
123+
long u = r + n;
124+
if (u < 0) {
125+
u = Long.MAX_VALUE;
126+
}
127+
if (compareAndSet(r, u)) {
128+
producer.request(n);
129+
drain();
130+
return;
131+
}
132+
}
133+
}
134+
135+
void produced(long n) {
136+
for (;;) {
137+
long r = get();
138+
if (r < 0) {
139+
return;
140+
}
141+
long u = r - n;
142+
if (u < 0) {
143+
throw new IllegalStateException("More produced (" + n + ") than requested (" + r + ")");
144+
}
145+
if (compareAndSet(r, u)) {
146+
return;
147+
}
148+
}
149+
}
150+
151+
public void offer(T value) {
152+
if (!queue.offer(value)) {
153+
child.onError(new MissingBackpressureException());
154+
unsubscribe();
155+
} else {
156+
drain();
157+
}
158+
}
159+
public void offerAndComplete(T value) {
160+
if (!this.queue.offer(value)) {
161+
child.onError(new MissingBackpressureException());
162+
unsubscribe();
163+
} else {
164+
this.complete = true;
165+
drain();
166+
}
167+
}
168+
169+
void drain() {
170+
synchronized (this) {
171+
if (emitting) {
172+
missed = true;
173+
return;
174+
}
175+
emitting = true;
176+
missed = false;
177+
}
178+
boolean skipFinal = false;
179+
try {
180+
for (;;) {
181+
182+
long r = get();
183+
boolean c = complete;
184+
boolean empty = queue.isEmpty();
185+
186+
if (c && empty) {
187+
child.onCompleted();
188+
skipFinal = true;
189+
return;
190+
} else
191+
if (r > 0) {
192+
Object v = queue.poll();
193+
if (v != null) {
194+
child.onNext(nl.getValue(v));
195+
produced(1);
196+
} else
197+
if (c) {
198+
child.onCompleted();
199+
skipFinal = true;
200+
return;
201+
}
202+
}
203+
204+
synchronized (this) {
205+
if (!missed) {
206+
skipFinal = true;
207+
emitting = false;
208+
return;
209+
}
210+
missed = false;
211+
}
212+
}
213+
} finally {
214+
if (!skipFinal) {
215+
synchronized (this) {
216+
emitting = false;
217+
}
218+
}
219+
}
220+
}
221+
222+
@Override
223+
public boolean isUnsubscribed() {
224+
return get() < 0;
225+
}
226+
@Override
227+
public void unsubscribe() {
228+
long r = get();
229+
if (r != Long.MIN_VALUE) {
230+
r = getAndSet(Long.MIN_VALUE);
231+
if (r != Long.MIN_VALUE) {
232+
cancel.unsubscribe();
233+
}
234+
}
235+
}
236+
}
237+
}

0 commit comments

Comments
 (0)