Skip to content

Commit 4a6441c

Browse files
akarnokdzsxwing
authored andcommitted
1.x: fix Single.flatMap not composing subscription through (#3941)
Closes #3940.
1 parent d43c05c commit 4a6441c

File tree

2 files changed

+44
-2
lines changed

2 files changed

+44
-2
lines changed

src/main/java/rx/Single.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -671,7 +671,7 @@ public static <T> Single<T> merge(final Single<? extends Single<? extends T>> so
671671

672672
@Override
673673
public void call(final SingleSubscriber<? super T> child) {
674-
source.subscribe(new SingleSubscriber<Single<? extends T>>() {
674+
SingleSubscriber<Single<? extends T>> parent = new SingleSubscriber<Single<? extends T>>() {
675675

676676
@Override
677677
public void onSuccess(Single<? extends T> innerSingle) {
@@ -683,7 +683,9 @@ public void onError(Throwable error) {
683683
child.onError(error);
684684
}
685685

686-
});
686+
};
687+
child.add(parent);
688+
source.subscribe(parent);
687689
}
688690
});
689691
}

src/test/java/rx/SingleTest.java

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1914,4 +1914,44 @@ public void subscribeWithNullObserver() {
19141914
assertEquals("observer is null", ex.getMessage());
19151915
}
19161916
}
1917+
1918+
@Test
1919+
public void unsubscribeComposesThrough() {
1920+
PublishSubject<Integer> ps = PublishSubject.create();
1921+
1922+
Subscription s = ps.toSingle()
1923+
.flatMap(new Func1<Integer, Single<Integer>>() {
1924+
@Override
1925+
public Single<Integer> call(Integer v) {
1926+
return Single.just(1);
1927+
}
1928+
})
1929+
.subscribe();
1930+
1931+
s.unsubscribe();
1932+
1933+
assertFalse("Observers present?!", ps.hasObservers());
1934+
}
1935+
1936+
@Test(timeout = 1000)
1937+
public void unsubscribeComposesThroughAsync() {
1938+
PublishSubject<Integer> ps = PublishSubject.create();
1939+
1940+
Subscription s = ps.toSingle()
1941+
.subscribeOn(Schedulers.io())
1942+
.flatMap(new Func1<Integer, Single<Integer>>() {
1943+
@Override
1944+
public Single<Integer> call(Integer v) {
1945+
return Single.just(1);
1946+
}
1947+
})
1948+
.subscribe();
1949+
1950+
while (!ps.hasObservers() && !Thread.currentThread().isInterrupted()) ;
1951+
1952+
s.unsubscribe();
1953+
1954+
assertFalse("Observers present?!", ps.hasObservers());
1955+
}
1956+
19171957
}

0 commit comments

Comments
 (0)