Skip to content

Commit 2df0bea

Browse files
Merge pull request ReactiveX#206 from thegeez/list-subscribe
Observable.toList breaks with multiple subscribers
2 parents bf5056f + c21c1e7 commit 2df0bea

File tree

1 file changed

+24
-1
lines changed

1 file changed

+24
-1
lines changed

rxjava-core/src/main/java/rx/operators/OperationToObservableList.java

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,6 @@ public static <T> Func1<Observer<List<T>>, Subscription> toObservableList(Observ
4040
private static class ToObservableList<T> implements Func1<Observer<List<T>>, Subscription> {
4141

4242
private final Observable<T> that;
43-
final ConcurrentLinkedQueue<T> list = new ConcurrentLinkedQueue<T>();
4443

4544
public ToObservableList(Observable<T> that) {
4645
this.that = that;
@@ -49,6 +48,7 @@ public ToObservableList(Observable<T> that) {
4948
public Subscription call(final Observer<List<T>> observer) {
5049

5150
return that.subscribe(new Observer<T>() {
51+
final ConcurrentLinkedQueue<T> list = new ConcurrentLinkedQueue<T>();
5252
public void onNext(T value) {
5353
// onNext can be concurrently executed so list must be thread-safe
5454
list.add(value);
@@ -94,5 +94,28 @@ public void testList() {
9494
verify(aObserver, Mockito.never()).onError(any(Exception.class));
9595
verify(aObserver, times(1)).onCompleted();
9696
}
97+
98+
@Test
99+
public void testListMultipleObservers() {
100+
Observable<String> w = Observable.toObservable("one", "two", "three");
101+
Observable<List<String>> observable = Observable.create(toObservableList(w));
102+
103+
@SuppressWarnings("unchecked")
104+
Observer<List<String>> o1 = mock(Observer.class);
105+
observable.subscribe(o1);
106+
107+
Observer<List<String>> o2 = mock(Observer.class);
108+
observable.subscribe(o2);
109+
110+
List<String> expected = Arrays.asList("one", "two", "three");
111+
112+
verify(o1, times(1)).onNext(expected);
113+
verify(o1, Mockito.never()).onError(any(Exception.class));
114+
verify(o1, times(1)).onCompleted();
115+
116+
verify(o2, times(1)).onNext(expected);
117+
verify(o2, Mockito.never()).onError(any(Exception.class));
118+
verify(o2, times(1)).onCompleted();
119+
}
97120
}
98121
}

0 commit comments

Comments
 (0)