Skip to content

Commit 715d71d

Browse files
authored
2.x: add Flowable.doAfterNext operator + 3 doX TCKs (#4833)
* 2.x: add `Flowable.doAfterNext` operator + 3 doX TCK * Fix operator name in the scheduler javadoc
1 parent 2f8e8bc commit 715d71d

File tree

6 files changed

+523
-0
lines changed

6 files changed

+523
-0
lines changed

src/main/java/io/reactivex/Flowable.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7353,6 +7353,32 @@ public final Flowable<T> doFinally(Action onFinally) {
73537353
return RxJavaPlugins.onAssembly(new FlowableDoFinally<T>(this, onFinally));
73547354
}
73557355

7356+
/**
7357+
* Calls the specified consumer with the current item after this item has been emitted to the downstream.
7358+
* <p>Note that the {@code onAfterNext} action is shared between subscriptions and as such
7359+
* should be thread-safe.
7360+
* <dl>
7361+
* <dt><b>Backpressure:</b></dt>
7362+
* <dd>The operator doesn't interfere with backpressure which is determined by the source {@code Publisher}'s backpressure
7363+
* behavior.</dd>
7364+
* <dt><b>Scheduler:</b></dt>
7365+
* <dd>{@code doAfterNext} does not operate by default on a particular {@link Scheduler}.</dd>
7366+
* <td><b>Operator-fusion:</b></dt>
7367+
* <dd>This operator supports normal and conditional Subscribers as well as boundary-limited
7368+
* synchronous or asynchronous queue-fusion.</dd>
7369+
* </dl>
7370+
* @param onAfterNext the Consumer that will be called after emitting an item from upstream to the downstream
7371+
* @return the new Flowable instance
7372+
* @since 2.0.1 - experimental
7373+
*/
7374+
@BackpressureSupport(BackpressureKind.PASS_THROUGH)
7375+
@SchedulerSupport(SchedulerSupport.NONE)
7376+
@Experimental
7377+
public final Flowable<T> doAfterNext(Consumer<? super T> onAfterNext) {
7378+
ObjectHelper.requireNonNull(onAfterNext, "onAfterNext is null");
7379+
return RxJavaPlugins.onAssembly(new FlowableDoAfterNext<T>(this, onAfterNext));
7380+
}
7381+
73567382
/**
73577383
* Registers an {@link Action} to be called when this Publisher invokes either
73587384
* {@link Subscriber#onComplete onComplete} or {@link Subscriber#onError onError}.
Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
/**
2+
* Copyright 2016 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.internal.operators.flowable;
15+
16+
import org.reactivestreams.*;
17+
18+
import io.reactivex.annotations.Experimental;
19+
import io.reactivex.functions.Consumer;
20+
import io.reactivex.internal.fuseable.ConditionalSubscriber;
21+
import io.reactivex.internal.subscribers.*;
22+
23+
/**
24+
* Calls a consumer after pushing the current item to the downstream.
25+
* @param <T> the value type
26+
* @since 2.0.1 - experimental
27+
*/
28+
@Experimental
29+
public final class FlowableDoAfterNext<T> extends AbstractFlowableWithUpstream<T, T> {
30+
31+
final Consumer<? super T> onAfterNext;
32+
33+
public FlowableDoAfterNext(Publisher<T> source, Consumer<? super T> onAfterNext) {
34+
super(source);
35+
this.onAfterNext = onAfterNext;
36+
}
37+
38+
@Override
39+
protected void subscribeActual(Subscriber<? super T> s) {
40+
if (s instanceof ConditionalSubscriber) {
41+
source.subscribe(new DoAfterConditionalSubscriber<T>((ConditionalSubscriber<? super T>)s, onAfterNext));
42+
} else {
43+
source.subscribe(new DoAfterSubscriber<T>(s, onAfterNext));
44+
}
45+
}
46+
47+
static final class DoAfterSubscriber<T> extends BasicFuseableSubscriber<T, T> {
48+
49+
final Consumer<? super T> onAfterNext;
50+
51+
DoAfterSubscriber(Subscriber<? super T> actual, Consumer<? super T> onAfterNext) {
52+
super(actual);
53+
this.onAfterNext = onAfterNext;
54+
}
55+
56+
@Override
57+
public void onNext(T t) {
58+
actual.onNext(t);
59+
60+
if (sourceMode == NONE) {
61+
try {
62+
onAfterNext.accept(t);
63+
} catch (Throwable ex) {
64+
fail(ex);
65+
}
66+
}
67+
}
68+
69+
@Override
70+
public int requestFusion(int mode) {
71+
return transitiveBoundaryFusion(mode);
72+
}
73+
74+
@Override
75+
public T poll() throws Exception {
76+
T v = qs.poll();
77+
if (v != null) {
78+
onAfterNext.accept(v);
79+
}
80+
return v;
81+
}
82+
}
83+
84+
static final class DoAfterConditionalSubscriber<T> extends BasicFuseableConditionalSubscriber<T, T> {
85+
86+
final Consumer<? super T> onAfterNext;
87+
88+
DoAfterConditionalSubscriber(ConditionalSubscriber<? super T> actual, Consumer<? super T> onAfterNext) {
89+
super(actual);
90+
this.onAfterNext = onAfterNext;
91+
}
92+
93+
@Override
94+
public void onNext(T t) {
95+
actual.onNext(t);
96+
97+
if (sourceMode == NONE) {
98+
try {
99+
onAfterNext.accept(t);
100+
} catch (Throwable ex) {
101+
fail(ex);
102+
}
103+
}
104+
}
105+
106+
@Override
107+
public boolean tryOnNext(T t) {
108+
boolean b = actual.tryOnNext(t);
109+
try {
110+
onAfterNext.accept(t);
111+
} catch (Throwable ex) {
112+
fail(ex);
113+
}
114+
return b;
115+
}
116+
117+
@Override
118+
public int requestFusion(int mode) {
119+
return transitiveBoundaryFusion(mode);
120+
}
121+
122+
@Override
123+
public T poll() throws Exception {
124+
T v = qs.poll();
125+
if (v != null) {
126+
onAfterNext.accept(v);
127+
}
128+
return v;
129+
}
130+
}
131+
}

0 commit comments

Comments
 (0)