Skip to content

Commit 1d9283d

Browse files
authored
2.x: add doFinally for handling post terminal or cancel cleanup (#4831)
1 parent ba6f392 commit 1d9283d

File tree

3 files changed

+732
-0
lines changed

3 files changed

+732
-0
lines changed

src/main/java/io/reactivex/Flowable.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7324,6 +7324,35 @@ public final Flowable<T> distinctUntilChanged(BiPredicate<? super T, ? super T>
73247324
return RxJavaPlugins.onAssembly(new FlowableDistinctUntilChanged<T, T>(this, Functions.<T>identity(), comparer));
73257325
}
73267326

7327+
/**
7328+
* Calls the specified action after this Flowable signals onError or onCompleted or gets cancelled by
7329+
* the downstream.
7330+
* <p>In case of a race between a terminal event and a cancellation, the provided {@code onFinally} action
7331+
* is executed at once per subscription.
7332+
* <p>Note that the {@code onFinally} action is shared between subscriptions and as such
7333+
* should be thread-safe.
7334+
* <dl>
7335+
* <dt><b>Backpressure:</b></dt>
7336+
* <dd>The operator doesn't interfere with backpressure which is determined by the source {@code Publisher}'s backpressure
7337+
* behavior.</dd>
7338+
* <dt><b>Scheduler:</b></dt>
7339+
* <dd>{@code doFinally} does not operate by default on a particular {@link Scheduler}.</dd>
7340+
* <td><b>Operator-fusion:</b></dt>
7341+
* <dd>This operator supports normal and conditional Subscribers as well as boundary-limited
7342+
* synchronous or asynchronous queue-fusion.</dd>
7343+
* </dl>
7344+
* @param onFinally the action called when this Flowable terminates or gets cancelled
7345+
* @return the new Flowable instance
7346+
* @since 2.0.1 - experimental
7347+
*/
7348+
@BackpressureSupport(BackpressureKind.PASS_THROUGH)
7349+
@SchedulerSupport(SchedulerSupport.NONE)
7350+
@Experimental
7351+
public final Flowable<T> doFinally(Action onFinally) {
7352+
ObjectHelper.requireNonNull(onFinally, "onFinally is null");
7353+
return RxJavaPlugins.onAssembly(new FlowableDoFinally<T>(this, onFinally));
7354+
}
7355+
73277356
/**
73287357
* Registers an {@link Action} to be called when this Publisher invokes either
73297358
* {@link Subscriber#onComplete onComplete} or {@link Subscriber#onError onError}.
Lines changed: 262 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,262 @@
1+
/**
2+
* Copyright 2016 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.internal.operators.flowable;
15+
16+
import org.reactivestreams.*;
17+
18+
import io.reactivex.annotations.Experimental;
19+
import io.reactivex.exceptions.Exceptions;
20+
import io.reactivex.functions.Action;
21+
import io.reactivex.internal.fuseable.*;
22+
import io.reactivex.internal.subscriptions.*;
23+
import io.reactivex.plugins.RxJavaPlugins;
24+
25+
/**
26+
* Execute an action after an onError, onComplete or a cancel event.
27+
*
28+
* @param <T> the value type
29+
* @since 2.0.1 - experimental
30+
*/
31+
@Experimental
32+
public final class FlowableDoFinally<T> extends AbstractFlowableWithUpstream<T, T> {
33+
34+
final Action onFinally;
35+
36+
public FlowableDoFinally(Publisher<T> source, Action onFinally) {
37+
super(source);
38+
this.onFinally = onFinally;
39+
}
40+
41+
@Override
42+
protected void subscribeActual(Subscriber<? super T> s) {
43+
if (s instanceof ConditionalSubscriber) {
44+
source.subscribe(new DoFinallyConditionalSubscriber<T>((ConditionalSubscriber<? super T>)s, onFinally));
45+
} else {
46+
source.subscribe(new DoFinallySubscriber<T>(s, onFinally));
47+
}
48+
}
49+
50+
static final class DoFinallySubscriber<T> extends BasicIntQueueSubscription<T> implements Subscriber<T> {
51+
52+
private static final long serialVersionUID = 4109457741734051389L;
53+
54+
final Subscriber<? super T> actual;
55+
56+
final Action onFinally;
57+
58+
Subscription s;
59+
60+
QueueSubscription<T> qs;
61+
62+
boolean syncFused;
63+
64+
DoFinallySubscriber(Subscriber<? super T> actual, Action onFinally) {
65+
this.actual = actual;
66+
this.onFinally = onFinally;
67+
}
68+
69+
@SuppressWarnings("unchecked")
70+
@Override
71+
public void onSubscribe(Subscription s) {
72+
if (SubscriptionHelper.validate(this.s, s)) {
73+
this.s = s;
74+
if (s instanceof QueueSubscription) {
75+
this.qs = (QueueSubscription<T>)s;
76+
}
77+
78+
actual.onSubscribe(this);
79+
}
80+
}
81+
82+
@Override
83+
public void onNext(T t) {
84+
actual.onNext(t);
85+
}
86+
87+
@Override
88+
public void onError(Throwable t) {
89+
actual.onError(t);
90+
runFinally();
91+
}
92+
93+
@Override
94+
public void onComplete() {
95+
actual.onComplete();
96+
runFinally();
97+
}
98+
99+
@Override
100+
public void cancel() {
101+
s.cancel();
102+
runFinally();
103+
}
104+
105+
@Override
106+
public void request(long n) {
107+
s.request(n);
108+
}
109+
110+
@Override
111+
public int requestFusion(int mode) {
112+
QueueSubscription<T> qs = this.qs;
113+
if (qs != null && (mode & BOUNDARY) == 0) {
114+
int m = qs.requestFusion(mode);
115+
if (m != NONE) {
116+
syncFused = m == SYNC;
117+
}
118+
return m;
119+
}
120+
return NONE;
121+
}
122+
123+
@Override
124+
public void clear() {
125+
qs.clear();
126+
}
127+
128+
@Override
129+
public boolean isEmpty() {
130+
return qs.isEmpty();
131+
}
132+
133+
@Override
134+
public T poll() throws Exception {
135+
T v = qs.poll();
136+
if (v == null && syncFused) {
137+
runFinally();
138+
}
139+
return v;
140+
}
141+
142+
void runFinally() {
143+
if (compareAndSet(0, 1)) {
144+
try {
145+
onFinally.run();
146+
} catch (Throwable ex) {
147+
Exceptions.throwIfFatal(ex);
148+
RxJavaPlugins.onError(ex);
149+
}
150+
}
151+
}
152+
}
153+
154+
static final class DoFinallyConditionalSubscriber<T> extends BasicIntQueueSubscription<T> implements ConditionalSubscriber<T> {
155+
156+
private static final long serialVersionUID = 4109457741734051389L;
157+
158+
final ConditionalSubscriber<? super T> actual;
159+
160+
final Action onFinally;
161+
162+
Subscription s;
163+
164+
QueueSubscription<T> qs;
165+
166+
boolean syncFused;
167+
168+
DoFinallyConditionalSubscriber(ConditionalSubscriber<? super T> actual, Action onFinally) {
169+
this.actual = actual;
170+
this.onFinally = onFinally;
171+
}
172+
173+
@SuppressWarnings("unchecked")
174+
@Override
175+
public void onSubscribe(Subscription s) {
176+
if (SubscriptionHelper.validate(this.s, s)) {
177+
this.s = s;
178+
if (s instanceof QueueSubscription) {
179+
this.qs = (QueueSubscription<T>)s;
180+
}
181+
182+
actual.onSubscribe(this);
183+
}
184+
}
185+
186+
@Override
187+
public void onNext(T t) {
188+
actual.onNext(t);
189+
}
190+
191+
@Override
192+
public boolean tryOnNext(T t) {
193+
return actual.tryOnNext(t);
194+
}
195+
196+
@Override
197+
public void onError(Throwable t) {
198+
actual.onError(t);
199+
runFinally();
200+
}
201+
202+
@Override
203+
public void onComplete() {
204+
actual.onComplete();
205+
runFinally();
206+
}
207+
208+
@Override
209+
public void cancel() {
210+
s.cancel();
211+
runFinally();
212+
}
213+
214+
@Override
215+
public void request(long n) {
216+
s.request(n);
217+
}
218+
219+
@Override
220+
public int requestFusion(int mode) {
221+
QueueSubscription<T> qs = this.qs;
222+
if (qs != null && (mode & BOUNDARY) == 0) {
223+
int m = qs.requestFusion(mode);
224+
if (m != NONE) {
225+
syncFused = m == SYNC;
226+
}
227+
return m;
228+
}
229+
return NONE;
230+
}
231+
232+
@Override
233+
public void clear() {
234+
qs.clear();
235+
}
236+
237+
@Override
238+
public boolean isEmpty() {
239+
return qs.isEmpty();
240+
}
241+
242+
@Override
243+
public T poll() throws Exception {
244+
T v = qs.poll();
245+
if (v == null && syncFused) {
246+
runFinally();
247+
}
248+
return v;
249+
}
250+
251+
void runFinally() {
252+
if (compareAndSet(0, 1)) {
253+
try {
254+
onFinally.run();
255+
} catch (Throwable ex) {
256+
Exceptions.throwIfFatal(ex);
257+
RxJavaPlugins.onError(ex);
258+
}
259+
}
260+
}
261+
}
262+
}

0 commit comments

Comments
 (0)