Skip to content

Commit 89dbc93

Browse files
Merge pull request ReactiveX#197 from thegeez/take-while
TakeWhile observables do not properly complete
2 parents f4968d6 + 414dbc4 commit 89dbc93

File tree

1 file changed

+34
-2
lines changed

1 file changed

+34
-2
lines changed

rxjava-core/src/main/java/rx/operators/OperationTake.java

Lines changed: 34 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929
import rx.util.AtomicObservableSubscription;
3030
import rx.util.functions.Func1;
3131
import rx.util.functions.Func2;
32-
32+
import rx.subjects.Subject;
3333
/**
3434
* Returns a specified number of contiguous values from the start of an observable sequence.
3535
*/
@@ -147,6 +147,7 @@ public void onNext(T args) {
147147
if (predicate.call(args, counter.getAndIncrement())) {
148148
observer.onNext(args);
149149
} else {
150+
observer.onCompleted();
150151
// this will work if the sequence is asynchronous, it will have no effect on a synchronous observable
151152
subscription.unsubscribe();
152153
}
@@ -178,6 +179,37 @@ public Boolean call(Integer input) {
178179
verify(aObserver, times(1)).onCompleted();
179180
}
180181

182+
@Test
183+
public void testTakeWhileOnSubject1() {
184+
Subject<Integer> s = Subject.create();
185+
Observable<Integer> w = (Observable<Integer>)s;
186+
Observable<Integer> take = Observable.create(takeWhile(w, new Func1<Integer, Boolean>() {
187+
@Override
188+
public Boolean call(Integer input) {
189+
return input < 3;
190+
}
191+
}));
192+
193+
@SuppressWarnings("unchecked")
194+
Observer<Integer> aObserver = mock(Observer.class);
195+
take.subscribe(aObserver);
196+
197+
s.onNext(1);
198+
s.onNext(2);
199+
s.onNext(3);
200+
s.onNext(4);
201+
s.onNext(5);
202+
s.onCompleted();
203+
204+
verify(aObserver, times(1)).onNext(1);
205+
verify(aObserver, times(1)).onNext(2);
206+
verify(aObserver, never()).onNext(3);
207+
verify(aObserver, never()).onNext(4);
208+
verify(aObserver, never()).onNext(5);
209+
verify(aObserver, never()).onError(any(Exception.class));
210+
verify(aObserver, times(1)).onCompleted();
211+
}
212+
181213
@Test
182214
public void testTakeWhile2() {
183215
Observable<String> w = Observable.toObservable("one", "two", "three");
@@ -293,4 +325,4 @@ public void run() {
293325
}
294326
}
295327

296-
}
328+
}

0 commit comments

Comments
 (0)