Skip to content

Commit 15b18a7

Browse files
committed
2.x: Add Flowable.switchMapCompletable{DelayError} operator
1 parent 8068404 commit 15b18a7

File tree

4 files changed

+723
-1
lines changed

4 files changed

+723
-1
lines changed

src/main/java/io/reactivex/Flowable.java

+91
Original file line numberDiff line numberDiff line change
@@ -13951,6 +13951,97 @@ public final <R> Flowable<R> switchMap(Function<? super T, ? extends Publisher<?
1395113951
return switchMap0(mapper, bufferSize, false);
1395213952
}
1395313953

13954+
/**
13955+
* Maps the upstream values into {@link CompletableSource}s, subscribes to the newer one while
13956+
* disposing the subscription to the previous {@code CompletableSource}, thus keeping at most one
13957+
* active {@code CompletableSource} running.
13958+
* <p>
13959+
* <img width="640" height="521" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/switchMapCompletable.f.png" alt="">
13960+
* <p>
13961+
* Since a {@code CompletableSource} doesn't produce any items, the resulting reactive type of
13962+
* this operator is a {@link Completable} that can only indicate successful completion or
13963+
* a failure in any of the inner {@code CompletableSource}s or the failure of the current
13964+
* {@link Flowable}.
13965+
* <dl>
13966+
* <dt><b>Backpressure:</b></dt>
13967+
* <dd>The operator consumes the current {@link Flowable} in an unbounded manner and otherwise
13968+
* does not have backpressure in its return type because no items are ever produced.</dd>
13969+
* <dt><b>Scheduler:</b></dt>
13970+
* <dd>{@code switchMapCompletable} does not operate by default on a particular {@link Scheduler}.</dd>
13971+
* <dt><b>Error handling:</b></dt>
13972+
* <dd>If either this {@code Flowable} or the active {@code CompletableSource} signals an {@code onError},
13973+
* the resulting {@code Completable} is terminated immediately with that {@code Throwable}.
13974+
* Use the {@link #switchMapCompletableDelayError(Function)} to delay such inner failures until
13975+
* every inner {@code CompletableSource}s and the main {@code Flowable} terminates in some fashion.
13976+
* If they fail concurrently, the operator may combine the {@code Throwable}s into a
13977+
* {@link io.reactivex.exceptions.CompositeException CompositeException}
13978+
* and signal it to the downstream instead. If any inactivated (switched out) {@code CompletableSource}
13979+
* signals an {@code onError} late, the {@code Throwable}s will be signalled to the global error handler via
13980+
* {@link RxJavaPlugins#onError(Throwable)} method as {@code UndeliverableException} errors.
13981+
* </dd>
13982+
* </dl>
13983+
* @param mapper the function called with each upstream item and should return a
13984+
* {@link CompletableSource} to be subscribed to and awaited for
13985+
* (non blockingly) for its terminal event
13986+
* @return the new Completable instance
13987+
* @since 2.1.11 - experimental
13988+
* @see #switchMapCompletableDelayError(Function)
13989+
*/
13990+
@CheckReturnValue
13991+
@BackpressureSupport(BackpressureKind.UNBOUNDED_IN)
13992+
@SchedulerSupport(SchedulerSupport.NONE)
13993+
@Experimental
13994+
public final Completable switchMapCompletable(@NonNull Function<? super T, ? extends CompletableSource> mapper) {
13995+
ObjectHelper.requireNonNull(mapper, "mapper is null");
13996+
return RxJavaPlugins.onAssembly(new FlowableSwitchMapCompletable<T>(this, mapper, false));
13997+
}
13998+
13999+
/**
14000+
* Maps the upstream values into {@link CompletableSource}s, subscribes to the newer one while
14001+
* disposing the subscription to the previous {@code CompletableSource}, thus keeping at most one
14002+
* active {@code CompletableSource} running and delaying any main or inner errors until all
14003+
* of them terminate.
14004+
* <p>
14005+
* <img width="640" height="453" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/switchMapCompletableDelayError.f.png" alt="">
14006+
* <p>
14007+
* Since a {@code CompletableSource} doesn't produce any items, the resulting reactive type of
14008+
* this operator is a {@link Completable} that can only indicate successful completion or
14009+
* a failure in any of the inner {@code CompletableSource}s or the failure of the current
14010+
* {@link Flowable}.
14011+
* <dl>
14012+
* <dt><b>Backpressure:</b></dt>
14013+
* <dd>The operator consumes the current {@link Flowable} in an unbounded manner and otherwise
14014+
* does not have backpressure in its return type because no items are ever produced.</dd>
14015+
* <dt><b>Scheduler:</b></dt>
14016+
* <dd>{@code switchMapCompletableDelayError} does not operate by default on a particular {@link Scheduler}.</dd>
14017+
* <dt><b>Error handling:</b></dt>
14018+
* <dd>Errors of this {@code Flowable} and all the {@code CompletableSource}s, who had the chance
14019+
* to run to their completion, are delayed until
14020+
* all of the terminate in some fashion. At this point, if there was only one failure, the respective
14021+
* {@code Throwable} is emitted to the dowstream. It there were more than one failures, the
14022+
* operator combines all {@code Throwable}s into a {@link io.reactivex.exceptions.CompositeException CompositeException}
14023+
* and signals that to the downstream.
14024+
* If any inactivated (switched out) {@code CompletableSource}
14025+
* signals an {@code onError} late, the {@code Throwable}s will be signalled to the global error handler via
14026+
* {@link RxJavaPlugins#onError(Throwable)} method as {@code UndeliverableException} errors.
14027+
* </dd>
14028+
* </dl>
14029+
* @param mapper the function called with each upstream item and should return a
14030+
* {@link CompletableSource} to be subscribed to and awaited for
14031+
* (non blockingly) for its terminal event
14032+
* @return the new Completable instance
14033+
* @since 2.1.11 - experimental
14034+
* @see #switchMapCompletableDelayError(Function)
14035+
*/
14036+
@CheckReturnValue
14037+
@BackpressureSupport(BackpressureKind.UNBOUNDED_IN)
14038+
@SchedulerSupport(SchedulerSupport.NONE)
14039+
@Experimental
14040+
public final Completable switchMapCompletableDelayError(@NonNull Function<? super T, ? extends CompletableSource> mapper) {
14041+
ObjectHelper.requireNonNull(mapper, "mapper is null");
14042+
return RxJavaPlugins.onAssembly(new FlowableSwitchMapCompletable<T>(this, mapper, true));
14043+
}
14044+
1395414045
/**
1395514046
* Returns a new Publisher by applying a function that you supply to each item emitted by the source
1395614047
* Publisher that returns a Publisher, and then emitting the items emitted by the most recently emitted
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,239 @@
1+
/**
2+
* Copyright (c) 2016-present, RxJava Contributors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.internal.operators.flowable;
15+
16+
import java.util.concurrent.atomic.AtomicReference;
17+
18+
import org.reactivestreams.Subscription;
19+
20+
import io.reactivex.*;
21+
import io.reactivex.annotations.Experimental;
22+
import io.reactivex.disposables.Disposable;
23+
import io.reactivex.exceptions.Exceptions;
24+
import io.reactivex.functions.Function;
25+
import io.reactivex.internal.disposables.DisposableHelper;
26+
import io.reactivex.internal.functions.ObjectHelper;
27+
import io.reactivex.internal.subscriptions.SubscriptionHelper;
28+
import io.reactivex.internal.util.*;
29+
import io.reactivex.plugins.RxJavaPlugins;
30+
31+
/**
32+
* Maps the upstream values into {@link CompletableSource}s, subscribes to the newer one while
33+
* disposing the subscription to the previous {@code CompletableSource}, thus keeping at most one
34+
* active {@code CompletableSource} running.
35+
*
36+
* @param <T> the upstream value type
37+
* @since 2.1.11 - experimental
38+
*/
39+
@Experimental
40+
public final class FlowableSwitchMapCompletable<T> extends Completable {
41+
42+
final Flowable<T> source;
43+
44+
final Function<? super T, ? extends CompletableSource> mapper;
45+
46+
final boolean delayErrors;
47+
48+
public FlowableSwitchMapCompletable(Flowable<T> source,
49+
Function<? super T, ? extends CompletableSource> mapper, boolean delayErrors) {
50+
this.source = source;
51+
this.mapper = mapper;
52+
this.delayErrors = delayErrors;
53+
}
54+
55+
@Override
56+
protected void subscribeActual(CompletableObserver s) {
57+
source.subscribe(new SwitchMapCompletableObserver<T>(s, mapper, delayErrors));
58+
}
59+
60+
static final class SwitchMapCompletableObserver<T> implements FlowableSubscriber<T>, Disposable {
61+
62+
final CompletableObserver downstream;
63+
64+
final Function<? super T, ? extends CompletableSource> mapper;
65+
66+
final boolean delayErrors;
67+
68+
final AtomicThrowable errors;
69+
70+
final AtomicReference<SwitchMapInnerObserver> inner;
71+
72+
static final SwitchMapInnerObserver INNER_DISPOSED = new SwitchMapInnerObserver(null);
73+
74+
volatile boolean done;
75+
76+
Subscription upstream;
77+
78+
SwitchMapCompletableObserver(CompletableObserver downstream,
79+
Function<? super T, ? extends CompletableSource> mapper, boolean delayErrors) {
80+
this.downstream = downstream;
81+
this.mapper = mapper;
82+
this.delayErrors = delayErrors;
83+
this.errors = new AtomicThrowable();
84+
this.inner = new AtomicReference<SwitchMapInnerObserver>();
85+
}
86+
87+
@Override
88+
public void onSubscribe(Subscription s) {
89+
if (SubscriptionHelper.validate(upstream, s)) {
90+
this.upstream = s;
91+
downstream.onSubscribe(this);
92+
s.request(Long.MAX_VALUE);
93+
}
94+
}
95+
96+
@Override
97+
public void onNext(T t) {
98+
CompletableSource c;
99+
100+
try {
101+
c = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper returned a null CompletableSource");
102+
} catch (Throwable ex) {
103+
Exceptions.throwIfFatal(ex);
104+
upstream.cancel();
105+
onError(ex);
106+
return;
107+
}
108+
109+
SwitchMapInnerObserver o = new SwitchMapInnerObserver(this);
110+
111+
for (;;) {
112+
SwitchMapInnerObserver current = inner.get();
113+
if (current == INNER_DISPOSED) {
114+
break;
115+
}
116+
if (inner.compareAndSet(current, o)) {
117+
if (current != null) {
118+
current.dispose();
119+
}
120+
c.subscribe(o);
121+
break;
122+
}
123+
}
124+
}
125+
126+
@Override
127+
public void onError(Throwable t) {
128+
if (errors.addThrowable(t)) {
129+
if (delayErrors) {
130+
onComplete();
131+
} else {
132+
disposeInner();
133+
Throwable ex = errors.terminate();
134+
if (ex != ExceptionHelper.TERMINATED) {
135+
downstream.onError(ex);
136+
}
137+
}
138+
} else {
139+
RxJavaPlugins.onError(t);
140+
}
141+
}
142+
143+
@Override
144+
public void onComplete() {
145+
done = true;
146+
if (inner.get() == null) {
147+
Throwable ex = errors.terminate();
148+
if (ex == null) {
149+
downstream.onComplete();
150+
} else {
151+
downstream.onError(ex);
152+
}
153+
}
154+
}
155+
156+
void disposeInner() {
157+
SwitchMapInnerObserver o = inner.getAndSet(INNER_DISPOSED);
158+
if (o != null && o != INNER_DISPOSED) {
159+
o.dispose();
160+
}
161+
}
162+
163+
@Override
164+
public void dispose() {
165+
upstream.cancel();
166+
disposeInner();
167+
}
168+
169+
@Override
170+
public boolean isDisposed() {
171+
return inner.get() == INNER_DISPOSED;
172+
}
173+
174+
void innerError(SwitchMapInnerObserver sender, Throwable error) {
175+
if (inner.compareAndSet(sender, null)) {
176+
if (errors.addThrowable(error)) {
177+
if (delayErrors) {
178+
if (done) {
179+
Throwable ex = errors.terminate();
180+
downstream.onError(ex);
181+
}
182+
} else {
183+
dispose();
184+
Throwable ex = errors.terminate();
185+
if (ex != ExceptionHelper.TERMINATED) {
186+
downstream.onError(ex);
187+
}
188+
}
189+
return;
190+
}
191+
}
192+
RxJavaPlugins.onError(error);
193+
}
194+
195+
void innerComplete(SwitchMapInnerObserver sender) {
196+
if (inner.compareAndSet(sender, null)) {
197+
if (done) {
198+
Throwable ex = errors.terminate();
199+
if (ex == null) {
200+
downstream.onComplete();
201+
} else {
202+
downstream.onError(ex);
203+
}
204+
}
205+
}
206+
}
207+
208+
static final class SwitchMapInnerObserver extends AtomicReference<Disposable>
209+
implements CompletableObserver {
210+
211+
private static final long serialVersionUID = -8003404460084760287L;
212+
213+
final SwitchMapCompletableObserver<?> parent;
214+
215+
SwitchMapInnerObserver(SwitchMapCompletableObserver<?> parent) {
216+
this.parent = parent;
217+
}
218+
219+
@Override
220+
public void onSubscribe(Disposable d) {
221+
DisposableHelper.setOnce(this, d);
222+
}
223+
224+
@Override
225+
public void onError(Throwable e) {
226+
parent.innerError(this, e);
227+
}
228+
229+
@Override
230+
public void onComplete() {
231+
parent.innerComplete(this);
232+
}
233+
234+
void dispose() {
235+
DisposableHelper.dispose(this);
236+
}
237+
}
238+
}
239+
}

src/test/java/io/reactivex/InternalWrongNaming.java

+4-1
Original file line numberDiff line numberDiff line change
@@ -185,7 +185,10 @@ public void flowableNoObserver() throws Exception {
185185
"FlowableConcatWithCompletable",
186186
"FlowableMergeWithSingle",
187187
"FlowableMergeWithMaybe",
188-
"FlowableMergeWithCompletable"
188+
"FlowableMergeWithCompletable",
189+
"FlowableSwitchMapCompletable",
190+
"FlowableSwitchMapSingle",
191+
"FlowableSwitchMapMaybe"
189192
);
190193
}
191194
}

0 commit comments

Comments
 (0)