Skip to content

Commit 8f03b8e

Browse files
committed
test with observeOn
1 parent d86bc3b commit 8f03b8e

File tree

1 file changed

+42
-0
lines changed

1 file changed

+42
-0
lines changed

src/test/java/rx/internal/operators/OperatorValveTest.java

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,15 +19,21 @@
1919
import static org.mockito.Mockito.inOrder;
2020
import static org.mockito.Mockito.mock;
2121

22+
import java.util.concurrent.TimeUnit;
23+
import java.util.concurrent.atomic.AtomicLong;
24+
2225
import org.junit.Test;
2326
import org.mockito.ArgumentCaptor;
2427
import org.mockito.InOrder;
2528

2629
import rx.Observable;
2730
import rx.Producer;
31+
import rx.Scheduler;
2832
import rx.Subscriber;
2933
import rx.functions.Action1;
34+
import rx.functions.Func1;
3035
import rx.observers.TestSubscriber;
36+
import rx.schedulers.Schedulers;
3137
import rx.subjects.PublishSubject;
3238

3339
public class OperatorValveTest {
@@ -131,6 +137,14 @@ public void test() {
131137
order.verifyNoMoreInteractions();
132138
}
133139

140+
@Test
141+
public void testRequestError() {
142+
TestSubscriber<Void> tSub = TestSubscriber.create(-1);
143+
Exception e = new Exception();
144+
Observable.<Void> error(e).pressureValve(Observable.<Boolean> never(), 10).subscribe(tSub);
145+
tSub.assertError(e);
146+
}
147+
134148
@Test
135149
public void testDataError() {
136150
TestSubscriber<Void> tSub = TestSubscriber.create();
@@ -160,4 +174,32 @@ public void testControlCompleteClosed() {
160174
Observable.<Void> never().pressureValve(Observable.just(false), 10).subscribe(tSub);
161175
tSub.assertError(IllegalStateException.class);
162176
}
177+
178+
/*
179+
@Test
180+
public void testObserveOn() {
181+
final AtomicLong counter = new AtomicLong();
182+
Observable<Integer> range = Observable.range(0, Integer.MAX_VALUE);
183+
Observable<Boolean> control = Observable.interval(1, TimeUnit.SECONDS).map(new Func1<Long, Boolean>() {
184+
@Override
185+
public Boolean call(Long i) {
186+
System.out.println();
187+
counter.set(0);
188+
return i % 2 == 1;
189+
}
190+
});
191+
long granularity = 10;
192+
TestSubscriber<Integer> tSub = new TestSubscriber<Integer>();
193+
range.pressureValve(control, granularity).observeOn(Schedulers.computation()).toBlocking().forEach(new Action1<Integer>() {
194+
@Override
195+
public void call(Integer t) {
196+
System.out.print(counter.incrementAndGet()+ " \r");
197+
}
198+
});
199+
}
200+
201+
public static void main(String[] args) {
202+
new OperatorValveTest().testObserveOn();
203+
}
204+
*/
163205
}

0 commit comments

Comments
 (0)