Skip to content

2.x: Flowable.reduce() to return Single, macro fusion #4484

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Sep 6, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 11 additions & 11 deletions src/main/java/io/reactivex/Flowable.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,18 @@

import org.reactivestreams.*;

import io.reactivex.Observable;
import io.reactivex.annotations.*;
import io.reactivex.disposables.Disposable;
import io.reactivex.exceptions.Exceptions;
import io.reactivex.flowables.*;
import io.reactivex.functions.*;
import io.reactivex.internal.functions.Functions;
import io.reactivex.internal.functions.ObjectHelper;
import io.reactivex.internal.fuseable.*;
import io.reactivex.internal.functions.*;
import io.reactivex.internal.fuseable.ScalarCallable;
import io.reactivex.internal.operators.completable.CompletableFromPublisher;
import io.reactivex.internal.operators.flowable.*;
import io.reactivex.internal.operators.maybe.MaybeFromPublisher;
import io.reactivex.internal.operators.observable.ObservableFromPublisher;
import io.reactivex.internal.operators.single.SingleFromPublisher;
import io.reactivex.internal.operators.single.*;
import io.reactivex.internal.schedulers.ImmediateThinScheduler;
import io.reactivex.internal.subscribers.flowable.*;
import io.reactivex.internal.util.*;
Expand Down Expand Up @@ -9729,6 +9727,8 @@ public final Flowable<T> rebatchRequests(int n) {
* Publisher into the same function, and so on until all items have been emitted by the source Publisher,
* and emits the final result from the final call to your function as its sole item.
* <p>
* If the source is empty, a {@code NoSuchElementException} is signalled.
* <p>
* <img width="640" height="320" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/reduce.png" alt="">
* <p>
* This technique, which is called "reduce" here, is sometimes called "aggregate," "fold," "accumulate,"
Expand All @@ -9745,17 +9745,17 @@ public final Flowable<T> rebatchRequests(int n) {
* @param reducer
* an accumulator function to be invoked on each item emitted by the source Publisher, whose
* result will be used in the next accumulator call
* @return a Flowable that emits a single item that is the result of accumulating the items emitted by
* the source Publisher
* @throws IllegalArgumentException
* if the source Publisher emits no items
* @return a Single that emits a single item that is the result of accumulating the items emitted by
* the source Flowable
* @see <a href="http://reactivex.io/documentation/operators/reduce.html">ReactiveX operators documentation: Reduce</a>
* @see <a href="http://en.wikipedia.org/wiki/Fold_(higher-order_function)">Wikipedia: Fold (higher-order function)</a>
*/
@BackpressureSupport(BackpressureKind.UNBOUNDED_IN)
@SchedulerSupport(SchedulerSupport.NONE)
public final Flowable<T> reduce(BiFunction<T, T, T> reducer) {
return scan(reducer).last();
public final Single<T> reduce(BiFunction<T, T, T> reducer) {
ObjectHelper.requireNonNull(reducer, "reducer is null");
// return RxJavaPlugins.onAssembly(new FlowableReduce<T>(this, reducer));
return RxJavaPlugins.onAssembly(new SingleReduceFlowable<T>(this, reducer));
}

/**
Expand Down
1,396 changes: 698 additions & 698 deletions src/main/java/io/reactivex/Observable.java

Large diffs are not rendered by default.

5 changes: 5 additions & 0 deletions src/main/java/io/reactivex/Single.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.reactivex.exceptions.Exceptions;
import io.reactivex.functions.*;
import io.reactivex.internal.functions.*;
import io.reactivex.internal.fuseable.FuseToFlowable;
import io.reactivex.internal.operators.completable.*;
import io.reactivex.internal.operators.flowable.*;
import io.reactivex.internal.operators.maybe.*;
Expand Down Expand Up @@ -2474,7 +2475,11 @@ public final Completable toCompletable() {
*
* @return an {@link Flowable} that emits a single item T or an error.
*/
@SuppressWarnings("unchecked")
public final Flowable<T> toFlowable() {
if (this instanceof FuseToFlowable) {
return ((FuseToFlowable<T>)this).fuseToFlowable();
}
return RxJavaPlugins.onAssembly(new SingleToFlowable<T>(this));
}

Expand Down
42 changes: 42 additions & 0 deletions src/main/java/io/reactivex/internal/fuseable/FuseToFlowable.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/**
* Copyright 2016 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex.internal.fuseable;

import io.reactivex.Flowable;

/**
* Interface indicating a operator implementation can be macro-fused back to Flowable in case
* the operator goes from Flowable to some other reactive type and then the sequence calls
* for toFlowable again:
* <pre>
* Single&lt;Integer> single = Flowable.range(1, 10).reduce((a, b) -> a + b);
* Flowable&lt;Integer> flowable = single.toFlowable();
* </pre>
*
* The {@code Single.toFlowable()} will check for this interface and call the {@link #fuseToFlowable()}
* to return a Flowable which could be the Flowable-specific implementation of reduce(BiFunction).
* <p>
* This causes a slight overhead in assembly time (1 instanceof check, 1 operator allocation and 1 dropped
* operator) but does not incur the conversion overhead at runtime.
*
* @param <T> the value type
*/
public interface FuseToFlowable<T> {

/**
* Returns a (direct) Flowable for the operator.
* @return the Flowable instance
*/
Flowable<T> fuseToFlowable();
}
42 changes: 42 additions & 0 deletions src/main/java/io/reactivex/internal/fuseable/FuseToObservable.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/**
* Copyright 2016 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex.internal.fuseable;

import io.reactivex.Observable;

/**
* Interface indicating a operator implementation can be macro-fused back to Observable in case
* the operator goes from Observable to some other reactive type and then the sequence calls
* for toObservable again:
* <pre>
* Single&lt;Integer> single = Observable.range(1, 10).reduce((a, b) -> a + b);
* Observable&lt;Integer> observable = single.toObservable();
* </pre>
*
* The {@code Single.toObservable()} will check for this interface and call the {@link #fuseToObservable()}
* to return an Observable which could be the Observable-specific implementation of reduce(BiFunction).
* <p>
* This causes a slight overhead in assembly time (1 instanceof check, 1 operator allocation and 1 dropped
* operator) but does not incur the conversion overhead at runtime.
*
* @param <T> the value type
*/
public interface FuseToObservable<T> {

/**
* Returns a (direct) Observable for the operator.
* @return the Observable instance
*/
Observable<T> fuseToObservable();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
/**
* Copyright 2016 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex.internal.operators.flowable;

import java.util.NoSuchElementException;

import org.reactivestreams.*;

import io.reactivex.exceptions.Exceptions;
import io.reactivex.functions.BiFunction;
import io.reactivex.internal.functions.ObjectHelper;
import io.reactivex.internal.subscriptions.*;
import io.reactivex.plugins.RxJavaPlugins;

/**
* Reduces a sequence via a function into a single value or signals NoSuchElementException for
* an empty source.
*
* @param <T> the value type
*/
public final class FlowableReduce<T> extends AbstractFlowableWithUpstream<T, T> {

final BiFunction<T, T, T> reducer;

public FlowableReduce(Publisher<T> source, BiFunction<T, T, T> reducer) {
super(source);
this.reducer = reducer;
}

@Override
protected void subscribeActual(Subscriber<? super T> s) {
source.subscribe(new ReduceSubscriber<T>(s, reducer));
}

static final class ReduceSubscriber<T> extends DeferredScalarSubscription<T> implements Subscriber<T> {
/** */
private static final long serialVersionUID = -4663883003264602070L;

final BiFunction<T, T, T> reducer;

Subscription s;

public ReduceSubscriber(Subscriber<? super T> actual, BiFunction<T, T, T> reducer) {
super(actual);
this.reducer = reducer;
}

@Override
public void onSubscribe(Subscription s) {
if (SubscriptionHelper.validate(this.s, s)) {
this.s = s;

actual.onSubscribe(this);

s.request(Long.MAX_VALUE);
}
}

@Override
public void onNext(T t) {
if (s == SubscriptionHelper.CANCELLED) {
return;
}

T v = value;
if (v == null) {
value = t;
} else {
try {
value = ObjectHelper.requireNonNull(reducer.apply(v, t), "The reducer returned a null value");
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
s.cancel();
onError(ex);
}
}
}

@Override
public void onError(Throwable t) {
if (s == SubscriptionHelper.CANCELLED) {
RxJavaPlugins.onError(t);
return;
}
s = SubscriptionHelper.CANCELLED;
actual.onError(t);
}

@Override
public void onComplete() {
if (s == SubscriptionHelper.CANCELLED) {
return;
}
s = SubscriptionHelper.CANCELLED;

T v = value;
if (v != null) {
complete(v);
} else {
actual.onError(new NoSuchElementException());
}
}

@Override
public void cancel() {
super.cancel();
s.cancel();
s = SubscriptionHelper.CANCELLED;
}

}
}
Loading