diff --git a/src/main/java/rx/internal/operators/OperatorSampleWithObservable.java b/src/main/java/rx/internal/operators/OperatorSampleWithObservable.java index 45614dfc28..f8065a610b 100644 --- a/src/main/java/rx/internal/operators/OperatorSampleWithObservable.java +++ b/src/main/java/rx/internal/operators/OperatorSampleWithObservable.java @@ -66,7 +66,7 @@ public void onError(Throwable e) { @Override public void onCompleted() { - // onNext(null); // emit the very last value? + onNext(null); s.onCompleted(); // no need to null check, main is assigned before any of the two gets subscribed main.get().unsubscribe(); @@ -88,7 +88,7 @@ public void onError(Throwable e) { @Override public void onCompleted() { - // samplerSub.onNext(null); // emit the very last value? + samplerSub.onNext(null); s.onCompleted(); samplerSub.unsubscribe(); diff --git a/src/main/java/rx/internal/operators/OperatorSampleWithTime.java b/src/main/java/rx/internal/operators/OperatorSampleWithTime.java index 0fdcbd2c68..39e783062c 100644 --- a/src/main/java/rx/internal/operators/OperatorSampleWithTime.java +++ b/src/main/java/rx/internal/operators/OperatorSampleWithTime.java @@ -89,12 +89,17 @@ public void onError(Throwable e) { @Override public void onCompleted() { + emitIfNonEmpty(); subscriber.onCompleted(); unsubscribe(); } @Override public void call() { + emitIfNonEmpty(); + } + + private void emitIfNonEmpty() { Object localValue = value.getAndSet(EMPTY_TOKEN); if (localValue != EMPTY_TOKEN) { try { diff --git a/src/test/java/rx/internal/operators/OperatorSampleTest.java b/src/test/java/rx/internal/operators/OperatorSampleTest.java index 78d3633d6f..c05abda5d2 100644 --- a/src/test/java/rx/internal/operators/OperatorSampleTest.java +++ b/src/test/java/rx/internal/operators/OperatorSampleTest.java @@ -109,6 +109,39 @@ public void call() { verify(observer, never()).onError(any(Throwable.class)); } + @Test + public void sampleWithTimeEmitAndTerminate() { + Observable source = Observable.create(new OnSubscribe() { + @Override + public void call(final Subscriber observer1) { + innerScheduler.schedule(new Action0() { + @Override + public void call() { + observer1.onNext(1L); + } + }, 1, TimeUnit.SECONDS); + innerScheduler.schedule(new Action0() { + @Override + public void call() { + observer1.onNext(2L); + observer1.onCompleted(); + } + }, 2, TimeUnit.SECONDS); + } + }); + + Observable sampled = source.sample(400L, TimeUnit.MILLISECONDS, scheduler); + sampled.subscribe(observer); + + InOrder inOrder = inOrder(observer); + + scheduler.advanceTimeTo(2000L, TimeUnit.MILLISECONDS); + inOrder.verify(observer, times(1)).onNext(1L); + inOrder.verify(observer, times(1)).onNext(2L); + verify(observer, times(1)).onCompleted(); + verify(observer, never()).onError(any(Throwable.class)); + } + @Test public void sampleWithSamplerNormal() { PublishSubject source = PublishSubject.create(); @@ -208,7 +241,7 @@ public void sampleWithSamplerEmitAndTerminate() { InOrder inOrder = inOrder(observer2); inOrder.verify(observer2, never()).onNext(1); inOrder.verify(observer2, times(1)).onNext(2); - inOrder.verify(observer2, never()).onNext(3); + inOrder.verify(observer2, times(1)).onNext(3); inOrder.verify(observer2, times(1)).onCompleted(); inOrder.verify(observer2, never()).onNext(any()); verify(observer, never()).onError(any(Throwable.class));