diff --git a/src/main/java/rx/subjects/TestSubject.java b/src/main/java/rx/subjects/TestSubject.java index 2400e929f1..2de860c602 100644 --- a/src/main/java/rx/subjects/TestSubject.java +++ b/src/main/java/rx/subjects/TestSubject.java @@ -15,8 +15,6 @@ */ package rx.subjects; -import java.util.concurrent.TimeUnit; - import rx.Observer; import rx.Scheduler; import rx.functions.Action0; @@ -25,6 +23,8 @@ import rx.schedulers.TestScheduler; import rx.subjects.SubjectSubscriptionManager.SubjectObserver; +import java.util.concurrent.TimeUnit; + /** * A variety of Subject that is useful for testing purposes. It operates on a {@link TestScheduler} and allows * you to precisely time emissions and notifications to the Subject's subscribers using relative virtual time @@ -68,11 +68,11 @@ protected TestSubject(OnSubscribe onSubscribe, SubjectSubscriptionManager } /** - * Schedule a call to {@code onCompleted} at relative time of "now()" on TestScheduler. + * Schedule a call to {@code onCompleted} on TestScheduler. */ @Override public void onCompleted() { - onCompleted(innerScheduler.now()); + onCompleted(0); } private void _onCompleted() { @@ -86,10 +86,10 @@ private void _onCompleted() { /** * Schedule a call to {@code onCompleted} relative to "now()" +n milliseconds in the future. * - * @param timeInMilliseconds + * @param delayTime * the number of milliseconds in the future relative to "now()" at which to call {@code onCompleted} */ - public void onCompleted(long timeInMilliseconds) { + public void onCompleted(long delayTime) { innerScheduler.schedule(new Action0() { @Override @@ -97,15 +97,15 @@ public void call() { _onCompleted(); } - }, timeInMilliseconds, TimeUnit.MILLISECONDS); + }, delayTime, TimeUnit.MILLISECONDS); } /** - * Schedule a call to {@code onError} at relative time of "now()" on TestScheduler. + * Schedule a call to {@code onError} on TestScheduler. */ @Override public void onError(final Throwable e) { - onError(e, innerScheduler.now()); + onError(e, 0); } private void _onError(final Throwable e) { @@ -121,10 +121,10 @@ private void _onError(final Throwable e) { * * @param e * the {@code Throwable} to pass to the {@code onError} method - * @param timeInMilliseconds + * @param dalayTime * the number of milliseconds in the future relative to "now()" at which to call {@code onError} */ - public void onError(final Throwable e, long timeInMilliseconds) { + public void onError(final Throwable e, long dalayTime) { innerScheduler.schedule(new Action0() { @Override @@ -132,15 +132,15 @@ public void call() { _onError(e); } - }, timeInMilliseconds, TimeUnit.MILLISECONDS); + }, dalayTime, TimeUnit.MILLISECONDS); } /** - * Schedule a call to {@code onNext} at relative time of "now()" on TestScheduler. + * Schedule a call to {@code onNext} on TestScheduler. */ @Override public void onNext(T v) { - onNext(v, innerScheduler.now()); + onNext(v, 0); } private void _onNext(T v) { @@ -154,10 +154,10 @@ private void _onNext(T v) { * * @param v * the item to emit - * @param timeInMilliseconds + * @param delayTime * the number of milliseconds in the future relative to "now()" at which to call {@code onNext} */ - public void onNext(final T v, long timeInMilliseconds) { + public void onNext(final T v, long delayTime) { innerScheduler.schedule(new Action0() { @Override @@ -165,7 +165,7 @@ public void call() { _onNext(v); } - }, timeInMilliseconds, TimeUnit.MILLISECONDS); + }, delayTime, TimeUnit.MILLISECONDS); } @Override diff --git a/src/test/java/rx/subjects/TestSubjectTest.java b/src/test/java/rx/subjects/TestSubjectTest.java new file mode 100644 index 0000000000..dcd54b51c6 --- /dev/null +++ b/src/test/java/rx/subjects/TestSubjectTest.java @@ -0,0 +1,127 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.subjects; + +import org.junit.Test; +import rx.Observer; +import rx.schedulers.TestScheduler; + +import java.io.IOException; +import java.util.concurrent.TimeUnit; + +import static org.mockito.Mockito.*; + +public class TestSubjectTest { + + @Test + public void testObserverPropagateValueAfterTriggeringActions() { + final TestScheduler scheduler = new TestScheduler(); + + final TestSubject subject = TestSubject.create(scheduler); + @SuppressWarnings("unchecked") + Observer observer = mock(Observer.class); + subject.subscribe(observer); + + subject.onNext(1); + scheduler.triggerActions(); + + verify(observer, times(1)).onNext(1); + } + + @Test + public void testObserverPropagateValueInFutureTimeAfterTriggeringActions() { + final TestScheduler scheduler = new TestScheduler(); + scheduler.advanceTimeTo(100, TimeUnit.SECONDS); + + final TestSubject subject = TestSubject.create(scheduler); + @SuppressWarnings("unchecked") + Observer observer = mock(Observer.class); + subject.subscribe(observer); + + subject.onNext(1); + scheduler.triggerActions(); + + verify(observer, times(1)).onNext(1); + } + + + + @Test + public void testObserverPropagateErrorAfterTriggeringActions() { + final IOException e = new IOException(); + final TestScheduler scheduler = new TestScheduler(); + + final TestSubject subject = TestSubject.create(scheduler); + @SuppressWarnings("unchecked") + Observer observer = mock(Observer.class); + subject.subscribe(observer); + + subject.onError(e); + scheduler.triggerActions(); + + verify(observer, times(1)).onError(e); + } + + @Test + public void testObserverPropagateErrorInFutureTimeAfterTriggeringActions() { + final IOException e = new IOException(); + final TestScheduler scheduler = new TestScheduler(); + scheduler.advanceTimeTo(100, TimeUnit.SECONDS); + + final TestSubject subject = TestSubject.create(scheduler); + @SuppressWarnings("unchecked") + Observer observer = mock(Observer.class); + subject.subscribe(observer); + + subject.onError(e); + scheduler.triggerActions(); + + verify(observer, times(1)).onError(e); + } + + + + @Test + public void testObserverPropagateCompletedAfterTriggeringActions() { + final TestScheduler scheduler = new TestScheduler(); + + final TestSubject subject = TestSubject.create(scheduler); + @SuppressWarnings("unchecked") + Observer observer = mock(Observer.class); + subject.subscribe(observer); + + subject.onCompleted(); + scheduler.triggerActions(); + + verify(observer, times(1)).onCompleted(); + } + + @Test + public void testObserverPropagateCompletedInFutureTimeAfterTriggeringActions() { + final TestScheduler scheduler = new TestScheduler(); + scheduler.advanceTimeTo(100, TimeUnit.SECONDS); + + final TestSubject subject = TestSubject.create(scheduler); + @SuppressWarnings("unchecked") + Observer observer = mock(Observer.class); + subject.subscribe(observer); + + subject.onCompleted(); + scheduler.triggerActions(); + + verify(observer, times(1)).onCompleted(); + } +}