Skip to content

Commit 75b7558

Browse files
committed
2.x: Add Flowable.switchMapCompletable{DelayError} operator
1 parent 84004a6 commit 75b7558

File tree

3 files changed

+720
-0
lines changed

3 files changed

+720
-0
lines changed

src/main/java/io/reactivex/Flowable.java

+92
Original file line numberDiff line numberDiff line change
@@ -14114,6 +14114,98 @@ public final <R> Flowable<R> switchMap(Function<? super T, ? extends Publisher<?
1411414114
return switchMap0(mapper, bufferSize, false);
1411514115
}
1411614116

14117+
14118+
/**
14119+
* Maps the upstream values into {@link CompletableSource}s, subscribes to the newer one while
14120+
* disposing the subscription to the previous {@code CompletableSource}, thus keeping at most one
14121+
* active {@code CompletableSource} running.
14122+
* <p>
14123+
* <img width="640" height="521" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/switchMapCompletable.f.png" alt="">
14124+
* <p>
14125+
* Since a {@code CompletableSource} doesn't produce any items, the resulting reactive type of
14126+
* this operator is a {@link Completable} that can only indicate successful completion or
14127+
* a failure in any of the inner {@code CompletableSource}s or the failure of the current
14128+
* {@link Flowable}.
14129+
* <dl>
14130+
* <dt><b>Backpressure:</b></dt>
14131+
* <dd>The operator consumes the current {@link Flowable} in an unbounded manner and otherwise
14132+
* does not have backpressure in its return type because no items are ever produced.</dd>
14133+
* <dt><b>Scheduler:</b></dt>
14134+
* <dd>{@code switchMapCompletable} does not operate by default on a particular {@link Scheduler}.</dd>
14135+
* <dt><b>Error handling:</b></dt>
14136+
* <dd>If either this {@code Flowable} or the active {@code CompletableSource} signals an {@code onError},
14137+
* the resulting {@code Completable} is terminated immediately with that {@code Throwable}.
14138+
* Use the {@link #switchMapCompletableDelayError(Function)} to delay such inner failures until
14139+
* every inner {@code CompletableSource}s and the main {@code Flowable} terminates in some fashion.
14140+
* If they fail concurrently, the operator may combine the {@code Throwable}s into a
14141+
* {@link io.reactivex.exceptions.CompositeException CompositeException}
14142+
* and signal it to the downstream instead. If any inactivated (switched out) {@code CompletableSource}
14143+
* signals an {@code onError} late, the {@code Throwable}s will be signalled to the global error handler via
14144+
* {@link RxJavaPlugins#onError(Throwable)} method as {@code UndeliverableException} errors.
14145+
* </dd>
14146+
* </dl>
14147+
* @param mapper the function called with each upstream item and should return a
14148+
* {@link CompletableSource} to be subscribed to and awaited for
14149+
* (non blockingly) for its terminal event
14150+
* @return the new Completable instance
14151+
* @since 2.1.11 - experimental
14152+
* @see #switchMapCompletableDelayError(Function)
14153+
*/
14154+
@CheckReturnValue
14155+
@BackpressureSupport(BackpressureKind.UNBOUNDED_IN)
14156+
@SchedulerSupport(SchedulerSupport.NONE)
14157+
@Experimental
14158+
public final Completable switchMapCompletable(@NonNull Function<? super T, ? extends CompletableSource> mapper) {
14159+
ObjectHelper.requireNonNull(mapper, "mapper is null");
14160+
return RxJavaPlugins.onAssembly(new FlowableSwitchMapCompletable<T>(this, mapper, false));
14161+
}
14162+
14163+
/**
14164+
* Maps the upstream values into {@link CompletableSource}s, subscribes to the newer one while
14165+
* disposing the subscription to the previous {@code CompletableSource}, thus keeping at most one
14166+
* active {@code CompletableSource} running and delaying any main or inner errors until all
14167+
* of them terminate.
14168+
* <p>
14169+
* <img width="640" height="453" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/switchMapCompletableDelayError.f.png" alt="">
14170+
* <p>
14171+
* Since a {@code CompletableSource} doesn't produce any items, the resulting reactive type of
14172+
* this operator is a {@link Completable} that can only indicate successful completion or
14173+
* a failure in any of the inner {@code CompletableSource}s or the failure of the current
14174+
* {@link Flowable}.
14175+
* <dl>
14176+
* <dt><b>Backpressure:</b></dt>
14177+
* <dd>The operator consumes the current {@link Flowable} in an unbounded manner and otherwise
14178+
* does not have backpressure in its return type because no items are ever produced.</dd>
14179+
* <dt><b>Scheduler:</b></dt>
14180+
* <dd>{@code switchMapCompletableDelayError} does not operate by default on a particular {@link Scheduler}.</dd>
14181+
* <dt><b>Error handling:</b></dt>
14182+
* <dd>Errors of this {@code Flowable} and all the {@code CompletableSource}s, who had the chance
14183+
* to run to their completion, are delayed until
14184+
* all of them terminate in some fashion. At this point, if there was only one failure, the respective
14185+
* {@code Throwable} is emitted to the dowstream. It there were more than one failures, the
14186+
* operator combines all {@code Throwable}s into a {@link io.reactivex.exceptions.CompositeException CompositeException}
14187+
* and signals that to the downstream.
14188+
* If any inactivated (switched out) {@code CompletableSource}
14189+
* signals an {@code onError} late, the {@code Throwable}s will be signalled to the global error handler via
14190+
* {@link RxJavaPlugins#onError(Throwable)} method as {@code UndeliverableException} errors.
14191+
* </dd>
14192+
* </dl>
14193+
* @param mapper the function called with each upstream item and should return a
14194+
* {@link CompletableSource} to be subscribed to and awaited for
14195+
* (non blockingly) for its terminal event
14196+
* @return the new Completable instance
14197+
* @since 2.1.11 - experimental
14198+
* @see #switchMapCompletableDelayError(Function)
14199+
*/
14200+
@CheckReturnValue
14201+
@BackpressureSupport(BackpressureKind.UNBOUNDED_IN)
14202+
@SchedulerSupport(SchedulerSupport.NONE)
14203+
@Experimental
14204+
public final Completable switchMapCompletableDelayError(@NonNull Function<? super T, ? extends CompletableSource> mapper) {
14205+
ObjectHelper.requireNonNull(mapper, "mapper is null");
14206+
return RxJavaPlugins.onAssembly(new FlowableSwitchMapCompletable<T>(this, mapper, true));
14207+
}
14208+
1411714209
/**
1411814210
* Returns a new Publisher by applying a function that you supply to each item emitted by the source
1411914211
* Publisher that returns a Publisher, and then emitting the items emitted by the most recently emitted
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,239 @@
1+
/**
2+
* Copyright (c) 2016-present, RxJava Contributors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.internal.operators.mixed;
15+
16+
import java.util.concurrent.atomic.AtomicReference;
17+
18+
import org.reactivestreams.Subscription;
19+
20+
import io.reactivex.*;
21+
import io.reactivex.annotations.Experimental;
22+
import io.reactivex.disposables.Disposable;
23+
import io.reactivex.exceptions.Exceptions;
24+
import io.reactivex.functions.Function;
25+
import io.reactivex.internal.disposables.DisposableHelper;
26+
import io.reactivex.internal.functions.ObjectHelper;
27+
import io.reactivex.internal.subscriptions.SubscriptionHelper;
28+
import io.reactivex.internal.util.*;
29+
import io.reactivex.plugins.RxJavaPlugins;
30+
31+
/**
32+
* Maps the upstream values into {@link CompletableSource}s, subscribes to the newer one while
33+
* disposing the subscription to the previous {@code CompletableSource}, thus keeping at most one
34+
* active {@code CompletableSource} running.
35+
*
36+
* @param <T> the upstream value type
37+
* @since 2.1.11 - experimental
38+
*/
39+
@Experimental
40+
public final class FlowableSwitchMapCompletable<T> extends Completable {
41+
42+
final Flowable<T> source;
43+
44+
final Function<? super T, ? extends CompletableSource> mapper;
45+
46+
final boolean delayErrors;
47+
48+
public FlowableSwitchMapCompletable(Flowable<T> source,
49+
Function<? super T, ? extends CompletableSource> mapper, boolean delayErrors) {
50+
this.source = source;
51+
this.mapper = mapper;
52+
this.delayErrors = delayErrors;
53+
}
54+
55+
@Override
56+
protected void subscribeActual(CompletableObserver s) {
57+
source.subscribe(new SwitchMapCompletableObserver<T>(s, mapper, delayErrors));
58+
}
59+
60+
static final class SwitchMapCompletableObserver<T> implements FlowableSubscriber<T>, Disposable {
61+
62+
final CompletableObserver downstream;
63+
64+
final Function<? super T, ? extends CompletableSource> mapper;
65+
66+
final boolean delayErrors;
67+
68+
final AtomicThrowable errors;
69+
70+
final AtomicReference<SwitchMapInnerObserver> inner;
71+
72+
static final SwitchMapInnerObserver INNER_DISPOSED = new SwitchMapInnerObserver(null);
73+
74+
volatile boolean done;
75+
76+
Subscription upstream;
77+
78+
SwitchMapCompletableObserver(CompletableObserver downstream,
79+
Function<? super T, ? extends CompletableSource> mapper, boolean delayErrors) {
80+
this.downstream = downstream;
81+
this.mapper = mapper;
82+
this.delayErrors = delayErrors;
83+
this.errors = new AtomicThrowable();
84+
this.inner = new AtomicReference<SwitchMapInnerObserver>();
85+
}
86+
87+
@Override
88+
public void onSubscribe(Subscription s) {
89+
if (SubscriptionHelper.validate(upstream, s)) {
90+
this.upstream = s;
91+
downstream.onSubscribe(this);
92+
s.request(Long.MAX_VALUE);
93+
}
94+
}
95+
96+
@Override
97+
public void onNext(T t) {
98+
CompletableSource c;
99+
100+
try {
101+
c = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper returned a null CompletableSource");
102+
} catch (Throwable ex) {
103+
Exceptions.throwIfFatal(ex);
104+
upstream.cancel();
105+
onError(ex);
106+
return;
107+
}
108+
109+
SwitchMapInnerObserver o = new SwitchMapInnerObserver(this);
110+
111+
for (;;) {
112+
SwitchMapInnerObserver current = inner.get();
113+
if (current == INNER_DISPOSED) {
114+
break;
115+
}
116+
if (inner.compareAndSet(current, o)) {
117+
if (current != null) {
118+
current.dispose();
119+
}
120+
c.subscribe(o);
121+
break;
122+
}
123+
}
124+
}
125+
126+
@Override
127+
public void onError(Throwable t) {
128+
if (errors.addThrowable(t)) {
129+
if (delayErrors) {
130+
onComplete();
131+
} else {
132+
disposeInner();
133+
Throwable ex = errors.terminate();
134+
if (ex != ExceptionHelper.TERMINATED) {
135+
downstream.onError(ex);
136+
}
137+
}
138+
} else {
139+
RxJavaPlugins.onError(t);
140+
}
141+
}
142+
143+
@Override
144+
public void onComplete() {
145+
done = true;
146+
if (inner.get() == null) {
147+
Throwable ex = errors.terminate();
148+
if (ex == null) {
149+
downstream.onComplete();
150+
} else {
151+
downstream.onError(ex);
152+
}
153+
}
154+
}
155+
156+
void disposeInner() {
157+
SwitchMapInnerObserver o = inner.getAndSet(INNER_DISPOSED);
158+
if (o != null && o != INNER_DISPOSED) {
159+
o.dispose();
160+
}
161+
}
162+
163+
@Override
164+
public void dispose() {
165+
upstream.cancel();
166+
disposeInner();
167+
}
168+
169+
@Override
170+
public boolean isDisposed() {
171+
return inner.get() == INNER_DISPOSED;
172+
}
173+
174+
void innerError(SwitchMapInnerObserver sender, Throwable error) {
175+
if (inner.compareAndSet(sender, null)) {
176+
if (errors.addThrowable(error)) {
177+
if (delayErrors) {
178+
if (done) {
179+
Throwable ex = errors.terminate();
180+
downstream.onError(ex);
181+
}
182+
} else {
183+
dispose();
184+
Throwable ex = errors.terminate();
185+
if (ex != ExceptionHelper.TERMINATED) {
186+
downstream.onError(ex);
187+
}
188+
}
189+
return;
190+
}
191+
}
192+
RxJavaPlugins.onError(error);
193+
}
194+
195+
void innerComplete(SwitchMapInnerObserver sender) {
196+
if (inner.compareAndSet(sender, null)) {
197+
if (done) {
198+
Throwable ex = errors.terminate();
199+
if (ex == null) {
200+
downstream.onComplete();
201+
} else {
202+
downstream.onError(ex);
203+
}
204+
}
205+
}
206+
}
207+
208+
static final class SwitchMapInnerObserver extends AtomicReference<Disposable>
209+
implements CompletableObserver {
210+
211+
private static final long serialVersionUID = -8003404460084760287L;
212+
213+
final SwitchMapCompletableObserver<?> parent;
214+
215+
SwitchMapInnerObserver(SwitchMapCompletableObserver<?> parent) {
216+
this.parent = parent;
217+
}
218+
219+
@Override
220+
public void onSubscribe(Disposable d) {
221+
DisposableHelper.setOnce(this, d);
222+
}
223+
224+
@Override
225+
public void onError(Throwable e) {
226+
parent.innerError(this, e);
227+
}
228+
229+
@Override
230+
public void onComplete() {
231+
parent.innerComplete(this);
232+
}
233+
234+
void dispose() {
235+
DisposableHelper.dispose(this);
236+
}
237+
}
238+
}
239+
}

0 commit comments

Comments
 (0)