diff --git a/src/main/java/rx/internal/operators/OperatorSampleWithObservable.java b/src/main/java/rx/internal/operators/OperatorSampleWithObservable.java index a89b419025..3b3e295dd3 100644 --- a/src/main/java/rx/internal/operators/OperatorSampleWithObservable.java +++ b/src/main/java/rx/internal/operators/OperatorSampleWithObservable.java @@ -69,7 +69,7 @@ public void onCompleted() { }; - Subscriber result = new Subscriber(child) { + Subscriber result = new Subscriber() { @Override public void onNext(T t) { value.set(t); @@ -88,6 +88,8 @@ public void onCompleted() { } }; + child.add(result); + sampler.unsafeSubscribe(samplerSub); return result; diff --git a/src/test/java/rx/internal/operators/OperatorSampleTest.java b/src/test/java/rx/internal/operators/OperatorSampleTest.java index 2ef1ae8fb3..1db795cbfb 100644 --- a/src/test/java/rx/internal/operators/OperatorSampleTest.java +++ b/src/test/java/rx/internal/operators/OperatorSampleTest.java @@ -16,21 +16,16 @@ package rx.internal.operators; import static org.mockito.Matchers.any; -import static org.mockito.Mockito.inOrder; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.*; import java.util.concurrent.TimeUnit; -import org.junit.Before; -import org.junit.Test; +import org.junit.*; import org.mockito.InOrder; import rx.*; import rx.Observable.OnSubscribe; -import rx.functions.Action0; +import rx.functions.*; import rx.schedulers.TestScheduler; import rx.subjects.PublishSubject; @@ -283,4 +278,38 @@ public void call(Subscriber subscriber) { o.throttleLast(1, TimeUnit.MILLISECONDS).subscribe().unsubscribe(); verify(s).unsubscribe(); } + + @Test + public void testSampleOtherUnboundedIn() { + + final long[] requested = { -1 }; + + PublishSubject.create() + .doOnRequest(new Action1() { + @Override + public void call(Long t) { + requested[0] = t; + } + }) + .sample(PublishSubject.create()).subscribe(); + + Assert.assertEquals(Long.MAX_VALUE, requested[0]); + } + + @Test + public void testSampleTimedUnboundedIn() { + + final long[] requested = { -1 }; + + PublishSubject.create() + .doOnRequest(new Action1() { + @Override + public void call(Long t) { + requested[0] = t; + } + }) + .sample(1, TimeUnit.SECONDS).subscribe().unsubscribe(); + + Assert.assertEquals(Long.MAX_VALUE, requested[0]); + } }