File tree Expand file tree Collapse file tree 1 file changed +4
-1
lines changed
src/perf/java/rx/operators Expand file tree Collapse file tree 1 file changed +4
-1
lines changed Original file line number Diff line number Diff line change 28
28
import rx .Observable ;
29
29
import rx .functions .Func1 ;
30
30
import rx .jmh .InputWithIncrementingInteger ;
31
+ import rx .jmh .LatchedObserver ;
31
32
import rx .schedulers .Schedulers ;
32
33
33
34
@ BenchmarkMode (Mode .Throughput )
@@ -62,14 +63,16 @@ public Observable<Integer> call(Integer i) {
62
63
63
64
@ Benchmark
64
65
public void flatMapIntPassthruAsync (Input input ) throws InterruptedException {
66
+ LatchedObserver <Integer > latchedObserver = input .newLatchedObserver ();
65
67
input .observable .flatMap (new Func1 <Integer , Observable <Integer >>() {
66
68
67
69
@ Override
68
70
public Observable <Integer > call (Integer i ) {
69
71
return Observable .just (i ).subscribeOn (Schedulers .computation ());
70
72
}
71
73
72
- }).subscribe (input .observer );
74
+ }).subscribe (latchedObserver );
75
+ latchedObserver .latch .await ();
73
76
}
74
77
75
78
@ Benchmark
You can’t perform that action at this time.
0 commit comments