Skip to content

Commit 02ccc4d

Browse files
New Bind Signature and GroupBy Operator
- Changed `bind` signature to match the variant discussed at ReactiveX#746 (comment) - Updated code to new signature. - Re-implemented GroupBy operator with `bind`
1 parent 723d935 commit 02ccc4d

28 files changed

+530
-544
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 42 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@
5454
import rx.operators.OperationFilter;
5555
import rx.operators.OperationFinally;
5656
import rx.operators.OperationFlatMap;
57-
import rx.operators.OperationGroupBy;
57+
import rx.operators.OperatorGroupBy;
5858
import rx.operators.OperationGroupByUntil;
5959
import rx.operators.OperationGroupJoin;
6060
import rx.operators.OperationInterval;
@@ -159,7 +159,7 @@
159159
*/
160160
public class Observable<T> {
161161

162-
final Action2<Observer<? super T>, OperatorSubscription> f;
162+
final Action1<Operator<? super T>> f;
163163

164164
/**
165165
* Observable with Function to execute when subscribed to.
@@ -171,7 +171,7 @@ public class Observable<T> {
171171
* @param onSubscribe
172172
* {@link OnSubscribeFunc} to be executed when {@link #subscribe(Observer)} is called
173173
*/
174-
protected Observable(Action2<Observer<? super T>, OperatorSubscription> f) {
174+
protected Observable(Action1<Operator<? super T>> f) {
175175
this.f = f;
176176
}
177177

@@ -189,31 +189,6 @@ public static interface OnSubscribeFunc<T> extends Function {
189189
public Subscription onSubscribe(Observer<? super T> t1);
190190
}
191191

192-
public static class OperatorSubscription implements Subscription {
193-
194-
private final CompositeSubscription cs = new CompositeSubscription();
195-
196-
@Override
197-
public void unsubscribe() {
198-
cs.unsubscribe();
199-
}
200-
201-
public static OperatorSubscription create(Subscription s) {
202-
OperatorSubscription _s = new OperatorSubscription();
203-
_s.add(s);
204-
return _s;
205-
}
206-
207-
public boolean isUnsubscribed() {
208-
return cs.isUnsubscribed();
209-
}
210-
211-
public void add(Subscription s) {
212-
cs.add(s);
213-
}
214-
215-
}
216-
217192
private final static RxJavaObservableExecutionHook hook = RxJavaPlugins.getInstance().getObservableExecutionHook();
218193

219194
/**
@@ -260,27 +235,27 @@ protected Observable(Action2<Observer<? super T>, OperatorSubscription> f) {
260235
* @see <a href="https://github.com/Netflix/RxJava/wiki/Creating-Observables#create">RxJava Wiki: create()</a>
261236
* @see <a href="http://msdn.microsoft.com/en-us/library/system.reactive.linq.observable.create.aspx">MSDN: Observable.Create</a>
262237
*/
263-
public final static <T> Observable<T> create(final Action2<Observer<? super T>, OperatorSubscription> f) {
238+
public final static <T> Observable<T> create(final Action1<Operator<? super T>> f) {
264239
return new Observable<T>(f);
265240
}
266241

267242
public final static <T> Observable<T> create(final OnSubscribeFunc<T> func) {
268-
return new Observable<T>(new Action2<Observer<? super T>, OperatorSubscription>() {
243+
return new Observable<T>(new Action1<Operator<? super T>>() {
269244

270245
@Override
271-
public void call(Observer<? super T> o, OperatorSubscription s) {
272-
s.add(func.onSubscribe(o));
246+
public void call(Operator<? super T> o) {
247+
o.add(func.onSubscribe(o));
273248
}
274249

275250
});
276251
}
277252

278-
public <R> Observable<R> bind(final Func2<Observer<? super R>, OperatorSubscription, Observer<? super T>> bind) {
279-
return new Observable<R>(new Action2<Observer<? super R>, OperatorSubscription>() {
253+
public <R> Observable<R> bind(final Func1<Operator<? super R>, Operator<? super T>> bind) {
254+
return new Observable<R>(new Action1<Operator<? super R>>() {
280255

281256
@Override
282-
public void call(Observer<? super R> o, OperatorSubscription s) {
283-
f.call(bind.call(o, s), s);
257+
public void call(Operator<? super R> o) {
258+
subscribe(bind.call(o));
284259
}
285260
});
286261
}
@@ -2878,7 +2853,7 @@ public final static <T> Observable<T> synchronize(Observable<T> source) {
28782853
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229435.aspx">MSDN: Observable.Timer</a>
28792854
*/
28802855
public final static Observable<Long> timer(long initialDelay, long period, TimeUnit unit) {
2881-
return timer(initialDelay, period, unit, Schedulers.threadPoolForComputation());
2856+
return timer(initialDelay, period, unit, Schedulers.computation());
28822857
}
28832858

28842859
/**
@@ -2917,7 +2892,7 @@ public final static Observable<Long> timer(long initialDelay, long period, TimeU
29172892
* @see <a href="https://github.com/Netflix/RxJava/wiki/Creating-Observables#timer">RxJava wiki: timer()</a>
29182893
*/
29192894
public final static Observable<Long> timer(long delay, TimeUnit unit) {
2920-
return timer(delay, unit, Schedulers.threadPoolForComputation());
2895+
return timer(delay, unit, Schedulers.computation());
29212896
}
29222897

29232898
/**
@@ -4299,7 +4274,7 @@ public final <U> Observable<T> delay(Func1<? super T, ? extends Observable<U>> i
42994274
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229810.aspx">MSDN: Observable.Delay</a>
43004275
*/
43014276
public final Observable<T> delay(long delay, TimeUnit unit) {
4302-
return OperationDelay.delay(this, delay, unit, Schedulers.threadPoolForComputation());
4277+
return OperationDelay.delay(this, delay, unit, Schedulers.computation());
43034278
}
43044279

43054280
/**
@@ -4336,7 +4311,7 @@ public final Observable<T> delay(long delay, TimeUnit unit, Scheduler scheduler)
43364311
* amount
43374312
*/
43384313
public final Observable<T> delaySubscription(long delay, TimeUnit unit) {
4339-
return delaySubscription(delay, unit, Schedulers.threadPoolForComputation());
4314+
return delaySubscription(delay, unit, Schedulers.computation());
43404315
}
43414316

43424317
/**
@@ -4795,7 +4770,7 @@ public final <R> Observable<R> flatMap(Func1<? super T, ? extends Observable<? e
47954770
* @see <a href="https://github.com/Netflix/RxJava/wiki/Transforming-Observables#groupby-and-groupbyuntil">RxJava Wiki: groupBy</a>
47964771
*/
47974772
public final <K> Observable<GroupedObservable<K, T>> groupBy(final Func1<? super T, ? extends K> keySelector) {
4798-
return create(OperationGroupBy.groupBy(this, keySelector));
4773+
return bind(new OperatorGroupBy<K, T>(keySelector));
47994774
}
48004775

48014776
/**
@@ -4819,7 +4794,7 @@ public final <K> Observable<GroupedObservable<K, T>> groupBy(final Func1<? super
48194794
* @see <a href="https://github.com/Netflix/RxJava/wiki/Transforming-Observables#groupby-and-groupbyuntil">RxJava Wiki: groupBy</a>
48204795
*/
48214796
public final <K, R> Observable<GroupedObservable<K, R>> groupBy(final Func1<? super T, ? extends K> keySelector, final Func1<? super T, ? extends R> elementSelector) {
4822-
return create(OperationGroupBy.groupBy(this, keySelector, elementSelector));
4797+
return null;
48234798
}
48244799

48254800
/**
@@ -5936,7 +5911,7 @@ public final Subject<T, T> call() {
59365911
* @see <a href="http://msdn.microsoft.com/en-us/library/hh228952.aspx">MSDN: Observable.Replay</a>
59375912
*/
59385913
public final <R> Observable<R> replay(Func1<? super Observable<T>, ? extends Observable<R>> selector, int bufferSize, long time, TimeUnit unit) {
5939-
return replay(selector, bufferSize, time, unit, Schedulers.threadPoolForComputation());
5914+
return replay(selector, bufferSize, time, unit, Schedulers.computation());
59405915
}
59415916

59425917
/**
@@ -6036,7 +6011,7 @@ public final Subject<T, T> call() {
60366011
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229526.aspx">MSDN: Observable.Replay</a>
60376012
*/
60386013
public final <R> Observable<R> replay(Func1<? super Observable<T>, ? extends Observable<R>> selector, long time, TimeUnit unit) {
6039-
return replay(selector, time, unit, Schedulers.threadPoolForComputation());
6014+
return replay(selector, time, unit, Schedulers.computation());
60406015
}
60416016

60426017
/**
@@ -6139,7 +6114,7 @@ public final ConnectableObservable<T> replay(int bufferSize) {
61396114
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229874.aspx">MSDN: Observable.Replay</a>
61406115
*/
61416116
public final ConnectableObservable<T> replay(int bufferSize, long time, TimeUnit unit) {
6142-
return replay(bufferSize, time, unit, Schedulers.threadPoolForComputation());
6117+
return replay(bufferSize, time, unit, Schedulers.computation());
61436118
}
61446119

61456120
/**
@@ -6209,7 +6184,7 @@ public final ConnectableObservable<T> replay(int bufferSize, Scheduler scheduler
62096184
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229232.aspx">MSDN: Observable.Replay</a>
62106185
*/
62116186
public final ConnectableObservable<T> replay(long time, TimeUnit unit) {
6212-
return replay(time, unit, Schedulers.threadPoolForComputation());
6187+
return replay(time, unit, Schedulers.computation());
62136188
}
62146189

62156190
/**
@@ -6516,7 +6491,7 @@ public final Observable<T> skip(int num) {
65166491
* @see <a href="https://github.com/Netflix/RxJava/wiki/Filtering-Observables#skip">RxJava Wiki: skip()</a>
65176492
*/
65186493
public final Observable<T> skip(long time, TimeUnit unit) {
6519-
return skip(time, unit, Schedulers.threadPoolForComputation());
6494+
return skip(time, unit, Schedulers.computation());
65206495
}
65216496

65226497
/**
@@ -6578,7 +6553,7 @@ public final Observable<T> skipLast(int count) {
65786553
* @see <a href="http://msdn.microsoft.com/en-us/library/hh211750.aspx">MSDN: Observable.SkipLast</a>
65796554
*/
65806555
public final Observable<T> skipLast(long time, TimeUnit unit) {
6581-
return skipLast(time, unit, Schedulers.threadPoolForComputation());
6556+
return skipLast(time, unit, Schedulers.computation());
65826557
}
65836558

65846559
/**
@@ -6929,8 +6904,8 @@ public final Observable<T> startWith(T[] values, Scheduler scheduler) {
69296904
}
69306905

69316906
// TODO should this be called `observe` instead of `subscribe`?
6932-
public final void subscribe(Observer<? super T> o, Func0<OperatorSubscription> sf) {
6933-
f.call(o, sf.call());
6907+
public final void subscribe(Operator<? super T> o) {
6908+
f.call(o);
69346909
}
69356910

69366911
/**
@@ -7166,7 +7141,7 @@ public final Subscription subscribe(final Action1<? super T> onNext, Scheduler s
71667141
*/
71677142
public final Subscription subscribe(Observer<? super T> observer) {
71687143
// allow the hook to intercept and/or decorate
7169-
Action2<Observer<? super T>, OperatorSubscription> onSubscribeFunction = hook.onSubscribeStart(this, f);
7144+
Action1<Operator<? super T>> onSubscribeFunction = hook.onSubscribeStart(this, f);
71707145
// validate and proceed
71717146
if (observer == null) {
71727147
throw new IllegalArgumentException("observer can not be null");
@@ -7176,17 +7151,20 @@ public final Subscription subscribe(Observer<? super T> observer) {
71767151
// the subscribe function can also be overridden but generally that's not the appropriate approach so I won't mention that in the exception
71777152
}
71787153
try {
7179-
OperatorSubscription os = new OperatorSubscription();
7154+
Operator<? super T> op = null;
71807155
/**
71817156
* See https://github.com/Netflix/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls to user code from within an operator"
71827157
*/
71837158
if (isInternalImplementation(observer)) {
7184-
onSubscribeFunction.call(observer, os);
7159+
op = Operator.create(observer, new CompositeSubscription());
7160+
onSubscribeFunction.call(op);
71857161
} else {
7186-
SafeObservableSubscription subscription = new SafeObservableSubscription(os);
7187-
onSubscribeFunction.call(new SafeObserver<T>(subscription, observer), os);
7162+
// TODO this doesn't seem correct any longer with the Operator and injecting of CompositeSubscription
7163+
SafeObservableSubscription subscription = new SafeObservableSubscription(op);
7164+
op = Operator.create(new SafeObserver<T>(subscription, observer), new CompositeSubscription());
7165+
onSubscribeFunction.call(op);
71887166
}
7189-
return hook.onSubscribeReturn(this, os);
7167+
return hook.onSubscribeReturn(this, op);
71907168
} catch (OnErrorNotImplementedException e) {
71917169
// special handling when onError is not implemented ... we just rethrow
71927170
throw e;
@@ -7426,7 +7404,7 @@ public final Observable<T> take(final int num) {
74267404
* @see <a href="https://github.com/Netflix/RxJava/wiki/Filtering-Observables#take">RxJava Wiki: take()</a>
74277405
*/
74287406
public final Observable<T> take(long time, TimeUnit unit) {
7429-
return take(time, unit, Schedulers.threadPoolForComputation());
7407+
return take(time, unit, Schedulers.computation());
74307408
}
74317409

74327410
/**
@@ -7517,7 +7495,7 @@ public final Observable<T> takeLast(final int count) {
75177495
* were emitted in a specified window of time before the Observable completed.
75187496
*/
75197497
public final Observable<T> takeLast(int count, long time, TimeUnit unit) {
7520-
return takeLast(count, time, unit, Schedulers.threadPoolForComputation());
7498+
return takeLast(count, time, unit, Schedulers.computation());
75217499
}
75227500

75237501
/**
@@ -7560,7 +7538,7 @@ public final Observable<T> takeLast(int count, long time, TimeUnit unit, Schedul
75607538
* the window of time before the Observable completed specified by {@code time}
75617539
*/
75627540
public final Observable<T> takeLast(long time, TimeUnit unit) {
7563-
return takeLast(time, unit, Schedulers.threadPoolForComputation());
7541+
return takeLast(time, unit, Schedulers.computation());
75647542
}
75657543

75667544
/**
@@ -8735,10 +8713,10 @@ public final <T2, R> Observable<R> zip(Observable<? extends T2> other, Func2<? s
87358713
*/
87368714
private static class NeverObservable<T> extends Observable<T> {
87378715
public NeverObservable() {
8738-
super(new Action2<Observer<? super T>, OperatorSubscription>() {
8716+
super(new Action1<Operator<? super T>>() {
87398717

87408718
@Override
8741-
public void call(Observer<? super T> observer, OperatorSubscription t2) {
8719+
public void call(Operator<? super T> observer) {
87428720
// do nothing
87438721
}
87448722

@@ -8756,7 +8734,7 @@ public void call(Observer<? super T> observer, OperatorSubscription t2) {
87568734
private static class ThrowObservable<T> extends Observable<T> {
87578735

87588736
public ThrowObservable(final Throwable exception) {
8759-
super(new Action2<Observer<? super T>, OperatorSubscription>() {
8737+
super(new Action1<Operator<? super T>>() {
87608738

87618739
/**
87628740
* Accepts an {@link Observer} and calls its {@link Observer#onError onError} method.
@@ -8766,14 +8744,15 @@ public ThrowObservable(final Throwable exception) {
87668744
* @return a reference to the subscription
87678745
*/
87688746
@Override
8769-
public void call(Observer<? super T> observer, OperatorSubscription t2) {
8747+
public void call(Operator<? super T> observer) {
87708748
observer.onError(exception);
87718749
}
87728750

87738751
});
87748752
}
87758753
}
87768754

8755+
@SuppressWarnings("rawtypes")
87778756
private final static ConcurrentHashMap<Class, Boolean> internalClassMap = new ConcurrentHashMap<Class, Boolean>();
87788757

87798758
/**

rxjava-core/src/main/java/rx/observables/ConnectableObservable.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,10 @@
1717

1818
import rx.Observable;
1919
import rx.Observer;
20+
import rx.Operator;
2021
import rx.Subscription;
2122
import rx.operators.OperationRefCount;
22-
import rx.util.functions.Action2;
23+
import rx.util.functions.Action1;
2324

2425
/**
2526
* A ConnectableObservable resembles an ordinary {@link Observable}, except that it does not begin
@@ -38,7 +39,7 @@
3839

3940
public abstract class ConnectableObservable<T> extends Observable<T> {
4041

41-
protected ConnectableObservable(Action2<Observer<? super T>, OperatorSubscription> onSubscribe) {
42+
protected ConnectableObservable(Action1<Operator<? super T>> onSubscribe) {
4243
super(onSubscribe);
4344
}
4445

rxjava-core/src/main/java/rx/observables/GroupedObservable.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,8 @@
1616
package rx.observables;
1717

1818
import rx.Observable;
19-
import rx.Observer;
20-
import rx.util.functions.Action2;
19+
import rx.Operator;
20+
import rx.util.functions.Action1;
2121
import rx.util.functions.Func1;
2222

2323
/**
@@ -33,7 +33,7 @@
3333
public class GroupedObservable<K, T> extends Observable<T> {
3434
private final K key;
3535

36-
public GroupedObservable(K key, Action2<Observer<? super T>, OperatorSubscription> onSubscribe) {
36+
public GroupedObservable(K key, Action1<Operator<? super T>> onSubscribe) {
3737
super(onSubscribe);
3838
this.key = key;
3939
}

0 commit comments

Comments
 (0)