Skip to content

Commit 928e437

Browse files
authored
2.x: Flowable.reduce() to return Single, macro fusion (#4484)
* 2.x: Flowable.reduce() to return Single, macro fusion * Fix javadoc copy-paste not updated to mention Observable * Fix "a Observable"s
1 parent b6c13f7 commit 928e437

File tree

14 files changed

+1191
-718
lines changed

14 files changed

+1191
-718
lines changed

src/main/java/io/reactivex/Flowable.java

+11-11
Original file line numberDiff line numberDiff line change
@@ -17,20 +17,18 @@
1717

1818
import org.reactivestreams.*;
1919

20-
import io.reactivex.Observable;
2120
import io.reactivex.annotations.*;
2221
import io.reactivex.disposables.Disposable;
2322
import io.reactivex.exceptions.Exceptions;
2423
import io.reactivex.flowables.*;
2524
import io.reactivex.functions.*;
26-
import io.reactivex.internal.functions.Functions;
27-
import io.reactivex.internal.functions.ObjectHelper;
28-
import io.reactivex.internal.fuseable.*;
25+
import io.reactivex.internal.functions.*;
26+
import io.reactivex.internal.fuseable.ScalarCallable;
2927
import io.reactivex.internal.operators.completable.CompletableFromPublisher;
3028
import io.reactivex.internal.operators.flowable.*;
3129
import io.reactivex.internal.operators.maybe.MaybeFromPublisher;
3230
import io.reactivex.internal.operators.observable.ObservableFromPublisher;
33-
import io.reactivex.internal.operators.single.SingleFromPublisher;
31+
import io.reactivex.internal.operators.single.*;
3432
import io.reactivex.internal.schedulers.ImmediateThinScheduler;
3533
import io.reactivex.internal.subscribers.flowable.*;
3634
import io.reactivex.internal.util.*;
@@ -9729,6 +9727,8 @@ public final Flowable<T> rebatchRequests(int n) {
97299727
* Publisher into the same function, and so on until all items have been emitted by the source Publisher,
97309728
* and emits the final result from the final call to your function as its sole item.
97319729
* <p>
9730+
* If the source is empty, a {@code NoSuchElementException} is signalled.
9731+
* <p>
97329732
* <img width="640" height="320" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/reduce.png" alt="">
97339733
* <p>
97349734
* This technique, which is called "reduce" here, is sometimes called "aggregate," "fold," "accumulate,"
@@ -9745,17 +9745,17 @@ public final Flowable<T> rebatchRequests(int n) {
97459745
* @param reducer
97469746
* an accumulator function to be invoked on each item emitted by the source Publisher, whose
97479747
* result will be used in the next accumulator call
9748-
* @return a Flowable that emits a single item that is the result of accumulating the items emitted by
9749-
* the source Publisher
9750-
* @throws IllegalArgumentException
9751-
* if the source Publisher emits no items
9748+
* @return a Single that emits a single item that is the result of accumulating the items emitted by
9749+
* the source Flowable
97529750
* @see <a href="http://reactivex.io/documentation/operators/reduce.html">ReactiveX operators documentation: Reduce</a>
97539751
* @see <a href="http://en.wikipedia.org/wiki/Fold_(higher-order_function)">Wikipedia: Fold (higher-order function)</a>
97549752
*/
97559753
@BackpressureSupport(BackpressureKind.UNBOUNDED_IN)
97569754
@SchedulerSupport(SchedulerSupport.NONE)
9757-
public final Flowable<T> reduce(BiFunction<T, T, T> reducer) {
9758-
return scan(reducer).last();
9755+
public final Single<T> reduce(BiFunction<T, T, T> reducer) {
9756+
ObjectHelper.requireNonNull(reducer, "reducer is null");
9757+
// return RxJavaPlugins.onAssembly(new FlowableReduce<T>(this, reducer));
9758+
return RxJavaPlugins.onAssembly(new SingleReduceFlowable<T>(this, reducer));
97599759
}
97609760

97619761
/**

src/main/java/io/reactivex/Observable.java

+698-698
Large diffs are not rendered by default.

src/main/java/io/reactivex/Single.java

+5
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import io.reactivex.exceptions.Exceptions;
2323
import io.reactivex.functions.*;
2424
import io.reactivex.internal.functions.*;
25+
import io.reactivex.internal.fuseable.FuseToFlowable;
2526
import io.reactivex.internal.operators.completable.*;
2627
import io.reactivex.internal.operators.flowable.*;
2728
import io.reactivex.internal.operators.maybe.*;
@@ -2474,7 +2475,11 @@ public final Completable toCompletable() {
24742475
*
24752476
* @return an {@link Flowable} that emits a single item T or an error.
24762477
*/
2478+
@SuppressWarnings("unchecked")
24772479
public final Flowable<T> toFlowable() {
2480+
if (this instanceof FuseToFlowable) {
2481+
return ((FuseToFlowable<T>)this).fuseToFlowable();
2482+
}
24782483
return RxJavaPlugins.onAssembly(new SingleToFlowable<T>(this));
24792484
}
24802485

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
/**
2+
* Copyright 2016 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.internal.fuseable;
15+
16+
import io.reactivex.Flowable;
17+
18+
/**
19+
* Interface indicating a operator implementation can be macro-fused back to Flowable in case
20+
* the operator goes from Flowable to some other reactive type and then the sequence calls
21+
* for toFlowable again:
22+
* <pre>
23+
* Single&lt;Integer> single = Flowable.range(1, 10).reduce((a, b) -> a + b);
24+
* Flowable&lt;Integer> flowable = single.toFlowable();
25+
* </pre>
26+
*
27+
* The {@code Single.toFlowable()} will check for this interface and call the {@link #fuseToFlowable()}
28+
* to return a Flowable which could be the Flowable-specific implementation of reduce(BiFunction).
29+
* <p>
30+
* This causes a slight overhead in assembly time (1 instanceof check, 1 operator allocation and 1 dropped
31+
* operator) but does not incur the conversion overhead at runtime.
32+
*
33+
* @param <T> the value type
34+
*/
35+
public interface FuseToFlowable<T> {
36+
37+
/**
38+
* Returns a (direct) Flowable for the operator.
39+
* @return the Flowable instance
40+
*/
41+
Flowable<T> fuseToFlowable();
42+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
/**
2+
* Copyright 2016 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.internal.fuseable;
15+
16+
import io.reactivex.Observable;
17+
18+
/**
19+
* Interface indicating a operator implementation can be macro-fused back to Observable in case
20+
* the operator goes from Observable to some other reactive type and then the sequence calls
21+
* for toObservable again:
22+
* <pre>
23+
* Single&lt;Integer> single = Observable.range(1, 10).reduce((a, b) -> a + b);
24+
* Observable&lt;Integer> observable = single.toObservable();
25+
* </pre>
26+
*
27+
* The {@code Single.toObservable()} will check for this interface and call the {@link #fuseToObservable()}
28+
* to return an Observable which could be the Observable-specific implementation of reduce(BiFunction).
29+
* <p>
30+
* This causes a slight overhead in assembly time (1 instanceof check, 1 operator allocation and 1 dropped
31+
* operator) but does not incur the conversion overhead at runtime.
32+
*
33+
* @param <T> the value type
34+
*/
35+
public interface FuseToObservable<T> {
36+
37+
/**
38+
* Returns a (direct) Observable for the operator.
39+
* @return the Observable instance
40+
*/
41+
Observable<T> fuseToObservable();
42+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
/**
2+
* Copyright 2016 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.internal.operators.flowable;
15+
16+
import java.util.NoSuchElementException;
17+
18+
import org.reactivestreams.*;
19+
20+
import io.reactivex.exceptions.Exceptions;
21+
import io.reactivex.functions.BiFunction;
22+
import io.reactivex.internal.functions.ObjectHelper;
23+
import io.reactivex.internal.subscriptions.*;
24+
import io.reactivex.plugins.RxJavaPlugins;
25+
26+
/**
27+
* Reduces a sequence via a function into a single value or signals NoSuchElementException for
28+
* an empty source.
29+
*
30+
* @param <T> the value type
31+
*/
32+
public final class FlowableReduce<T> extends AbstractFlowableWithUpstream<T, T> {
33+
34+
final BiFunction<T, T, T> reducer;
35+
36+
public FlowableReduce(Publisher<T> source, BiFunction<T, T, T> reducer) {
37+
super(source);
38+
this.reducer = reducer;
39+
}
40+
41+
@Override
42+
protected void subscribeActual(Subscriber<? super T> s) {
43+
source.subscribe(new ReduceSubscriber<T>(s, reducer));
44+
}
45+
46+
static final class ReduceSubscriber<T> extends DeferredScalarSubscription<T> implements Subscriber<T> {
47+
/** */
48+
private static final long serialVersionUID = -4663883003264602070L;
49+
50+
final BiFunction<T, T, T> reducer;
51+
52+
Subscription s;
53+
54+
public ReduceSubscriber(Subscriber<? super T> actual, BiFunction<T, T, T> reducer) {
55+
super(actual);
56+
this.reducer = reducer;
57+
}
58+
59+
@Override
60+
public void onSubscribe(Subscription s) {
61+
if (SubscriptionHelper.validate(this.s, s)) {
62+
this.s = s;
63+
64+
actual.onSubscribe(this);
65+
66+
s.request(Long.MAX_VALUE);
67+
}
68+
}
69+
70+
@Override
71+
public void onNext(T t) {
72+
if (s == SubscriptionHelper.CANCELLED) {
73+
return;
74+
}
75+
76+
T v = value;
77+
if (v == null) {
78+
value = t;
79+
} else {
80+
try {
81+
value = ObjectHelper.requireNonNull(reducer.apply(v, t), "The reducer returned a null value");
82+
} catch (Throwable ex) {
83+
Exceptions.throwIfFatal(ex);
84+
s.cancel();
85+
onError(ex);
86+
}
87+
}
88+
}
89+
90+
@Override
91+
public void onError(Throwable t) {
92+
if (s == SubscriptionHelper.CANCELLED) {
93+
RxJavaPlugins.onError(t);
94+
return;
95+
}
96+
s = SubscriptionHelper.CANCELLED;
97+
actual.onError(t);
98+
}
99+
100+
@Override
101+
public void onComplete() {
102+
if (s == SubscriptionHelper.CANCELLED) {
103+
return;
104+
}
105+
s = SubscriptionHelper.CANCELLED;
106+
107+
T v = value;
108+
if (v != null) {
109+
complete(v);
110+
} else {
111+
actual.onError(new NoSuchElementException());
112+
}
113+
}
114+
115+
@Override
116+
public void cancel() {
117+
super.cancel();
118+
s.cancel();
119+
s = SubscriptionHelper.CANCELLED;
120+
}
121+
122+
}
123+
}

0 commit comments

Comments
 (0)