Skip to content

Commit 14e92d4

Browse files
committed
1.x: enable operator/source fusion by named operator lifter
This change factors out the body of lift() into a named class that gives access to the operator and source parameters. By using this information, other operators can perform what I call **operator macro-fusion**. One such example with this PR is the repeated use of the operator `mergeWith` which when done in the classical way creates a long linked-list of sources merged in pairs, often leading to stack overflows and degraded performance. However, if `mergeWith` can see that it is applied to an existing mergeWith, the two operators can use a common list of sources and then turn into a one-level merge() with n + 1 sources (the previous graph will then be GC'd). Don't worry, this doesn't destroy the original assembled sequence. For example, given `c = a.mergeWith(b); d = c.mergeWith(e);` both c and d can be freely subscribed to and still do the same thing. Note also that this PR conflicts with PR ReactiveX#3477 since the array-based `merge(from(os))` has a different type.
1 parent 3e2b3b1 commit 14e92d4

File tree

7 files changed

+195
-44
lines changed

7 files changed

+195
-44
lines changed

src/main/java/rx/Observable.java

Lines changed: 26 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -145,36 +145,14 @@ public void call(Subscriber<? super T> subscriber) {
145145
* <dd>{@code lift} does not operate by default on a particular {@link Scheduler}.</dd>
146146
* </dl>
147147
*
148+
* @param <R> the value type after the transformation by the operator
148149
* @param operator the Operator that implements the Observable-operating function to be applied to the source
149150
* Observable
150151
* @return an Observable that is the result of applying the lifted Operator to the source Observable
151152
* @see <a href="https://github.com/ReactiveX/RxJava/wiki/Implementing-Your-Own-Operators">RxJava wiki: Implementing Your Own Operators</a>
152153
*/
153154
public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
154-
return new Observable<R>(new OnSubscribe<R>() {
155-
@Override
156-
public void call(Subscriber<? super R> o) {
157-
try {
158-
Subscriber<? super T> st = hook.onLift(operator).call(o);
159-
try {
160-
// new Subscriber created and being subscribed with so 'onStart' it
161-
st.onStart();
162-
onSubscribe.call(st);
163-
} catch (Throwable e) {
164-
// localized capture of errors rather than it skipping all operators
165-
// and ending up in the try/catch of the subscribe method which then
166-
// prevents onErrorResumeNext and other similar approaches to error handling
167-
Exceptions.throwIfFatal(e);
168-
st.onError(e);
169-
}
170-
} catch (Throwable e) {
171-
Exceptions.throwIfFatal(e);
172-
// if the lift function failed all we can do is pass the error to the final Subscriber
173-
// as we don't have the operator available to us
174-
o.onError(e);
175-
}
176-
}
177-
});
155+
return create(new OnSubscribeLift<T, R>(this.onSubscribe, operator));
178156
}
179157

180158
/**
@@ -5813,7 +5791,25 @@ public final Observable<Notification<T>> materialize() {
58135791
* @return an Observable that emits all of the items emitted by the source Observables
58145792
* @see <a href="http://reactivex.io/documentation/operators/merge.html">ReactiveX operators documentation: Merge</a>
58155793
*/
5794+
@SuppressWarnings("unchecked")
58165795
public final Observable<T> mergeWith(Observable<? extends T> t1) {
5796+
if (this.onSubscribe instanceof OnSubscribeLift) {
5797+
OnSubscribeLift lifted = (OnSubscribeLift) this.onSubscribe;
5798+
if ((lifted.operator() instanceof OperatorMerge) && (lifted.source() instanceof OnSubscribeFromIterable)) {
5799+
OnSubscribeFromIterable<Observable<? extends T>> iter = (OnSubscribeFromIterable<Observable<? extends T>>)lifted.source();
5800+
Iterable<? extends Observable<? extends T>> it = iter.iterable();
5801+
if (it instanceof List) {
5802+
List<? extends Observable<? extends T>> lit = (List<? extends Observable<? extends T>>) it;
5803+
List<Observable<? extends T>> newList = new ArrayList<Observable<? extends T>>(lit.size() + 1);
5804+
for (Observable<? extends T> t : lit) {
5805+
newList.add(t);
5806+
}
5807+
newList.add(t1);
5808+
5809+
return merge(from(newList));
5810+
}
5811+
}
5812+
}
58175813
return merge(this, t1);
58185814
}
58195815

@@ -8167,7 +8163,7 @@ public final Subscription unsafeSubscribe(Subscriber<? super T> subscriber) {
81678163
try {
81688164
// new Subscriber so onStart it
81698165
subscriber.onStart();
8170-
// allow the hook to intercept and/or decorate
8166+
// allow the HOOK to intercept and/or decorate
81718167
hook.onSubscribeStart(this, onSubscribe).call(subscriber);
81728168
return hook.onSubscribeReturn(subscriber);
81738169
} catch (Throwable e) {
@@ -8181,9 +8177,9 @@ public final Subscription unsafeSubscribe(Subscriber<? super T> subscriber) {
81818177
// if this happens it means the onError itself failed (perhaps an invalid function implementation)
81828178
// so we are unable to propagate the error correctly and will just throw
81838179
RuntimeException r = new RuntimeException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
8184-
// TODO could the hook be the cause of the error in the on error handling.
8180+
// TODO could the HOOK be the cause of the error in the on error handling.
81858181
hook.onSubscribeError(r);
8186-
// TODO why aren't we throwing the hook's return value.
8182+
// TODO why aren't we throwing the HOOK's return value.
81878183
throw r;
81888184
}
81898185
return Subscriptions.unsubscribed();
@@ -8260,7 +8256,7 @@ private static <T> Subscription subscribe(Subscriber<? super T> subscriber, Obse
82608256
// The code below is exactly the same an unsafeSubscribe but not used because it would
82618257
// add a significant depth to already huge call stacks.
82628258
try {
8263-
// allow the hook to intercept and/or decorate
8259+
// allow the HOOK to intercept and/or decorate
82648260
hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);
82658261
return hook.onSubscribeReturn(subscriber);
82668262
} catch (Throwable e) {
@@ -8274,9 +8270,9 @@ private static <T> Subscription subscribe(Subscriber<? super T> subscriber, Obse
82748270
// if this happens it means the onError itself failed (perhaps an invalid function implementation)
82758271
// so we are unable to propagate the error correctly and will just throw
82768272
RuntimeException r = new RuntimeException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
8277-
// TODO could the hook be the cause of the error in the on error handling.
8273+
// TODO could the HOOK be the cause of the error in the on error handling.
82788274
hook.onSubscribeError(r);
8279-
// TODO why aren't we throwing the hook's return value.
8275+
// TODO why aren't we throwing the HOOK's return value.
82808276
throw r;
82818277
}
82828278
return Subscriptions.unsubscribed();

src/main/java/rx/Single.java

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,7 @@ private Single(final Observable.OnSubscribe<T> f) {
144144
* @see <a href="http://reactivex.io/documentation/operators/create.html">ReactiveX operators documentation: Create</a>
145145
*/
146146
public final static <T> Single<T> create(OnSubscribe<T> f) {
147-
return new Single<T>(f); // TODO need hook
147+
return new Single<T>(f); // TODO need HOOK
148148
}
149149

150150
/**
@@ -1492,8 +1492,8 @@ public final void unsafeSubscribe(Subscriber<? super T> subscriber) {
14921492
try {
14931493
// new Subscriber so onStart it
14941494
subscriber.onStart();
1495-
// TODO add back the hook
1496-
// hook.onSubscribeStart(this, onSubscribe).call(subscriber);
1495+
// TODO add back the HOOK
1496+
// HOOK.onSubscribeStart(this, onSubscribe).call(subscriber);
14971497
onSubscribe.call(subscriber);
14981498
hook.onSubscribeReturn(subscriber);
14991499
} catch (Throwable e) {
@@ -1507,9 +1507,9 @@ public final void unsafeSubscribe(Subscriber<? super T> subscriber) {
15071507
// if this happens it means the onError itself failed (perhaps an invalid function implementation)
15081508
// so we are unable to propagate the error correctly and will just throw
15091509
RuntimeException r = new RuntimeException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
1510-
// TODO could the hook be the cause of the error in the on error handling.
1510+
// TODO could the HOOK be the cause of the error in the on error handling.
15111511
hook.onSubscribeError(r);
1512-
// TODO why aren't we throwing the hook's return value.
1512+
// TODO why aren't we throwing the HOOK's return value.
15131513
throw r;
15141514
}
15151515
}
@@ -1578,9 +1578,9 @@ public final Subscription subscribe(Subscriber<? super T> subscriber) {
15781578

15791579
// The code below is exactly the same an unsafeSubscribe but not used because it would add a sigificent depth to alreay huge call stacks.
15801580
try {
1581-
// allow the hook to intercept and/or decorate
1582-
// TODO add back the hook
1583-
// hook.onSubscribeStart(this, onSubscribe).call(subscriber);
1581+
// allow the HOOK to intercept and/or decorate
1582+
// TODO add back the HOOK
1583+
// HOOK.onSubscribeStart(this, onSubscribe).call(subscriber);
15841584
onSubscribe.call(subscriber);
15851585
return hook.onSubscribeReturn(subscriber);
15861586
} catch (Throwable e) {
@@ -1594,9 +1594,9 @@ public final Subscription subscribe(Subscriber<? super T> subscriber) {
15941594
// if this happens it means the onError itself failed (perhaps an invalid function implementation)
15951595
// so we are unable to propagate the error correctly and will just throw
15961596
RuntimeException r = new RuntimeException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
1597-
// TODO could the hook be the cause of the error in the on error handling.
1597+
// TODO could the HOOK be the cause of the error in the on error handling.
15981598
hook.onSubscribeError(r);
1599-
// TODO why aren't we throwing the hook's return value.
1599+
// TODO why aren't we throwing the HOOK's return value.
16001600
throw r;
16011601
}
16021602
return Subscriptions.empty();

src/main/java/rx/internal/operators/OnSubscribeFromIterable.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@
2828
* <p>
2929
* You can convert any object that supports the Iterable interface into an Observable that emits each item in
3030
* the object, with the {@code toObservable} operation.
31+
*
32+
* @param <T> the value type
3133
*/
3234
public final class OnSubscribeFromIterable<T> implements OnSubscribe<T> {
3335

@@ -39,6 +41,10 @@ public OnSubscribeFromIterable(Iterable<? extends T> iterable) {
3941
}
4042
this.is = iterable;
4143
}
44+
45+
public Iterable<? extends T> iterable() {
46+
return is;
47+
}
4248

4349
@Override
4450
public void call(final Subscriber<? super T> o) {
Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package rx.internal.operators;
18+
19+
import rx.*;
20+
import rx.Observable.*;
21+
import rx.exceptions.Exceptions;
22+
import rx.plugins.*;
23+
24+
/**
25+
* Applies an operator to the incoming child Subscriber and subscribes
26+
* the resulting Subscriber to a source Observable.
27+
* <p>
28+
* By turning the original lift from an anonymous class into a named class,
29+
* operator optimizations can now look at the graph and discover the
30+
* operators and sources.
31+
*
32+
* @param <T> the source value type
33+
* @param <R> the result value type;
34+
*/
35+
public final class OnSubscribeLift<T, R> implements OnSubscribe<R> {
36+
/** The operator. */
37+
final Operator<? extends R, ? super T> operator;
38+
/** The upstream. */
39+
final OnSubscribe<? extends T> source;
40+
/** The callback hook to transform the operator if necessary. */
41+
static final RxJavaObservableExecutionHook HOOK =
42+
RxJavaPlugins.getInstance().getObservableExecutionHook();
43+
44+
/**
45+
* Constructs an OnSubscribeLift instance with the given source and operators.
46+
* <p>
47+
* The constructor has to take in an OnSubscribe instead of an Observable, unfortunately,
48+
* because the subscribe/unsafeSubscribe activities would interfere (double onStart,
49+
* double wrapping by hooks, etc).
50+
* @param source the source OnSubscribe
51+
* @param operator the operator to apply on the child subscribers to get a Subscriber for source
52+
*/
53+
public OnSubscribeLift(OnSubscribe<? extends T> source, Operator<? extends R, ? super T> operator) {
54+
this.operator = operator;
55+
this.source = source;
56+
}
57+
58+
/**
59+
* Returns the operator instance of this lifting OnSubscribe.
60+
* @return the operator instance of this lifting OnSubscribe
61+
*/
62+
public Operator<? extends R, ? super T> operator() {
63+
return operator;
64+
}
65+
66+
/**
67+
* Returns the source OnSubscribe of this OnSubscribe.
68+
* @return the source OnSubscribe of this OnSubscribe
69+
*/
70+
public OnSubscribe<? extends T> source() {
71+
return source;
72+
}
73+
74+
@Override
75+
public void call(Subscriber<? super R> child) {
76+
try {
77+
Operator<? extends R, ? super T> onLift = HOOK.onLift(operator);
78+
79+
Subscriber<? super T> st = onLift.call(child);
80+
81+
try {
82+
// new Subscriber created and being subscribed with so 'onStart' it
83+
st.onStart();
84+
source.call(st);
85+
} catch (Throwable e) {
86+
// localized capture of errors rather than it skipping all operators
87+
// and ending up in the try/catch of the subscribe method which then
88+
// prevents onErrorResumeNext and other similar approaches to error handling
89+
Exceptions.throwIfFatal(e);
90+
st.onError(e);
91+
}
92+
} catch (Throwable e) {
93+
Exceptions.throwIfFatal(e);
94+
// if the lift function failed all we can do is pass the error to the final Subscriber
95+
// as we don't have the operator available to us
96+
child.onError(e);
97+
}
98+
}
99+
}

src/main/java/rx/internal/operators/OperatorWindowWithSize.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ public ExactSubscriber(Subscriber<? super Observable<T>> child) {
7070
*/
7171
this.child = child;
7272
/*
73-
* Add unsubscribe hook to child to get unsubscribe on outer (unsubscribing on next window, not on the inner window itself)
73+
* Add unsubscribe HOOK to child to get unsubscribe on outer (unsubscribing on next window, not on the inner window itself)
7474
*/
7575
}
7676
void init() {
@@ -156,7 +156,7 @@ public InexactSubscriber(Subscriber<? super Observable<T>> child) {
156156

157157
void init() {
158158
/*
159-
* Add unsubscribe hook to child to get unsubscribe on outer (unsubscribing on next window, not on the inner window itself)
159+
* Add unsubscribe HOOK to child to get unsubscribe on outer (unsubscribing on next window, not on the inner window itself)
160160
*/
161161
child.add(Subscriptions.create(new Action0() {
162162

src/main/java/rx/plugins/RxJavaSchedulersHook.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,10 @@
2525
* the 3 methods that return Scheduler (io(), computation(), newThread()).
2626
* 2. You may wrap/decorate an {@link Action0}, before it is handed off to a Scheduler. The system-
2727
* supplied Schedulers (Schedulers.ioScheduler, Schedulers.computationScheduler,
28-
* Scheduler.newThreadScheduler) all use this hook, so it's a convenient way to
28+
* Scheduler.newThreadScheduler) all use this HOOK, so it's a convenient way to
2929
* modify Scheduler functionality without redefining Schedulers wholesale.
3030
*
31-
* Also, when redefining Schedulers, you are free to use/not use the onSchedule decoration hook.
31+
* Also, when redefining Schedulers, you are free to use/not use the onSchedule decoration HOOK.
3232
* <p>
3333
* See {@link RxJavaPlugins} or the RxJava GitHub Wiki for information on configuring plugins:
3434
* <a href="https://github.com/ReactiveX/RxJava/wiki/Plugins">https://github.com/ReactiveX/RxJava/wiki/Plugins</a>.
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package rx.internal.operators;
18+
19+
import org.junit.Test;
20+
21+
import rx.Observable;
22+
import rx.observers.TestSubscriber;
23+
24+
public class OperatorMergeWithTest {
25+
@Test
26+
public void mergeLargeAmountOfSources() {
27+
Observable<Integer> source = Observable.range(1, 2);
28+
29+
Observable<Integer> result = source;
30+
int n = 5000;
31+
32+
for (int i = 0; i < n; i++) {
33+
result = result.mergeWith(source);
34+
}
35+
36+
TestSubscriber<Integer> ts = TestSubscriber.create();
37+
38+
long t = System.nanoTime();
39+
40+
result.subscribe(ts);
41+
42+
ts.assertValueCount((n + 1) * 2);
43+
ts.assertNoErrors();
44+
ts.assertCompleted();
45+
46+
t = System.nanoTime() - t;
47+
48+
System.out.printf("Merging took: %,d ns%n", t);
49+
}
50+
}

0 commit comments

Comments
 (0)