diff --git a/src/main/java/rx/internal/operators/OnSubscribeDelaySubscriptionOther.java b/src/main/java/rx/internal/operators/OnSubscribeDelaySubscriptionOther.java index 2a8b7e1601..dc3146d7e6 100644 --- a/src/main/java/rx/internal/operators/OnSubscribeDelaySubscriptionOther.java +++ b/src/main/java/rx/internal/operators/OnSubscribeDelaySubscriptionOther.java @@ -20,7 +20,7 @@ import rx.Observable.OnSubscribe; import rx.observers.Subscribers; import rx.plugins.*; -import rx.subscriptions.SerialSubscription; +import rx.subscriptions.*; /** * Delays the subscription to the main source until the other @@ -39,9 +39,12 @@ public OnSubscribeDelaySubscriptionOther(Observable main, Observabl @Override public void call(Subscriber t) { + final SerialSubscription serial = new SerialSubscription(); + + t.add(serial); + final Subscriber child = Subscribers.wrap(t); - final SerialSubscription serial = new SerialSubscription(); Subscriber otherSubscriber = new Subscriber() { boolean done; @@ -66,7 +69,7 @@ public void onCompleted() { return; } done = true; - serial.set(child); + serial.set(Subscriptions.unsubscribed()); main.unsafeSubscribe(child); } diff --git a/src/test/java/rx/internal/operators/OnSubscribeDelaySubscriptionOtherTest.java b/src/test/java/rx/internal/operators/OnSubscribeDelaySubscriptionOtherTest.java index e157a788e5..b44b720b41 100644 --- a/src/test/java/rx/internal/operators/OnSubscribeDelaySubscriptionOtherTest.java +++ b/src/test/java/rx/internal/operators/OnSubscribeDelaySubscriptionOtherTest.java @@ -16,7 +16,7 @@ package rx.internal.operators; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.*; import org.junit.*; @@ -243,4 +243,69 @@ public void call() { ts.assertNoErrors(); ts.assertCompleted(); } + + @Test + public void unsubscriptionPropagatesBeforeSubscribe() { + PublishSubject source = PublishSubject.create(); + PublishSubject other = PublishSubject.create(); + + TestSubscriber ts = new TestSubscriber(); + + source.delaySubscription(other).subscribe(ts); + + Assert.assertFalse("source subscribed?", source.hasObservers()); + Assert.assertTrue("other not subscribed?", other.hasObservers()); + + ts.unsubscribe(); + + Assert.assertFalse("source subscribed?", source.hasObservers()); + Assert.assertFalse("other still subscribed?", other.hasObservers()); + } + + @Test + public void unsubscriptionPropagatesAfterSubscribe() { + PublishSubject source = PublishSubject.create(); + PublishSubject other = PublishSubject.create(); + + TestSubscriber ts = new TestSubscriber(); + + source.delaySubscription(other).subscribe(ts); + + Assert.assertFalse("source subscribed?", source.hasObservers()); + Assert.assertTrue("other not subscribed?", other.hasObservers()); + + other.onCompleted(); + + Assert.assertTrue("source not subscribed?", source.hasObservers()); + Assert.assertFalse("other still subscribed?", other.hasObservers()); + + ts.unsubscribe(); + + Assert.assertFalse("source subscribed?", source.hasObservers()); + Assert.assertFalse("other still subscribed?", other.hasObservers()); + } + + @Test + public void delayAndTakeUntilNeverSubscribeToSource() { + PublishSubject delayUntil = PublishSubject.create(); + PublishSubject interrupt = PublishSubject.create(); + final AtomicBoolean subscribed = new AtomicBoolean(false); + + Observable.just(1) + .doOnSubscribe(new Action0() { + @Override + public void call() { + subscribed.set(true); + } + }) + .delaySubscription(delayUntil) + .takeUntil(interrupt) + .subscribe(); + + interrupt.onNext(9000); + delayUntil.onNext(1); + + Assert.assertFalse(subscribed.get()); + } + } diff --git a/src/test/java/rx/internal/operators/OperatorDelayTest.java b/src/test/java/rx/internal/operators/OperatorDelayTest.java index e4db021eaf..315248a5db 100644 --- a/src/test/java/rx/internal/operators/OperatorDelayTest.java +++ b/src/test/java/rx/internal/operators/OperatorDelayTest.java @@ -30,9 +30,9 @@ import java.util.Arrays; import java.util.List; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; -import org.junit.Before; -import org.junit.Test; +import org.junit.*; import org.mockito.InOrder; import org.mockito.Mock; @@ -41,9 +41,7 @@ import rx.Observer; import rx.Subscription; import rx.exceptions.TestException; -import rx.functions.Action1; -import rx.functions.Func0; -import rx.functions.Func1; +import rx.functions.*; import rx.internal.util.RxRingBuffer; import rx.observers.TestObserver; import rx.observers.TestSubscriber; @@ -821,4 +819,47 @@ public void testErrorRunsBeforeOnNext() { ts.assertError(TestException.class); ts.assertNotCompleted(); } + + @Test + public void delaySubscriptionCancelBeforeTime() { + PublishSubject source = PublishSubject.create(); + + TestSubscriber ts = new TestSubscriber(); + + source.delaySubscription(100, TimeUnit.MILLISECONDS, scheduler).subscribe(ts); + + Assert.assertFalse("source subscribed?", source.hasObservers()); + + ts.unsubscribe(); + + Assert.assertFalse("source subscribed?", source.hasObservers()); + + scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS); + + Assert.assertFalse("source subscribed?", source.hasObservers()); + } + + @Test + public void delayAndTakeUntilNeverSubscribeToSource() { + PublishSubject interrupt = PublishSubject.create(); + final AtomicBoolean subscribed = new AtomicBoolean(false); + TestScheduler testScheduler = new TestScheduler(); + + Observable.just(1) + .doOnSubscribe(new Action0() { + @Override + public void call() { + subscribed.set(true); + } + }) + .delaySubscription(1, TimeUnit.SECONDS, testScheduler) + .takeUntil(interrupt) + .subscribe(); + + interrupt.onNext(9000); + testScheduler.advanceTimeBy(1, TimeUnit.SECONDS); + + Assert.assertFalse(subscribed.get()); + } + }