Skip to content

Commit 3d00eb1

Browse files
authored
3.x: [Java 8] Add ParallelFlowable operators + cleanup (#6798)
1 parent af17c6e commit 3d00eb1

32 files changed

+2497
-53
lines changed

src/main/java/io/reactivex/rxjava3/core/Flowable.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18944,9 +18944,9 @@ public final TestSubscriber<T> test(long initialRequest, boolean cancel) { // No
1894418944
* @param <T> the element type of the optional value
1894518945
* @param optional the optional value to convert into a {@code Flowable}
1894618946
* @return the new Flowable instance
18947+
* @since 3.0.0
1894718948
* @see #just(Object)
1894818949
* @see #empty()
18949-
* @since 3.0.0
1895018950
*/
1895118951
@CheckReturnValue
1895218952
@BackpressureSupport(BackpressureKind.FULL)
@@ -19409,6 +19409,7 @@ public final Stream<T> blockingStream(int prefetch) {
1940919409
* @param mapper the function that receives an upstream item and should return a {@code Stream} whose elements
1941019410
* will be emitted to the downstream
1941119411
* @return the new Flowable instance
19412+
* @since 3.0.0
1941219413
* @see #concatMap(Function)
1941319414
* @see #concatMapIterable(Function)
1941419415
* @see #concatMapStream(Function, int)
@@ -19461,6 +19462,7 @@ public final Stream<T> blockingStream(int prefetch) {
1946119462
* will be emitted to the downstream
1946219463
* @param prefetch the number of upstream items to request upfront, then 75% of this amount after each 75% upstream items received
1946319464
* @return the new Flowable instance
19465+
* @since 3.0.0
1946419466
* @see #concatMap(Function, int)
1946519467
* @see #concatMapIterable(Function, int)
1946619468
* @see #flatMapStream(Function, int)
@@ -19515,6 +19517,7 @@ public final Stream<T> blockingStream(int prefetch) {
1951519517
* @param mapper the function that receives an upstream item and should return a {@code Stream} whose elements
1951619518
* will be emitted to the downstream
1951719519
* @return the new Flowable instance
19520+
* @since 3.0.0
1951819521
* @see #flatMap(Function)
1951919522
* @see #flatMapIterable(Function)
1952019523
* @see #flatMapStream(Function, int)
@@ -19567,6 +19570,7 @@ public final Stream<T> blockingStream(int prefetch) {
1956719570
* will be emitted to the downstream
1956819571
* @param prefetch the number of upstream items to request upfront, then 75% of this amount after each 75% upstream items received
1956919572
* @return the new Flowable instance
19573+
* @since 3.0.0
1957019574
* @see #flatMap(Function, int)
1957119575
* @see #flatMapIterable(Function, int)
1957219576
* @see #concatMapStream(Function, int)

src/main/java/io/reactivex/rxjava3/core/Maybe.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4870,9 +4870,9 @@ public final TestObserver<T> test(boolean dispose) {
48704870
* @param <T> the element type of the optional value
48714871
* @param optional the optional value to convert into a {@code Maybe}
48724872
* @return the new Maybe instance
4873+
* @since 3.0.0
48734874
* @see #just(Object)
48744875
* @see #empty()
4875-
* @since 3.0.0
48764876
*/
48774877
@CheckReturnValue
48784878
@SchedulerSupport(SchedulerSupport.NONE)

src/main/java/io/reactivex/rxjava3/core/Observable.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15937,9 +15937,9 @@ public final TestObserver<T> test(boolean dispose) { // NoPMD
1593715937
* @param <T> the element type of the optional value
1593815938
* @param optional the optional value to convert into an {@code Observable}
1593915939
* @return the new Observable instance
15940+
* @since 3.0.0
1594015941
* @see #just(Object)
1594115942
* @see #empty()
15942-
* @since 3.0.0
1594315943
*/
1594415944
@CheckReturnValue
1594515945
@SchedulerSupport(SchedulerSupport.NONE)
@@ -16355,6 +16355,7 @@ public final Stream<T> blockingStream(int capacityHint) {
1635516355
* @param mapper the function that receives an upstream item and should return a {@code Stream} whose elements
1635616356
* will be emitted to the downstream
1635716357
* @return the new Observable instance
16358+
* @since 3.0.0
1635816359
* @see #concatMap(Function)
1635916360
* @see #concatMapIterable(Function)
1636016361
* @see #flatMapStream(Function)
@@ -16401,6 +16402,7 @@ public final Stream<T> blockingStream(int capacityHint) {
1640116402
* @param mapper the function that receives an upstream item and should return a {@code Stream} whose elements
1640216403
* will be emitted to the downstream
1640316404
* @return the new Observable instance
16405+
* @since 3.0.0
1640416406
* @see #flatMap(Function)
1640516407
* @see #flatMapIterable(Function)
1640616408
*/

src/main/java/io/reactivex/rxjava3/internal/jdk8/FlowableFlatMapStream.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,10 +71,23 @@ protected void subscribeActual(Subscriber<? super R> s) {
7171
EmptySubscription.complete(s);
7272
}
7373
} else {
74-
source.subscribe(new FlatMapStreamSubscriber<>(s, mapper, prefetch));
74+
source.subscribe(subscribe(s, mapper, prefetch));
7575
}
7676
}
7777

78+
/**
79+
* Create a {@link Subscriber} with the given parameters.
80+
* @param <T> the upstream value type
81+
* @param <R> the {@link Stream} and output value type
82+
* @param downstream the downstream {@code Subscriber} to wrap
83+
* @param mapper the mapper function
84+
* @param prefetch the number of items to prefetch
85+
* @return the new {@code Subscriber}
86+
*/
87+
public static <T, R> Subscriber<T> subscribe(Subscriber<? super R> downstream, Function<? super T, ? extends Stream<? extends R>> mapper, int prefetch) {
88+
return new FlatMapStreamSubscriber<>(downstream, mapper, prefetch);
89+
}
90+
7891
static final class FlatMapStreamSubscriber<T, R> extends AtomicInteger
7992
implements FlowableSubscriber<T>, Subscription {
8093

Lines changed: 270 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,270 @@
1+
/**
2+
* Copyright (c) 2016-present, RxJava Contributors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.rxjava3.internal.jdk8;
15+
16+
import java.util.Objects;
17+
import java.util.concurrent.atomic.*;
18+
import java.util.function.*;
19+
import java.util.stream.Collector;
20+
21+
import org.reactivestreams.*;
22+
23+
import io.reactivex.rxjava3.core.*;
24+
import io.reactivex.rxjava3.exceptions.Exceptions;
25+
import io.reactivex.rxjava3.internal.subscriptions.*;
26+
import io.reactivex.rxjava3.internal.util.AtomicThrowable;
27+
import io.reactivex.rxjava3.parallel.ParallelFlowable;
28+
import io.reactivex.rxjava3.plugins.RxJavaPlugins;
29+
30+
/**
31+
* Reduces all 'rails' into a single via a Java 8 {@link Collector} callback set.
32+
*
33+
* @param <T> the value type
34+
* @param <A> the accumulator type
35+
* @param <R> the result type
36+
* @since 3.0.0
37+
*/
38+
public final class ParallelCollector<T, A, R> extends Flowable<R> {
39+
40+
final ParallelFlowable<? extends T> source;
41+
42+
final Collector<T, A, R> collector;
43+
44+
public ParallelCollector(ParallelFlowable<? extends T> source, Collector<T, A, R> collector) {
45+
this.source = source;
46+
this.collector = collector;
47+
}
48+
49+
@Override
50+
protected void subscribeActual(Subscriber<? super R> s) {
51+
ParallelCollectorSubscriber<T, A, R> parent;
52+
try {
53+
parent = new ParallelCollectorSubscriber<>(s, source.parallelism(), collector);
54+
} catch (Throwable ex) {
55+
Exceptions.throwIfFatal(ex);
56+
EmptySubscription.error(ex, s);
57+
return;
58+
}
59+
s.onSubscribe(parent);
60+
61+
source.subscribe(parent.subscribers);
62+
}
63+
64+
static final class ParallelCollectorSubscriber<T, A, R> extends DeferredScalarSubscription<R> {
65+
66+
private static final long serialVersionUID = -5370107872170712765L;
67+
68+
final ParallelCollectorInnerSubscriber<T, A, R>[] subscribers;
69+
70+
final AtomicReference<SlotPair<A>> current = new AtomicReference<>();
71+
72+
final AtomicInteger remaining = new AtomicInteger();
73+
74+
final AtomicThrowable error = new AtomicThrowable();
75+
76+
final Function<A, R> finisher;
77+
78+
ParallelCollectorSubscriber(Subscriber<? super R> subscriber, int n, Collector<T, A, R> collector) {
79+
super(subscriber);
80+
this.finisher = collector.finisher();
81+
@SuppressWarnings("unchecked")
82+
ParallelCollectorInnerSubscriber<T, A, R>[] a = new ParallelCollectorInnerSubscriber[n];
83+
for (int i = 0; i < n; i++) {
84+
a[i] = new ParallelCollectorInnerSubscriber<>(this, collector.supplier().get(), collector.accumulator(), collector.combiner());
85+
}
86+
this.subscribers = a;
87+
remaining.lazySet(n);
88+
}
89+
90+
SlotPair<A> addValue(A value) {
91+
for (;;) {
92+
SlotPair<A> curr = current.get();
93+
94+
if (curr == null) {
95+
curr = new SlotPair<>();
96+
if (!current.compareAndSet(null, curr)) {
97+
continue;
98+
}
99+
}
100+
101+
int c = curr.tryAcquireSlot();
102+
if (c < 0) {
103+
current.compareAndSet(curr, null);
104+
continue;
105+
}
106+
if (c == 0) {
107+
curr.first = value;
108+
} else {
109+
curr.second = value;
110+
}
111+
112+
if (curr.releaseSlot()) {
113+
current.compareAndSet(curr, null);
114+
return curr;
115+
}
116+
return null;
117+
}
118+
}
119+
120+
@Override
121+
public void cancel() {
122+
for (ParallelCollectorInnerSubscriber<T, A, R> inner : subscribers) {
123+
inner.cancel();
124+
}
125+
}
126+
127+
void innerError(Throwable ex) {
128+
if (error.compareAndSet(null, ex)) {
129+
cancel();
130+
downstream.onError(ex);
131+
} else {
132+
if (ex != error.get()) {
133+
RxJavaPlugins.onError(ex);
134+
}
135+
}
136+
}
137+
138+
void innerComplete(A value, BinaryOperator<A> combiner) {
139+
for (;;) {
140+
SlotPair<A> sp = addValue(value);
141+
142+
if (sp != null) {
143+
144+
try {
145+
value = combiner.apply(sp.first, sp.second);
146+
} catch (Throwable ex) {
147+
Exceptions.throwIfFatal(ex);
148+
innerError(ex);
149+
return;
150+
}
151+
152+
} else {
153+
break;
154+
}
155+
}
156+
157+
if (remaining.decrementAndGet() == 0) {
158+
SlotPair<A> sp = current.get();
159+
current.lazySet(null);
160+
161+
R result;
162+
try {
163+
result = Objects.requireNonNull(finisher.apply(sp.first), "The finisher returned a null value");
164+
} catch (Throwable ex) {
165+
Exceptions.throwIfFatal(ex);
166+
innerError(ex);
167+
return;
168+
}
169+
170+
complete(result);
171+
}
172+
}
173+
}
174+
175+
static final class ParallelCollectorInnerSubscriber<T, A, R>
176+
extends AtomicReference<Subscription>
177+
implements FlowableSubscriber<T> {
178+
179+
private static final long serialVersionUID = -7954444275102466525L;
180+
181+
final ParallelCollectorSubscriber<T, A, R> parent;
182+
183+
final BiConsumer<A, T> accumulator;
184+
185+
final BinaryOperator<A> combiner;
186+
187+
A container;
188+
189+
boolean done;
190+
191+
ParallelCollectorInnerSubscriber(ParallelCollectorSubscriber<T, A, R> parent, A container, BiConsumer<A, T> accumulator, BinaryOperator<A> combiner) {
192+
this.parent = parent;
193+
this.accumulator = accumulator;
194+
this.combiner = combiner;
195+
this.container = container;
196+
}
197+
198+
@Override
199+
public void onSubscribe(Subscription s) {
200+
SubscriptionHelper.setOnce(this, s, Long.MAX_VALUE);
201+
}
202+
203+
@Override
204+
public void onNext(T t) {
205+
if (!done) {
206+
try {
207+
accumulator.accept(container, t);
208+
} catch (Throwable ex) {
209+
Exceptions.throwIfFatal(ex);
210+
get().cancel();
211+
onError(ex);
212+
return;
213+
}
214+
}
215+
}
216+
217+
@Override
218+
public void onError(Throwable t) {
219+
if (done) {
220+
RxJavaPlugins.onError(t);
221+
return;
222+
}
223+
container = null;
224+
done = true;
225+
parent.innerError(t);
226+
}
227+
228+
@Override
229+
public void onComplete() {
230+
if (!done) {
231+
A v = container;
232+
container = null;
233+
done = true;
234+
parent.innerComplete(v, combiner);
235+
}
236+
}
237+
238+
void cancel() {
239+
SubscriptionHelper.cancel(this);
240+
}
241+
}
242+
243+
static final class SlotPair<T> extends AtomicInteger {
244+
245+
private static final long serialVersionUID = 473971317683868662L;
246+
247+
T first;
248+
249+
T second;
250+
251+
final AtomicInteger releaseIndex = new AtomicInteger();
252+
253+
int tryAcquireSlot() {
254+
for (;;) {
255+
int acquired = get();
256+
if (acquired >= 2) {
257+
return -1;
258+
}
259+
260+
if (compareAndSet(acquired, acquired + 1)) {
261+
return acquired;
262+
}
263+
}
264+
}
265+
266+
boolean releaseSlot() {
267+
return releaseIndex.incrementAndGet() == 2;
268+
}
269+
}
270+
}

0 commit comments

Comments
 (0)