Skip to content

Commit a40a413

Browse files
committed
Merge pull request #2983 from akarnokd/MultiOnStartFix
Fixed multiple calls to onStart.
2 parents 4350f84 + fd9b6bc commit a40a413

11 files changed

+145
-38
lines changed

src/main/java/rx/internal/operators/OnSubscribeDefer.java

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,15 +38,28 @@ public OnSubscribeDefer(Func0<? extends Observable<? extends T>> observableFacto
3838
}
3939

4040
@Override
41-
public void call(Subscriber<? super T> s) {
41+
public void call(final Subscriber<? super T> s) {
4242
Observable<? extends T> o;
4343
try {
4444
o = observableFactory.call();
4545
} catch (Throwable t) {
4646
s.onError(t);
4747
return;
4848
}
49-
o.unsafeSubscribe(s);
49+
o.unsafeSubscribe(new Subscriber<T>(s) {
50+
@Override
51+
public void onNext(T t) {
52+
s.onNext(t);
53+
}
54+
@Override
55+
public void onError(Throwable e) {
56+
s.onError(e);
57+
}
58+
@Override
59+
public void onCompleted() {
60+
s.onCompleted();
61+
}
62+
});
5063
}
5164

5265
}

src/main/java/rx/internal/operators/OnSubscribeDelaySubscription.java

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,10 @@
1616
package rx.internal.operators;
1717

1818
import java.util.concurrent.TimeUnit;
19-
import rx.Observable;
19+
20+
import rx.*;
2021
import rx.Observable.OnSubscribe;
21-
import rx.Scheduler;
2222
import rx.Scheduler.Worker;
23-
import rx.Subscriber;
2423
import rx.functions.Action0;
2524

2625
/**
@@ -50,7 +49,20 @@ public void call(final Subscriber<? super T> s) {
5049
@Override
5150
public void call() {
5251
if (!s.isUnsubscribed()) {
53-
source.unsafeSubscribe(s);
52+
source.unsafeSubscribe(new Subscriber<T>(s) {
53+
@Override
54+
public void onNext(T t) {
55+
s.onNext(t);
56+
}
57+
@Override
58+
public void onError(Throwable e) {
59+
s.onError(e);
60+
}
61+
@Override
62+
public void onCompleted() {
63+
s.onCompleted();
64+
}
65+
});
5466
}
5567
}
5668
}, time, unit);

src/main/java/rx/internal/operators/OnSubscribeDelaySubscriptionWithSelector.java

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,8 @@
1515
*/
1616
package rx.internal.operators;
1717

18-
import rx.Observable;
18+
import rx.*;
1919
import rx.Observable.OnSubscribe;
20-
import rx.Subscriber;
2120
import rx.functions.Func0;
2221

2322
/**
@@ -43,7 +42,20 @@ public void call(final Subscriber<? super T> child) {
4342
@Override
4443
public void onCompleted() {
4544
// subscribe to actual source
46-
source.unsafeSubscribe(child);
45+
source.unsafeSubscribe(new Subscriber<T>(child) {
46+
@Override
47+
public void onNext(T t) {
48+
child.onNext(t);
49+
}
50+
@Override
51+
public void onError(Throwable e) {
52+
child.onError(e);
53+
}
54+
@Override
55+
public void onCompleted() {
56+
child.onCompleted();
57+
}
58+
});
4759
}
4860

4961
@Override

src/main/java/rx/internal/operators/OnSubscribeUsing.java

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -18,15 +18,10 @@
1818
import java.util.Arrays;
1919
import java.util.concurrent.atomic.AtomicBoolean;
2020

21-
import rx.Observable;
21+
import rx.*;
2222
import rx.Observable.OnSubscribe;
23-
import rx.Subscriber;
24-
import rx.Subscription;
2523
import rx.exceptions.CompositeException;
26-
import rx.functions.Action0;
27-
import rx.functions.Action1;
28-
import rx.functions.Func0;
29-
import rx.functions.Func1;
24+
import rx.functions.*;
3025

3126
/**
3227
* Constructs an observable sequence that depends on a resource object.
@@ -48,7 +43,7 @@ public OnSubscribeUsing(Func0<Resource> resourceFactory,
4843
}
4944

5045
@Override
51-
public void call(Subscriber<? super T> subscriber) {
46+
public void call(final Subscriber<? super T> subscriber) {
5247

5348
try {
5449

@@ -73,7 +68,20 @@ public void call(Subscriber<? super T> subscriber) {
7368
observable = source;
7469
try {
7570
// start
76-
observable.unsafeSubscribe(subscriber);
71+
observable.unsafeSubscribe(new Subscriber<T>(subscriber) {
72+
@Override
73+
public void onNext(T t) {
74+
subscriber.onNext(t);
75+
}
76+
@Override
77+
public void onError(Throwable e) {
78+
subscriber.onError(e);
79+
}
80+
@Override
81+
public void onCompleted() {
82+
subscriber.onCompleted();
83+
}
84+
});
7785
} catch (Throwable e) {
7886
Throwable disposeError = disposeEagerlyIfRequested(disposeOnceOnly);
7987
if (disposeError != null)

src/main/java/rx/internal/operators/OperatorDoOnSubscribe.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,19 @@ public Subscriber<? super T> call(final Subscriber<? super T> child) {
3939
subscribe.call();
4040
// Pass through since this operator is for notification only, there is
4141
// no change to the stream whatsoever.
42-
return child;
42+
return new Subscriber<T>(child) {
43+
@Override
44+
public void onNext(T t) {
45+
child.onNext(t);
46+
}
47+
@Override
48+
public void onError(Throwable e) {
49+
child.onError(e);
50+
}
51+
@Override
52+
public void onCompleted() {
53+
child.onCompleted();
54+
}
55+
};
4356
}
4457
}

src/main/java/rx/internal/operators/OperatorDoOnUnsubscribe.java

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
package rx.internal.operators;
1717

1818
import rx.Observable.Operator;
19-
import rx.Subscriber;
19+
import rx.*;
2020
import rx.functions.Action0;
2121
import rx.subscriptions.Subscriptions;
2222

@@ -41,6 +41,22 @@ public Subscriber<? super T> call(final Subscriber<? super T> child) {
4141

4242
// Pass through since this operator is for notification only, there is
4343
// no change to the stream whatsoever.
44-
return child;
44+
return new Subscriber<T>(child) {
45+
@Override
46+
public void onStart() {
47+
}
48+
@Override
49+
public void onNext(T t) {
50+
child.onNext(t);
51+
}
52+
@Override
53+
public void onError(Throwable e) {
54+
child.onError(e);
55+
}
56+
@Override
57+
public void onCompleted() {
58+
child.onCompleted();
59+
}
60+
};
4561
}
4662
}

src/main/java/rx/internal/operators/OperatorGroupBy.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -259,7 +259,9 @@ public void call() {
259259
}
260260

261261
}).unsafeSubscribe(new Subscriber<T>(o) {
262-
262+
@Override
263+
public void onStart() {
264+
}
263265
@Override
264266
public void onCompleted() {
265267
o.onCompleted();

src/main/java/rx/internal/operators/OperatorMulticast.java

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -114,8 +114,21 @@ public void call() {
114114
guardedSubscription = gs.get();
115115

116116
// register any subscribers that are waiting with this new subject
117-
for(Subscriber<? super R> s : waitingForConnect) {
118-
subject.unsafeSubscribe(s);
117+
for(final Subscriber<? super R> s : waitingForConnect) {
118+
subject.unsafeSubscribe(new Subscriber<R>(s) {
119+
@Override
120+
public void onNext(R t) {
121+
s.onNext(t);
122+
}
123+
@Override
124+
public void onError(Throwable e) {
125+
s.onError(e);
126+
}
127+
@Override
128+
public void onCompleted() {
129+
s.onCompleted();
130+
}
131+
});
119132
}
120133
// clear the waiting list as any new ones that come in after leaving this synchronized block will go direct to the Subject
121134
waitingForConnect.clear();

src/main/java/rx/internal/operators/OperatorOnErrorResumeNextViaFunction.java

Lines changed: 30 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,13 @@
1515
*/
1616
package rx.internal.operators;
1717

18-
import rx.Observable;
19-
import rx.Producer;
18+
import rx.*;
2019
import rx.Observable.Operator;
21-
import rx.Subscriber;
2220
import rx.exceptions.Exceptions;
2321
import rx.functions.Func1;
22+
import rx.internal.producers.ProducerArbiter;
2423
import rx.plugins.RxJavaPlugins;
24+
import rx.subscriptions.SerialSubscription;
2525

2626
/**
2727
* Instruct an Observable to pass control to another Observable (the return value of a function)
@@ -51,6 +51,8 @@ public OperatorOnErrorResumeNextViaFunction(Func1<Throwable, ? extends Observabl
5151

5252
@Override
5353
public Subscriber<? super T> call(final Subscriber<? super T> child) {
54+
final ProducerArbiter pa = new ProducerArbiter();
55+
final SerialSubscription ssub = new SerialSubscription();
5456
Subscriber<T> parent = new Subscriber<T>() {
5557

5658
private boolean done = false;
@@ -74,8 +76,28 @@ public void onError(Throwable e) {
7476
try {
7577
RxJavaPlugins.getInstance().getErrorHandler().handleError(e);
7678
unsubscribe();
79+
Subscriber<T> next = new Subscriber<T>() {
80+
@Override
81+
public void onNext(T t) {
82+
child.onNext(t);
83+
}
84+
@Override
85+
public void onError(Throwable e) {
86+
child.onError(e);
87+
}
88+
@Override
89+
public void onCompleted() {
90+
child.onCompleted();
91+
}
92+
@Override
93+
public void setProducer(Producer producer) {
94+
pa.setProducer(producer);
95+
}
96+
};
97+
ssub.set(next);
98+
7799
Observable<? extends T> resume = resumeFunction.call(e);
78-
resume.unsafeSubscribe(child);
100+
resume.unsafeSubscribe(next);
79101
} catch (Throwable e2) {
80102
child.onError(e2);
81103
}
@@ -91,16 +113,13 @@ public void onNext(T t) {
91113

92114
@Override
93115
public void setProducer(final Producer producer) {
94-
child.setProducer(new Producer() {
95-
@Override
96-
public void request(long n) {
97-
producer.request(n);
98-
}
99-
});
116+
pa.setProducer(producer);
100117
}
101118

102119
};
103-
child.add(parent);
120+
child.add(ssub);
121+
ssub.set(parent);
122+
child.setProducer(pa);
104123
return parent;
105124
}
106125

src/main/java/rx/observers/TestSubscriber.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -99,8 +99,6 @@ public TestSubscriber() {
9999
public void onStart() {
100100
if (initialRequest >= 0) {
101101
requestMore(initialRequest);
102-
} else {
103-
super.onStart();
104102
}
105103
}
106104

src/test/java/rx/internal/operators/OperatorPublishTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -226,7 +226,8 @@ public void call() {
226226
public void call() {
227227
child1Unsubscribed.set(true);
228228
}
229-
}).take(5).subscribe(ts1);
229+
}).take(5)
230+
.subscribe(ts1);
230231

231232
ts1.awaitTerminalEvent();
232233
ts2.awaitTerminalEvent();

0 commit comments

Comments
 (0)