From 14e92d45cb73860be3d8795c27b558dcec17a87e Mon Sep 17 00:00:00 2001 From: akarnokd Date: Mon, 9 Nov 2015 15:52:33 +0100 Subject: [PATCH] 1.x: enable operator/source fusion by named operator lifter This change factors out the body of lift() into a named class that gives access to the operator and source parameters. By using this information, other operators can perform what I call **operator macro-fusion**. One such example with this PR is the repeated use of the operator `mergeWith` which when done in the classical way creates a long linked-list of sources merged in pairs, often leading to stack overflows and degraded performance. However, if `mergeWith` can see that it is applied to an existing mergeWith, the two operators can use a common list of sources and then turn into a one-level merge() with n + 1 sources (the previous graph will then be GC'd). Don't worry, this doesn't destroy the original assembled sequence. For example, given `c = a.mergeWith(b); d = c.mergeWith(e);` both c and d can be freely subscribed to and still do the same thing. Note also that this PR conflicts with PR #3477 since the array-based `merge(from(os))` has a different type. --- src/main/java/rx/Observable.java | 56 +++++------ src/main/java/rx/Single.java | 20 ++-- .../operators/OnSubscribeFromIterable.java | 6 ++ .../internal/operators/OnSubscribeLift.java | 99 +++++++++++++++++++ .../operators/OperatorWindowWithSize.java | 4 +- .../java/rx/plugins/RxJavaSchedulersHook.java | 4 +- .../operators/OperatorMergeWithTest.java | 50 ++++++++++ 7 files changed, 195 insertions(+), 44 deletions(-) create mode 100644 src/main/java/rx/internal/operators/OnSubscribeLift.java create mode 100644 src/test/java/rx/internal/operators/OperatorMergeWithTest.java diff --git a/src/main/java/rx/Observable.java b/src/main/java/rx/Observable.java index cf1686ad83..d027d34a4a 100644 --- a/src/main/java/rx/Observable.java +++ b/src/main/java/rx/Observable.java @@ -145,36 +145,14 @@ public void call(Subscriber subscriber) { *
{@code lift} does not operate by default on a particular {@link Scheduler}.
* * + * @param the value type after the transformation by the operator * @param operator the Operator that implements the Observable-operating function to be applied to the source * Observable * @return an Observable that is the result of applying the lifted Operator to the source Observable * @see RxJava wiki: Implementing Your Own Operators */ public final Observable lift(final Operator operator) { - return new Observable(new OnSubscribe() { - @Override - public void call(Subscriber o) { - try { - Subscriber st = hook.onLift(operator).call(o); - try { - // new Subscriber created and being subscribed with so 'onStart' it - st.onStart(); - onSubscribe.call(st); - } catch (Throwable e) { - // localized capture of errors rather than it skipping all operators - // and ending up in the try/catch of the subscribe method which then - // prevents onErrorResumeNext and other similar approaches to error handling - Exceptions.throwIfFatal(e); - st.onError(e); - } - } catch (Throwable e) { - Exceptions.throwIfFatal(e); - // if the lift function failed all we can do is pass the error to the final Subscriber - // as we don't have the operator available to us - o.onError(e); - } - } - }); + return create(new OnSubscribeLift(this.onSubscribe, operator)); } /** @@ -5813,7 +5791,25 @@ public final Observable> materialize() { * @return an Observable that emits all of the items emitted by the source Observables * @see ReactiveX operators documentation: Merge */ + @SuppressWarnings("unchecked") public final Observable mergeWith(Observable t1) { + if (this.onSubscribe instanceof OnSubscribeLift) { + OnSubscribeLift lifted = (OnSubscribeLift) this.onSubscribe; + if ((lifted.operator() instanceof OperatorMerge) && (lifted.source() instanceof OnSubscribeFromIterable)) { + OnSubscribeFromIterable> iter = (OnSubscribeFromIterable>)lifted.source(); + Iterable> it = iter.iterable(); + if (it instanceof List) { + List> lit = (List>) it; + List> newList = new ArrayList>(lit.size() + 1); + for (Observable t : lit) { + newList.add(t); + } + newList.add(t1); + + return merge(from(newList)); + } + } + } return merge(this, t1); } @@ -8167,7 +8163,7 @@ public final Subscription unsafeSubscribe(Subscriber subscriber) { try { // new Subscriber so onStart it subscriber.onStart(); - // allow the hook to intercept and/or decorate + // allow the HOOK to intercept and/or decorate hook.onSubscribeStart(this, onSubscribe).call(subscriber); return hook.onSubscribeReturn(subscriber); } catch (Throwable e) { @@ -8181,9 +8177,9 @@ public final Subscription unsafeSubscribe(Subscriber subscriber) { // if this happens it means the onError itself failed (perhaps an invalid function implementation) // so we are unable to propagate the error correctly and will just throw RuntimeException r = new RuntimeException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2); - // TODO could the hook be the cause of the error in the on error handling. + // TODO could the HOOK be the cause of the error in the on error handling. hook.onSubscribeError(r); - // TODO why aren't we throwing the hook's return value. + // TODO why aren't we throwing the HOOK's return value. throw r; } return Subscriptions.unsubscribed(); @@ -8260,7 +8256,7 @@ private static Subscription subscribe(Subscriber subscriber, Obse // The code below is exactly the same an unsafeSubscribe but not used because it would // add a significant depth to already huge call stacks. try { - // allow the hook to intercept and/or decorate + // allow the HOOK to intercept and/or decorate hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber); return hook.onSubscribeReturn(subscriber); } catch (Throwable e) { @@ -8274,9 +8270,9 @@ private static Subscription subscribe(Subscriber subscriber, Obse // if this happens it means the onError itself failed (perhaps an invalid function implementation) // so we are unable to propagate the error correctly and will just throw RuntimeException r = new RuntimeException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2); - // TODO could the hook be the cause of the error in the on error handling. + // TODO could the HOOK be the cause of the error in the on error handling. hook.onSubscribeError(r); - // TODO why aren't we throwing the hook's return value. + // TODO why aren't we throwing the HOOK's return value. throw r; } return Subscriptions.unsubscribed(); diff --git a/src/main/java/rx/Single.java b/src/main/java/rx/Single.java index e47f9c40c7..6d1f96f0c2 100644 --- a/src/main/java/rx/Single.java +++ b/src/main/java/rx/Single.java @@ -144,7 +144,7 @@ private Single(final Observable.OnSubscribe f) { * @see ReactiveX operators documentation: Create */ public final static Single create(OnSubscribe f) { - return new Single(f); // TODO need hook + return new Single(f); // TODO need HOOK } /** @@ -1492,8 +1492,8 @@ public final void unsafeSubscribe(Subscriber subscriber) { try { // new Subscriber so onStart it subscriber.onStart(); - // TODO add back the hook - // hook.onSubscribeStart(this, onSubscribe).call(subscriber); + // TODO add back the HOOK + // HOOK.onSubscribeStart(this, onSubscribe).call(subscriber); onSubscribe.call(subscriber); hook.onSubscribeReturn(subscriber); } catch (Throwable e) { @@ -1507,9 +1507,9 @@ public final void unsafeSubscribe(Subscriber subscriber) { // if this happens it means the onError itself failed (perhaps an invalid function implementation) // so we are unable to propagate the error correctly and will just throw RuntimeException r = new RuntimeException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2); - // TODO could the hook be the cause of the error in the on error handling. + // TODO could the HOOK be the cause of the error in the on error handling. hook.onSubscribeError(r); - // TODO why aren't we throwing the hook's return value. + // TODO why aren't we throwing the HOOK's return value. throw r; } } @@ -1578,9 +1578,9 @@ public final Subscription subscribe(Subscriber subscriber) { // The code below is exactly the same an unsafeSubscribe but not used because it would add a sigificent depth to alreay huge call stacks. try { - // allow the hook to intercept and/or decorate - // TODO add back the hook - // hook.onSubscribeStart(this, onSubscribe).call(subscriber); + // allow the HOOK to intercept and/or decorate + // TODO add back the HOOK + // HOOK.onSubscribeStart(this, onSubscribe).call(subscriber); onSubscribe.call(subscriber); return hook.onSubscribeReturn(subscriber); } catch (Throwable e) { @@ -1594,9 +1594,9 @@ public final Subscription subscribe(Subscriber subscriber) { // if this happens it means the onError itself failed (perhaps an invalid function implementation) // so we are unable to propagate the error correctly and will just throw RuntimeException r = new RuntimeException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2); - // TODO could the hook be the cause of the error in the on error handling. + // TODO could the HOOK be the cause of the error in the on error handling. hook.onSubscribeError(r); - // TODO why aren't we throwing the hook's return value. + // TODO why aren't we throwing the HOOK's return value. throw r; } return Subscriptions.empty(); diff --git a/src/main/java/rx/internal/operators/OnSubscribeFromIterable.java b/src/main/java/rx/internal/operators/OnSubscribeFromIterable.java index f4790e75bd..86c54ba658 100644 --- a/src/main/java/rx/internal/operators/OnSubscribeFromIterable.java +++ b/src/main/java/rx/internal/operators/OnSubscribeFromIterable.java @@ -28,6 +28,8 @@ *

* You can convert any object that supports the Iterable interface into an Observable that emits each item in * the object, with the {@code toObservable} operation. + * + * @param the value type */ public final class OnSubscribeFromIterable implements OnSubscribe { @@ -39,6 +41,10 @@ public OnSubscribeFromIterable(Iterable iterable) { } this.is = iterable; } + + public Iterable iterable() { + return is; + } @Override public void call(final Subscriber o) { diff --git a/src/main/java/rx/internal/operators/OnSubscribeLift.java b/src/main/java/rx/internal/operators/OnSubscribeLift.java new file mode 100644 index 0000000000..95603eb8ac --- /dev/null +++ b/src/main/java/rx/internal/operators/OnSubscribeLift.java @@ -0,0 +1,99 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package rx.internal.operators; + +import rx.*; +import rx.Observable.*; +import rx.exceptions.Exceptions; +import rx.plugins.*; + +/** + * Applies an operator to the incoming child Subscriber and subscribes + * the resulting Subscriber to a source Observable. + *

+ * By turning the original lift from an anonymous class into a named class, + * operator optimizations can now look at the graph and discover the + * operators and sources. + * + * @param the source value type + * @param the result value type; + */ +public final class OnSubscribeLift implements OnSubscribe { + /** The operator. */ + final Operator operator; + /** The upstream. */ + final OnSubscribe source; + /** The callback hook to transform the operator if necessary. */ + static final RxJavaObservableExecutionHook HOOK = + RxJavaPlugins.getInstance().getObservableExecutionHook(); + + /** + * Constructs an OnSubscribeLift instance with the given source and operators. + *

+ * The constructor has to take in an OnSubscribe instead of an Observable, unfortunately, + * because the subscribe/unsafeSubscribe activities would interfere (double onStart, + * double wrapping by hooks, etc). + * @param source the source OnSubscribe + * @param operator the operator to apply on the child subscribers to get a Subscriber for source + */ + public OnSubscribeLift(OnSubscribe source, Operator operator) { + this.operator = operator; + this.source = source; + } + + /** + * Returns the operator instance of this lifting OnSubscribe. + * @return the operator instance of this lifting OnSubscribe + */ + public Operator operator() { + return operator; + } + + /** + * Returns the source OnSubscribe of this OnSubscribe. + * @return the source OnSubscribe of this OnSubscribe + */ + public OnSubscribe source() { + return source; + } + + @Override + public void call(Subscriber child) { + try { + Operator onLift = HOOK.onLift(operator); + + Subscriber st = onLift.call(child); + + try { + // new Subscriber created and being subscribed with so 'onStart' it + st.onStart(); + source.call(st); + } catch (Throwable e) { + // localized capture of errors rather than it skipping all operators + // and ending up in the try/catch of the subscribe method which then + // prevents onErrorResumeNext and other similar approaches to error handling + Exceptions.throwIfFatal(e); + st.onError(e); + } + } catch (Throwable e) { + Exceptions.throwIfFatal(e); + // if the lift function failed all we can do is pass the error to the final Subscriber + // as we don't have the operator available to us + child.onError(e); + } + } +} diff --git a/src/main/java/rx/internal/operators/OperatorWindowWithSize.java b/src/main/java/rx/internal/operators/OperatorWindowWithSize.java index e5aae95aaa..21c2c70861 100644 --- a/src/main/java/rx/internal/operators/OperatorWindowWithSize.java +++ b/src/main/java/rx/internal/operators/OperatorWindowWithSize.java @@ -70,7 +70,7 @@ public ExactSubscriber(Subscriber> child) { */ this.child = child; /* - * Add unsubscribe hook to child to get unsubscribe on outer (unsubscribing on next window, not on the inner window itself) + * Add unsubscribe HOOK to child to get unsubscribe on outer (unsubscribing on next window, not on the inner window itself) */ } void init() { @@ -156,7 +156,7 @@ public InexactSubscriber(Subscriber> child) { void init() { /* - * Add unsubscribe hook to child to get unsubscribe on outer (unsubscribing on next window, not on the inner window itself) + * Add unsubscribe HOOK to child to get unsubscribe on outer (unsubscribing on next window, not on the inner window itself) */ child.add(Subscriptions.create(new Action0() { diff --git a/src/main/java/rx/plugins/RxJavaSchedulersHook.java b/src/main/java/rx/plugins/RxJavaSchedulersHook.java index 3bf923464a..4a3e170528 100644 --- a/src/main/java/rx/plugins/RxJavaSchedulersHook.java +++ b/src/main/java/rx/plugins/RxJavaSchedulersHook.java @@ -25,10 +25,10 @@ * the 3 methods that return Scheduler (io(), computation(), newThread()). * 2. You may wrap/decorate an {@link Action0}, before it is handed off to a Scheduler. The system- * supplied Schedulers (Schedulers.ioScheduler, Schedulers.computationScheduler, - * Scheduler.newThreadScheduler) all use this hook, so it's a convenient way to + * Scheduler.newThreadScheduler) all use this HOOK, so it's a convenient way to * modify Scheduler functionality without redefining Schedulers wholesale. * - * Also, when redefining Schedulers, you are free to use/not use the onSchedule decoration hook. + * Also, when redefining Schedulers, you are free to use/not use the onSchedule decoration HOOK. *

* See {@link RxJavaPlugins} or the RxJava GitHub Wiki for information on configuring plugins: * https://github.com/ReactiveX/RxJava/wiki/Plugins. diff --git a/src/test/java/rx/internal/operators/OperatorMergeWithTest.java b/src/test/java/rx/internal/operators/OperatorMergeWithTest.java new file mode 100644 index 0000000000..171e41f598 --- /dev/null +++ b/src/test/java/rx/internal/operators/OperatorMergeWithTest.java @@ -0,0 +1,50 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package rx.internal.operators; + +import org.junit.Test; + +import rx.Observable; +import rx.observers.TestSubscriber; + +public class OperatorMergeWithTest { + @Test + public void mergeLargeAmountOfSources() { + Observable source = Observable.range(1, 2); + + Observable result = source; + int n = 5000; + + for (int i = 0; i < n; i++) { + result = result.mergeWith(source); + } + + TestSubscriber ts = TestSubscriber.create(); + + long t = System.nanoTime(); + + result.subscribe(ts); + + ts.assertValueCount((n + 1) * 2); + ts.assertNoErrors(); + ts.assertCompleted(); + + t = System.nanoTime() - t; + + System.out.printf("Merging took: %,d ns%n", t); + } +}