Skip to content

Commit ae154cd

Browse files
committed
Merge, MergeDelayError and MergeMaxConcurrent unified and rewritten.
1 parent 2532484 commit ae154cd

14 files changed

+1279
-1097
lines changed

src/main/java/rx/Observable.java

Lines changed: 58 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1663,8 +1663,9 @@ public final static <T> Observable<T> merge(Observable<? extends Observable<? ex
16631663
* if {@code maxConcurrent} is less than or equal to 0
16641664
* @see <a href="http://reactivex.io/documentation/operators/merge.html">ReactiveX operators documentation: Merge</a>
16651665
*/
1666+
@Experimental
16661667
public final static <T> Observable<T> merge(Observable<? extends Observable<? extends T>> source, int maxConcurrent) {
1667-
return source.lift(new OperatorMergeMaxConcurrent<T>(maxConcurrent));
1668+
return source.lift(OperatorMerge.<T>instance(false, maxConcurrent));
16681669
}
16691670

16701671
/**
@@ -1935,7 +1936,31 @@ public final static <T> Observable<T> merge(Observable<? extends T> t1, Observab
19351936
public final static <T> Observable<T> merge(Observable<? extends T>[] sequences) {
19361937
return merge(from(sequences));
19371938
}
1938-
1939+
1940+
/**
1941+
* Flattens an Array of Observables into one Observable, without any transformation, while limiting the
1942+
* number of concurrent subscriptions to these Observables.
1943+
* <p>
1944+
* <img width="640" height="370" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/merge.io.png" alt="">
1945+
* <p>
1946+
* You can combine items emitted by multiple Observables so that they appear as a single Observable, by
1947+
* using the {@code merge} method.
1948+
* <dl>
1949+
* <dt><b>Scheduler:</b></dt>
1950+
* <dd>{@code merge} does not operate by default on a particular {@link Scheduler}.</dd>
1951+
* </dl>
1952+
*
1953+
* @param sequences
1954+
* the Array of Observables
1955+
* @param maxConcurrent
1956+
* the maximum number of Observables that may be subscribed to concurrently
1957+
* @return an Observable that emits all of the items emitted by the Observables in the Array
1958+
* @see <a href="http://reactivex.io/documentation/operators/merge.html">ReactiveX operators documentation: Merge</a>
1959+
*/
1960+
@Experimental
1961+
public final static <T> Observable<T> merge(Observable<? extends T>[] sequences, int maxConcurrent) {
1962+
return merge(from(sequences), maxConcurrent);
1963+
}
19391964
/**
19401965
* Flattens an Observable that emits Observables into one Observable, in a way that allows an Observer to
19411966
* receive all successfully emitted items from all of the source Observables without being interrupted by
@@ -1963,6 +1988,37 @@ public final static <T> Observable<T> merge(Observable<? extends T>[] sequences)
19631988
public final static <T> Observable<T> mergeDelayError(Observable<? extends Observable<? extends T>> source) {
19641989
return source.lift(OperatorMerge.<T>instance(true));
19651990
}
1991+
/**
1992+
* Flattens an Observable that emits Observables into one Observable, in a way that allows an Observer to
1993+
* receive all successfully emitted items from all of the source Observables without being interrupted by
1994+
* an error notification from one of them, while limiting the
1995+
* number of concurrent subscriptions to these Observables.
1996+
* <p>
1997+
* This behaves like {@link #merge(Observable)} except that if any of the merged Observables notify of an
1998+
* error via {@link Observer#onError onError}, {@code mergeDelayError} will refrain from propagating that
1999+
* error notification until all of the merged Observables have finished emitting items.
2000+
* <p>
2001+
* <img width="640" height="380" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/mergeDelayError.png" alt="">
2002+
* <p>
2003+
* Even if multiple merged Observables send {@code onError} notifications, {@code mergeDelayError} will only
2004+
* invoke the {@code onError} method of its Observers once.
2005+
* <dl>
2006+
* <dt><b>Scheduler:</b></dt>
2007+
* <dd>{@code mergeDelayError} does not operate by default on a particular {@link Scheduler}.</dd>
2008+
* </dl>
2009+
*
2010+
* @param source
2011+
* an Observable that emits Observables
2012+
* @param maxConcurrent
2013+
* the maximum number of Observables that may be subscribed to concurrently
2014+
* @return an Observable that emits all of the items emitted by the Observables emitted by the
2015+
* {@code source} Observable
2016+
* @see <a href="http://reactivex.io/documentation/operators/merge.html">ReactiveX operators documentation: Merge</a>
2017+
*/
2018+
@Experimental
2019+
public final static <T> Observable<T> mergeDelayError(Observable<? extends Observable<? extends T>> source, int maxConcurrent) {
2020+
return source.lift(OperatorMerge.<T>instance(true, maxConcurrent));
2021+
}
19662022

19672023
/**
19682024
* Flattens two Observables into one Observable, in a way that allows an Observer to receive all

src/main/java/rx/internal/operators/OperatorAll.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,8 +46,8 @@ public void onNext(T t) {
4646
child.onCompleted();
4747
unsubscribe();
4848
} else {
49-
// if we drop values we must replace them upstream as downstream won't receive and request more
50-
request(1);
49+
// if we drop values we must replace them upstream as downstream won't receive and request more
50+
request(1);
5151
}
5252
}
5353

src/main/java/rx/internal/operators/OperatorMapNotification.java

Lines changed: 159 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,12 @@
1515
*/
1616
package rx.internal.operators;
1717

18+
import java.util.concurrent.atomic.AtomicLong;
19+
1820
import rx.Observable.Operator;
19-
import rx.Subscriber;
20-
import rx.exceptions.OnErrorThrowable;
21-
import rx.functions.Func0;
22-
import rx.functions.Func1;
21+
import rx.*;
22+
import rx.exceptions.*;
23+
import rx.functions.*;
2324

2425
/**
2526
* Applies a function of your choosing to every item emitted by an {@code Observable}, and emits the results of
@@ -41,13 +42,18 @@ public OperatorMapNotification(Func1<? super T, ? extends R> onNext, Func1<? sup
4142

4243
@Override
4344
public Subscriber<? super T> call(final Subscriber<? super R> o) {
44-
return new Subscriber<T>(o) {
45-
45+
Subscriber<T> subscriber = new Subscriber<T>() {
46+
SingleEmitter<R> emitter;
47+
@Override
48+
public void setProducer(Producer producer) {
49+
emitter = new SingleEmitter<R>(o, producer, this);
50+
o.setProducer(emitter);
51+
}
52+
4653
@Override
4754
public void onCompleted() {
4855
try {
49-
o.onNext(onCompleted.call());
50-
o.onCompleted();
56+
emitter.offerAndComplete(onCompleted.call());
5157
} catch (Throwable e) {
5258
o.onError(e);
5359
}
@@ -56,8 +62,7 @@ public void onCompleted() {
5662
@Override
5763
public void onError(Throwable e) {
5864
try {
59-
o.onNext(onError.call(e));
60-
o.onCompleted();
65+
emitter.offerAndComplete(onError.call(e));
6166
} catch (Throwable e2) {
6267
o.onError(e);
6368
}
@@ -66,13 +71,154 @@ public void onError(Throwable e) {
6671
@Override
6772
public void onNext(T t) {
6873
try {
69-
o.onNext(onNext.call(t));
74+
emitter.offer(onNext.call(t));
7075
} catch (Throwable e) {
7176
o.onError(OnErrorThrowable.addValueAsLastCause(e, t));
7277
}
7378
}
7479

7580
};
81+
o.add(subscriber);
82+
return subscriber;
7683
}
77-
78-
}
84+
static final class SingleEmitter<T> extends AtomicLong implements Producer, Subscription {
85+
/** */
86+
private static final long serialVersionUID = -249869671366010660L;
87+
final NotificationLite<T> nl;
88+
final Subscriber<? super T> child;
89+
final Producer producer;
90+
final Subscription cancel;
91+
volatile Object value;
92+
volatile boolean complete;
93+
/** Guarded by this. */
94+
boolean emitting;
95+
/** Guarded by this. */
96+
boolean missed;
97+
98+
public SingleEmitter(Subscriber<? super T> child, Producer producer, Subscription cancel) {
99+
this.child = child;
100+
this.producer = producer;
101+
this.cancel = cancel;
102+
this.nl = NotificationLite.instance();
103+
}
104+
@Override
105+
public void request(long n) {
106+
for (;;) {
107+
long r = get();
108+
if (r < 0) {
109+
return;
110+
}
111+
long u = r + n;
112+
if (u < 0) {
113+
u = Long.MAX_VALUE;
114+
}
115+
if (compareAndSet(r, u)) {
116+
producer.request(n);
117+
drain();
118+
return;
119+
}
120+
}
121+
}
122+
123+
void produced(long n) {
124+
for (;;) {
125+
long r = get();
126+
if (r < 0) {
127+
return;
128+
}
129+
long u = r - n;
130+
if (u < 0) {
131+
throw new IllegalStateException("More produced (" + n + ") than requested (" + r + ")");
132+
}
133+
if (compareAndSet(r, u)) {
134+
return;
135+
}
136+
}
137+
}
138+
139+
public void offer(T value) {
140+
if (this.value != null) {
141+
child.onError(new MissingBackpressureException());
142+
unsubscribe();
143+
} else {
144+
this.value = nl.next(value);
145+
drain();
146+
}
147+
}
148+
public void offerAndComplete(T value) {
149+
if (this.value != null) {
150+
child.onError(new MissingBackpressureException());
151+
unsubscribe();
152+
} else {
153+
this.value = nl.next(value);
154+
this.complete = true;
155+
drain();
156+
}
157+
}
158+
159+
void drain() {
160+
synchronized (this) {
161+
if (emitting) {
162+
missed = true;
163+
return;
164+
}
165+
emitting = true;
166+
missed = false;
167+
}
168+
boolean skipFinal = false;
169+
try {
170+
for (;;) {
171+
172+
long r = get();
173+
boolean c = complete;
174+
Object v = value;
175+
176+
if (c && v == null) {
177+
child.onCompleted();
178+
} else
179+
if (r > 0 && v != null) {
180+
value = null;
181+
child.onNext(nl.getValue(v));
182+
if (c) {
183+
child.onCompleted();
184+
skipFinal = true;
185+
return;
186+
} else {
187+
produced(1);
188+
}
189+
}
190+
191+
synchronized (this) {
192+
if (!missed) {
193+
skipFinal = true;
194+
emitting = false;
195+
return;
196+
}
197+
missed = false;
198+
}
199+
}
200+
} finally {
201+
if (!skipFinal) {
202+
synchronized (this) {
203+
emitting = false;
204+
}
205+
}
206+
}
207+
}
208+
209+
@Override
210+
public boolean isUnsubscribed() {
211+
return get() < 0;
212+
}
213+
@Override
214+
public void unsubscribe() {
215+
long r = get();
216+
if (r != Long.MIN_VALUE) {
217+
r = getAndSet(Long.MIN_VALUE);
218+
if (r != Long.MIN_VALUE) {
219+
cancel.unsubscribe();
220+
}
221+
}
222+
}
223+
}
224+
}

0 commit comments

Comments
 (0)