diff --git a/src/main/java/rx/Single.java b/src/main/java/rx/Single.java index 7ba3325434..c7ce03b27a 100644 --- a/src/main/java/rx/Single.java +++ b/src/main/java/rx/Single.java @@ -671,7 +671,7 @@ public static Single merge(final Single> so @Override public void call(final SingleSubscriber child) { - source.subscribe(new SingleSubscriber>() { + SingleSubscriber> parent = new SingleSubscriber>() { @Override public void onSuccess(Single innerSingle) { @@ -683,7 +683,9 @@ public void onError(Throwable error) { child.onError(error); } - }); + }; + child.add(parent); + source.subscribe(parent); } }); } diff --git a/src/test/java/rx/SingleTest.java b/src/test/java/rx/SingleTest.java index 3760330eb4..2520667ccf 100644 --- a/src/test/java/rx/SingleTest.java +++ b/src/test/java/rx/SingleTest.java @@ -1914,4 +1914,44 @@ public void subscribeWithNullObserver() { assertEquals("observer is null", ex.getMessage()); } } + + @Test + public void unsubscribeComposesThrough() { + PublishSubject ps = PublishSubject.create(); + + Subscription s = ps.toSingle() + .flatMap(new Func1>() { + @Override + public Single call(Integer v) { + return Single.just(1); + } + }) + .subscribe(); + + s.unsubscribe(); + + assertFalse("Observers present?!", ps.hasObservers()); + } + + @Test(timeout = 1000) + public void unsubscribeComposesThroughAsync() { + PublishSubject ps = PublishSubject.create(); + + Subscription s = ps.toSingle() + .subscribeOn(Schedulers.io()) + .flatMap(new Func1>() { + @Override + public Single call(Integer v) { + return Single.just(1); + } + }) + .subscribe(); + + while (!ps.hasObservers() && !Thread.currentThread().isInterrupted()) ; + + s.unsubscribe(); + + assertFalse("Observers present?!", ps.hasObservers()); + } + }