From fd9b6bcbc332c9bbf8354e6d3318209c0fc19957 Mon Sep 17 00:00:00 2001 From: akarnokd Date: Tue, 26 May 2015 09:15:33 +0200 Subject: [PATCH] Fixed multiple calls to onStart. --- .../internal/operators/OnSubscribeDefer.java | 17 +++++++- .../OnSubscribeDelaySubscription.java | 20 +++++++-- ...ubscribeDelaySubscriptionWithSelector.java | 18 ++++++-- .../internal/operators/OnSubscribeUsing.java | 26 ++++++++---- .../operators/OperatorDoOnSubscribe.java | 15 ++++++- .../operators/OperatorDoOnUnsubscribe.java | 20 ++++++++- .../internal/operators/OperatorGroupBy.java | 4 +- .../internal/operators/OperatorMulticast.java | 17 +++++++- .../OperatorOnErrorResumeNextViaFunction.java | 41 ++++++++++++++----- .../java/rx/observers/TestSubscriber.java | 2 - .../operators/OperatorPublishTest.java | 3 +- 11 files changed, 145 insertions(+), 38 deletions(-) diff --git a/src/main/java/rx/internal/operators/OnSubscribeDefer.java b/src/main/java/rx/internal/operators/OnSubscribeDefer.java index b0dd8ba0f0..4a6434140c 100644 --- a/src/main/java/rx/internal/operators/OnSubscribeDefer.java +++ b/src/main/java/rx/internal/operators/OnSubscribeDefer.java @@ -38,7 +38,7 @@ public OnSubscribeDefer(Func0> observableFacto } @Override - public void call(Subscriber s) { + public void call(final Subscriber s) { Observable o; try { o = observableFactory.call(); @@ -46,7 +46,20 @@ public void call(Subscriber s) { s.onError(t); return; } - o.unsafeSubscribe(s); + o.unsafeSubscribe(new Subscriber(s) { + @Override + public void onNext(T t) { + s.onNext(t); + } + @Override + public void onError(Throwable e) { + s.onError(e); + } + @Override + public void onCompleted() { + s.onCompleted(); + } + }); } } diff --git a/src/main/java/rx/internal/operators/OnSubscribeDelaySubscription.java b/src/main/java/rx/internal/operators/OnSubscribeDelaySubscription.java index d60bdb73a6..95036d399e 100644 --- a/src/main/java/rx/internal/operators/OnSubscribeDelaySubscription.java +++ b/src/main/java/rx/internal/operators/OnSubscribeDelaySubscription.java @@ -16,11 +16,10 @@ package rx.internal.operators; import java.util.concurrent.TimeUnit; -import rx.Observable; + +import rx.*; import rx.Observable.OnSubscribe; -import rx.Scheduler; import rx.Scheduler.Worker; -import rx.Subscriber; import rx.functions.Action0; /** @@ -50,7 +49,20 @@ public void call(final Subscriber s) { @Override public void call() { if (!s.isUnsubscribed()) { - source.unsafeSubscribe(s); + source.unsafeSubscribe(new Subscriber(s) { + @Override + public void onNext(T t) { + s.onNext(t); + } + @Override + public void onError(Throwable e) { + s.onError(e); + } + @Override + public void onCompleted() { + s.onCompleted(); + } + }); } } }, time, unit); diff --git a/src/main/java/rx/internal/operators/OnSubscribeDelaySubscriptionWithSelector.java b/src/main/java/rx/internal/operators/OnSubscribeDelaySubscriptionWithSelector.java index 64e0997474..d6b2f0ad2c 100644 --- a/src/main/java/rx/internal/operators/OnSubscribeDelaySubscriptionWithSelector.java +++ b/src/main/java/rx/internal/operators/OnSubscribeDelaySubscriptionWithSelector.java @@ -15,9 +15,8 @@ */ package rx.internal.operators; -import rx.Observable; +import rx.*; import rx.Observable.OnSubscribe; -import rx.Subscriber; import rx.functions.Func0; /** @@ -43,7 +42,20 @@ public void call(final Subscriber child) { @Override public void onCompleted() { // subscribe to actual source - source.unsafeSubscribe(child); + source.unsafeSubscribe(new Subscriber(child) { + @Override + public void onNext(T t) { + child.onNext(t); + } + @Override + public void onError(Throwable e) { + child.onError(e); + } + @Override + public void onCompleted() { + child.onCompleted(); + } + }); } @Override diff --git a/src/main/java/rx/internal/operators/OnSubscribeUsing.java b/src/main/java/rx/internal/operators/OnSubscribeUsing.java index 8c29d632d9..7470a65dc8 100644 --- a/src/main/java/rx/internal/operators/OnSubscribeUsing.java +++ b/src/main/java/rx/internal/operators/OnSubscribeUsing.java @@ -18,15 +18,10 @@ import java.util.Arrays; import java.util.concurrent.atomic.AtomicBoolean; -import rx.Observable; +import rx.*; import rx.Observable.OnSubscribe; -import rx.Subscriber; -import rx.Subscription; import rx.exceptions.CompositeException; -import rx.functions.Action0; -import rx.functions.Action1; -import rx.functions.Func0; -import rx.functions.Func1; +import rx.functions.*; /** * Constructs an observable sequence that depends on a resource object. @@ -48,7 +43,7 @@ public OnSubscribeUsing(Func0 resourceFactory, } @Override - public void call(Subscriber subscriber) { + public void call(final Subscriber subscriber) { try { @@ -73,7 +68,20 @@ public void call(Subscriber subscriber) { observable = source; try { // start - observable.unsafeSubscribe(subscriber); + observable.unsafeSubscribe(new Subscriber(subscriber) { + @Override + public void onNext(T t) { + subscriber.onNext(t); + } + @Override + public void onError(Throwable e) { + subscriber.onError(e); + } + @Override + public void onCompleted() { + subscriber.onCompleted(); + } + }); } catch (Throwable e) { Throwable disposeError = disposeEagerlyIfRequested(disposeOnceOnly); if (disposeError != null) diff --git a/src/main/java/rx/internal/operators/OperatorDoOnSubscribe.java b/src/main/java/rx/internal/operators/OperatorDoOnSubscribe.java index 391e937d1e..b7999c2b5c 100644 --- a/src/main/java/rx/internal/operators/OperatorDoOnSubscribe.java +++ b/src/main/java/rx/internal/operators/OperatorDoOnSubscribe.java @@ -39,6 +39,19 @@ public Subscriber call(final Subscriber child) { subscribe.call(); // Pass through since this operator is for notification only, there is // no change to the stream whatsoever. - return child; + return new Subscriber(child) { + @Override + public void onNext(T t) { + child.onNext(t); + } + @Override + public void onError(Throwable e) { + child.onError(e); + } + @Override + public void onCompleted() { + child.onCompleted(); + } + }; } } diff --git a/src/main/java/rx/internal/operators/OperatorDoOnUnsubscribe.java b/src/main/java/rx/internal/operators/OperatorDoOnUnsubscribe.java index 480b31f38c..396012c2eb 100644 --- a/src/main/java/rx/internal/operators/OperatorDoOnUnsubscribe.java +++ b/src/main/java/rx/internal/operators/OperatorDoOnUnsubscribe.java @@ -16,7 +16,7 @@ package rx.internal.operators; import rx.Observable.Operator; -import rx.Subscriber; +import rx.*; import rx.functions.Action0; import rx.subscriptions.Subscriptions; @@ -41,6 +41,22 @@ public Subscriber call(final Subscriber child) { // Pass through since this operator is for notification only, there is // no change to the stream whatsoever. - return child; + return new Subscriber(child) { + @Override + public void onStart() { + } + @Override + public void onNext(T t) { + child.onNext(t); + } + @Override + public void onError(Throwable e) { + child.onError(e); + } + @Override + public void onCompleted() { + child.onCompleted(); + } + }; } } diff --git a/src/main/java/rx/internal/operators/OperatorGroupBy.java b/src/main/java/rx/internal/operators/OperatorGroupBy.java index 93631569df..3d8f45067c 100644 --- a/src/main/java/rx/internal/operators/OperatorGroupBy.java +++ b/src/main/java/rx/internal/operators/OperatorGroupBy.java @@ -259,7 +259,9 @@ public void call() { } }).unsafeSubscribe(new Subscriber(o) { - + @Override + public void onStart() { + } @Override public void onCompleted() { o.onCompleted(); diff --git a/src/main/java/rx/internal/operators/OperatorMulticast.java b/src/main/java/rx/internal/operators/OperatorMulticast.java index 4d5d10f4f3..8de1b93984 100644 --- a/src/main/java/rx/internal/operators/OperatorMulticast.java +++ b/src/main/java/rx/internal/operators/OperatorMulticast.java @@ -128,8 +128,21 @@ public void call() { guardedSubscription = gs.get(); // register any subscribers that are waiting with this new subject - for(Subscriber s : waitingForConnect) { - subject.unsafeSubscribe(s); + for(final Subscriber s : waitingForConnect) { + subject.unsafeSubscribe(new Subscriber(s) { + @Override + public void onNext(R t) { + s.onNext(t); + } + @Override + public void onError(Throwable e) { + s.onError(e); + } + @Override + public void onCompleted() { + s.onCompleted(); + } + }); } // clear the waiting list as any new ones that come in after leaving this synchronized block will go direct to the Subject waitingForConnect.clear(); diff --git a/src/main/java/rx/internal/operators/OperatorOnErrorResumeNextViaFunction.java b/src/main/java/rx/internal/operators/OperatorOnErrorResumeNextViaFunction.java index 96a3c5c170..70380a1a2b 100644 --- a/src/main/java/rx/internal/operators/OperatorOnErrorResumeNextViaFunction.java +++ b/src/main/java/rx/internal/operators/OperatorOnErrorResumeNextViaFunction.java @@ -15,13 +15,13 @@ */ package rx.internal.operators; -import rx.Observable; -import rx.Producer; +import rx.*; import rx.Observable.Operator; -import rx.Subscriber; import rx.exceptions.Exceptions; import rx.functions.Func1; +import rx.internal.producers.ProducerArbiter; import rx.plugins.RxJavaPlugins; +import rx.subscriptions.SerialSubscription; /** * Instruct an Observable to pass control to another Observable (the return value of a function) @@ -51,6 +51,8 @@ public OperatorOnErrorResumeNextViaFunction(Func1 call(final Subscriber child) { + final ProducerArbiter pa = new ProducerArbiter(); + final SerialSubscription ssub = new SerialSubscription(); Subscriber parent = new Subscriber() { private boolean done = false; @@ -74,8 +76,28 @@ public void onError(Throwable e) { try { RxJavaPlugins.getInstance().getErrorHandler().handleError(e); unsubscribe(); + Subscriber next = new Subscriber() { + @Override + public void onNext(T t) { + child.onNext(t); + } + @Override + public void onError(Throwable e) { + child.onError(e); + } + @Override + public void onCompleted() { + child.onCompleted(); + } + @Override + public void setProducer(Producer producer) { + pa.setProducer(producer); + } + }; + ssub.set(next); + Observable resume = resumeFunction.call(e); - resume.unsafeSubscribe(child); + resume.unsafeSubscribe(next); } catch (Throwable e2) { child.onError(e2); } @@ -91,16 +113,13 @@ public void onNext(T t) { @Override public void setProducer(final Producer producer) { - child.setProducer(new Producer() { - @Override - public void request(long n) { - producer.request(n); - } - }); + pa.setProducer(producer); } }; - child.add(parent); + child.add(ssub); + ssub.set(parent); + child.setProducer(pa); return parent; } diff --git a/src/main/java/rx/observers/TestSubscriber.java b/src/main/java/rx/observers/TestSubscriber.java index 56d12b46e2..027221b805 100644 --- a/src/main/java/rx/observers/TestSubscriber.java +++ b/src/main/java/rx/observers/TestSubscriber.java @@ -99,8 +99,6 @@ public TestSubscriber() { public void onStart() { if (initialRequest >= 0) { requestMore(initialRequest); - } else { - super.onStart(); } } diff --git a/src/test/java/rx/internal/operators/OperatorPublishTest.java b/src/test/java/rx/internal/operators/OperatorPublishTest.java index f6bfaa7e21..ab815ceb7d 100644 --- a/src/test/java/rx/internal/operators/OperatorPublishTest.java +++ b/src/test/java/rx/internal/operators/OperatorPublishTest.java @@ -226,7 +226,8 @@ public void call() { public void call() { child1Unsubscribed.set(true); } - }).take(5).subscribe(ts1); + }).take(5) + .subscribe(ts1); ts1.awaitTerminalEvent(); ts2.awaitTerminalEvent();