From e41b215c64637658defaf8a625124bb5332574b9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?D=C3=A1vid=20Karnok?= Date: Wed, 14 Oct 2015 20:55:09 +0200 Subject: [PATCH] 1.x: operator DelaySubscription with plain Observable --- src/main/java/rx/Observable.java | 26 ++ .../OnSubscribeDelaySubscriptionOther.java | 79 ++++++ ...OnSubscribeDelaySubscriptionOtherTest.java | 246 ++++++++++++++++++ 3 files changed, 351 insertions(+) create mode 100644 src/main/java/rx/internal/operators/OnSubscribeDelaySubscriptionOther.java create mode 100644 src/test/java/rx/internal/operators/OnSubscribeDelaySubscriptionOtherTest.java diff --git a/src/main/java/rx/Observable.java b/src/main/java/rx/Observable.java index 0a2ebab8ce..400f07416f 100644 --- a/src/main/java/rx/Observable.java +++ b/src/main/java/rx/Observable.java @@ -4195,6 +4195,32 @@ public final Observable delaySubscription(Func0> return create(new OnSubscribeDelaySubscriptionWithSelector(this, subscriptionDelay)); } + /** + * Returns an Observable that delays the subscription to this Observable + * until the other Observable emits an element or completes normally. + *

+ *

+ *
Backpressure:
+ *
The operator forwards the backpressure requests to this Observable once + * the subscription happens and requests Long.MAX_VALUE from the other Observable
+ *
Scheduler:
+ *
This method does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param the value type of the other Observable, irrelevant + * @param other the other Observable that should trigger the subscription + * to this Observable. + * @return an Observable that delays the subscription to this Observable + * until the other Observable emits an element or completes normally. + */ + @Experimental + public final Observable delaySubscription(Observable other) { + if (other == null) { + throw new NullPointerException(); + } + return create(new OnSubscribeDelaySubscriptionOther(this, other)); + } + /** * Returns an Observable that reverses the effect of {@link #materialize materialize} by transforming the * {@link Notification} objects emitted by the source Observable into the items or notifications they diff --git a/src/main/java/rx/internal/operators/OnSubscribeDelaySubscriptionOther.java b/src/main/java/rx/internal/operators/OnSubscribeDelaySubscriptionOther.java new file mode 100644 index 0000000000..2a8b7e1601 --- /dev/null +++ b/src/main/java/rx/internal/operators/OnSubscribeDelaySubscriptionOther.java @@ -0,0 +1,79 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package rx.internal.operators; + +import rx.*; +import rx.Observable.OnSubscribe; +import rx.observers.Subscribers; +import rx.plugins.*; +import rx.subscriptions.SerialSubscription; + +/** + * Delays the subscription to the main source until the other + * observable fires an event or completes. + * @param the main type + * @param the other value type, ignored + */ +public final class OnSubscribeDelaySubscriptionOther implements OnSubscribe { + final Observable main; + final Observable other; + + public OnSubscribeDelaySubscriptionOther(Observable main, Observable other) { + this.main = main; + this.other = other; + } + + @Override + public void call(Subscriber t) { + final Subscriber child = Subscribers.wrap(t); + + final SerialSubscription serial = new SerialSubscription(); + + Subscriber otherSubscriber = new Subscriber() { + boolean done; + @Override + public void onNext(U t) { + onCompleted(); + } + + @Override + public void onError(Throwable e) { + if (done) { + RxJavaPlugins.getInstance().getErrorHandler().handleError(e); + return; + } + done = true; + child.onError(e); + } + + @Override + public void onCompleted() { + if (done) { + return; + } + done = true; + serial.set(child); + + main.unsafeSubscribe(child); + } + }; + + serial.set(otherSubscriber); + + other.unsafeSubscribe(otherSubscriber); + } +} diff --git a/src/test/java/rx/internal/operators/OnSubscribeDelaySubscriptionOtherTest.java b/src/test/java/rx/internal/operators/OnSubscribeDelaySubscriptionOtherTest.java new file mode 100644 index 0000000000..e157a788e5 --- /dev/null +++ b/src/test/java/rx/internal/operators/OnSubscribeDelaySubscriptionOtherTest.java @@ -0,0 +1,246 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package rx.internal.operators; + +import java.util.concurrent.atomic.AtomicInteger; + +import org.junit.*; + +import rx.Observable; +import rx.exceptions.TestException; +import rx.functions.Action0; +import rx.observers.TestSubscriber; +import rx.subjects.PublishSubject; + +public class OnSubscribeDelaySubscriptionOtherTest { + @Test + public void testNoPrematureSubscription() { + PublishSubject other = PublishSubject.create(); + + TestSubscriber ts = TestSubscriber.create(); + + final AtomicInteger subscribed = new AtomicInteger(); + + Observable.just(1) + .doOnSubscribe(new Action0() { + @Override + public void call() { + subscribed.getAndIncrement(); + } + }) + .delaySubscription(other) + .subscribe(ts); + + ts.assertNotCompleted(); + ts.assertNoErrors(); + ts.assertNoValues(); + + Assert.assertEquals("Premature subscription", 0, subscribed.get()); + + other.onNext(1); + + Assert.assertEquals("No subscription", 1, subscribed.get()); + + ts.assertValue(1); + ts.assertNoErrors(); + ts.assertCompleted(); + } + + @Test + public void testNoMultipleSubscriptions() { + PublishSubject other = PublishSubject.create(); + + TestSubscriber ts = TestSubscriber.create(); + + final AtomicInteger subscribed = new AtomicInteger(); + + Observable.just(1) + .doOnSubscribe(new Action0() { + @Override + public void call() { + subscribed.getAndIncrement(); + } + }) + .delaySubscription(other) + .subscribe(ts); + + ts.assertNotCompleted(); + ts.assertNoErrors(); + ts.assertNoValues(); + + Assert.assertEquals("Premature subscription", 0, subscribed.get()); + + other.onNext(1); + other.onNext(2); + + Assert.assertEquals("No subscription", 1, subscribed.get()); + + ts.assertValue(1); + ts.assertNoErrors(); + ts.assertCompleted(); + } + + @Test + public void testCompleteTriggersSubscription() { + PublishSubject other = PublishSubject.create(); + + TestSubscriber ts = TestSubscriber.create(); + + final AtomicInteger subscribed = new AtomicInteger(); + + Observable.just(1) + .doOnSubscribe(new Action0() { + @Override + public void call() { + subscribed.getAndIncrement(); + } + }) + .delaySubscription(other) + .subscribe(ts); + + ts.assertNotCompleted(); + ts.assertNoErrors(); + ts.assertNoValues(); + + Assert.assertEquals("Premature subscription", 0, subscribed.get()); + + other.onCompleted(); + + Assert.assertEquals("No subscription", 1, subscribed.get()); + + ts.assertValue(1); + ts.assertNoErrors(); + ts.assertCompleted(); + } + + @Test + public void testNoPrematureSubscriptionToError() { + PublishSubject other = PublishSubject.create(); + + TestSubscriber ts = TestSubscriber.create(); + + final AtomicInteger subscribed = new AtomicInteger(); + + Observable.error(new TestException()) + .doOnSubscribe(new Action0() { + @Override + public void call() { + subscribed.getAndIncrement(); + } + }) + .delaySubscription(other) + .subscribe(ts); + + ts.assertNotCompleted(); + ts.assertNoErrors(); + ts.assertNoValues(); + + Assert.assertEquals("Premature subscription", 0, subscribed.get()); + + other.onCompleted(); + + Assert.assertEquals("No subscription", 1, subscribed.get()); + + ts.assertNoValues(); + ts.assertNotCompleted(); + ts.assertError(TestException.class); + } + + @Test + public void testNoSubscriptionIfOtherErrors() { + PublishSubject other = PublishSubject.create(); + + TestSubscriber ts = TestSubscriber.create(); + + final AtomicInteger subscribed = new AtomicInteger(); + + Observable.error(new TestException()) + .doOnSubscribe(new Action0() { + @Override + public void call() { + subscribed.getAndIncrement(); + } + }) + .delaySubscription(other) + .subscribe(ts); + + ts.assertNotCompleted(); + ts.assertNoErrors(); + ts.assertNoValues(); + + Assert.assertEquals("Premature subscription", 0, subscribed.get()); + + other.onError(new TestException()); + + Assert.assertEquals("Premature subscription", 0, subscribed.get()); + + ts.assertNoValues(); + ts.assertNotCompleted(); + ts.assertError(TestException.class); + } + + @Test + public void testBackpressurePassesThrough() { + + PublishSubject other = PublishSubject.create(); + + TestSubscriber ts = TestSubscriber.create(0L); + + final AtomicInteger subscribed = new AtomicInteger(); + + Observable.just(1, 2, 3, 4, 5) + .doOnSubscribe(new Action0() { + @Override + public void call() { + subscribed.getAndIncrement(); + } + }) + .delaySubscription(other) + .subscribe(ts); + + ts.assertNotCompleted(); + ts.assertNoErrors(); + ts.assertNoValues(); + + Assert.assertEquals("Premature subscription", 0, subscribed.get()); + + other.onNext(1); + + Assert.assertEquals("No subscription", 1, subscribed.get()); + + Assert.assertFalse("Not unsubscribed from other", other.hasObservers()); + + ts.assertNotCompleted(); + ts.assertNoErrors(); + ts.assertNoValues(); + + ts.requestMore(1); + ts.assertValue(1); + ts.assertNoErrors(); + ts.assertNotCompleted(); + + ts.requestMore(2); + ts.assertValues(1, 2, 3); + ts.assertNoErrors(); + ts.assertNotCompleted(); + + ts.requestMore(10); + ts.assertValues(1, 2, 3, 4, 5); + ts.assertNoErrors(); + ts.assertCompleted(); + } +}