diff --git a/src/jmh/java/io/reactivex/FlattenRangePerf.java b/src/jmh/java/io/reactivex/FlattenRangePerf.java new file mode 100644 index 0000000000..1d2fe51546 --- /dev/null +++ b/src/jmh/java/io/reactivex/FlattenRangePerf.java @@ -0,0 +1,69 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex; + +import java.util.*; +import java.util.concurrent.TimeUnit; + +import org.openjdk.jmh.annotations.*; +import org.openjdk.jmh.infra.Blackhole; + +import io.reactivex.functions.Function; + +@BenchmarkMode(Mode.Throughput) +@Warmup(iterations = 5) +@Measurement(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS) +@OutputTimeUnit(TimeUnit.SECONDS) +@Fork(value = 1) +@State(Scope.Thread) +public class FlattenRangePerf { + @Param({ "1", "10", "100", "1000", "10000", "100000", "1000000" }) + public int times; + + Flowable flowable; + + Observable observable; + + @Setup + public void setup() { + Integer[] array = new Integer[times]; + Arrays.fill(array, 777); + + final Iterable list = Arrays.asList(1, 2); + + flowable = Flowable.fromArray(array).flatMapIterable(new Function>() { + @Override + public Iterable apply(Integer v) throws Exception { + return list; + } + }); + + observable = Observable.fromArray(array).flatMapIterable(new Function>() { + @Override + public Iterable apply(Integer v) throws Exception { + return list; + } + }); + } + + @Benchmark + public void flowable(Blackhole bh) { + flowable.subscribe(new PerfConsumer(bh)); + } + + @Benchmark + public void observable(Blackhole bh) { + observable.subscribe(new PerfConsumer(bh)); + } +} diff --git a/src/jmh/java/io/reactivex/FlowableFlatMapCompletablePerf.java b/src/jmh/java/io/reactivex/FlowableFlatMapCompletableAsyncPerf.java similarity index 97% rename from src/jmh/java/io/reactivex/FlowableFlatMapCompletablePerf.java rename to src/jmh/java/io/reactivex/FlowableFlatMapCompletableAsyncPerf.java index ec846d35eb..6dd4dcd263 100644 --- a/src/jmh/java/io/reactivex/FlowableFlatMapCompletablePerf.java +++ b/src/jmh/java/io/reactivex/FlowableFlatMapCompletableAsyncPerf.java @@ -29,7 +29,7 @@ @OutputTimeUnit(TimeUnit.SECONDS) @Fork(value = 1) @State(Scope.Thread) -public class FlowableFlatMapCompletablePerf implements Action { +public class FlowableFlatMapCompletableAsyncPerf implements Action { @Param({"1", "10", "100", "1000", "10000", "100000", "1000000"}) int items; diff --git a/src/jmh/java/io/reactivex/MemoryPerf.java b/src/jmh/java/io/reactivex/MemoryPerf.java new file mode 100644 index 0000000000..ba92a89b78 --- /dev/null +++ b/src/jmh/java/io/reactivex/MemoryPerf.java @@ -0,0 +1,517 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex; + +import java.lang.management.ManagementFactory; +import java.util.concurrent.Callable; + +import org.reactivestreams.Subscription; + +import io.reactivex.disposables.Disposable; +import io.reactivex.functions.*; + +/** + * Measure various prepared flows about their memory usage and print the result + * in a JMH compatible format; run {@link #main(String[])}. + */ +public final class MemoryPerf { + + private MemoryPerf() { } + + static long memoryUse() { + return ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getUsed(); + } + + static final class MyRx2Subscriber implements FlowableSubscriber { + + org.reactivestreams.Subscription s; + + @Override + public void onSubscribe(Subscription s) { + this.s = s; + } + + @Override + public void onComplete() { + + } + + @Override + public void onError(Throwable e) { + + } + + @Override + public void onNext(Object t) { + + } + } + + static final class MyRx2Observer implements io.reactivex.Observer, io.reactivex.SingleObserver, + io.reactivex.MaybeObserver, io.reactivex.CompletableObserver { + + Disposable s; + + @Override + public void onSubscribe(Disposable s) { + this.s = s; + } + + @Override + public void onComplete() { + + } + + @Override + public void onError(Throwable e) { + + } + + @Override + public void onNext(Object t) { + + } + + @Override + public void onSuccess(Object value) { + + } + } + static void checkMemory(Callable item, String name, String typeLib) throws Exception { + checkMemory(item, name, typeLib, 1000000); + } + + static void checkMemory(Callable item, String name, String typeLib, int n) throws Exception { + // make sure classes are initialized + item.call(); + + Object[] array = new Object[n]; + + Thread.sleep(100); + System.gc(); + Thread.sleep(100); + + long before = memoryUse(); + + for (int i = 0; i < n; i++) { + array[i] = item.call(); + } + + Thread.sleep(100); + System.gc(); + Thread.sleep(100); + + long after = memoryUse(); + + double use = Math.max(0.0, (after - before) / 1024.0 / 1024.0); + + System.out.print(name); + System.out.print(" "); + System.out.print(typeLib); + System.out.print(" thrpt "); + System.out.print(n); + System.out.printf(" %.3f 0.000 MB%n", use); + + if (array.hashCode() == 1) { + System.out.print(""); + } + + array = null; + item = null; + + Thread.sleep(100); + System.gc(); + Thread.sleep(100); + } + + public static void main(String[] args) throws Exception { + + System.out.println("Benchmark (lib-type) Mode Cnt Score Error Units"); + + // --------------------------------------------------------------------------------------------------------------------- + + checkMemory(new Callable() { + @Override + public Object call() throws Exception { + return io.reactivex.Observable.just(1); + } + }, "just", "Rx2Observable"); + + checkMemory(new Callable() { + @Override + public Object call() throws Exception { + return io.reactivex.Observable.range(1, 10); + } + }, "range", "Rx2Observable"); + + checkMemory(new Callable() { + @Override + public Object call() throws Exception { + return io.reactivex.Observable.empty(); + } + }, "empty", "Rx2Observable"); + + checkMemory(new Callable() { + @Override + public Object call() throws Exception { + return io.reactivex.Observable.fromCallable(new Callable() { + @Override + public Object call() throws Exception { + return 1; + } + }); + } + }, "fromCallable", "Rx2Observable"); + + checkMemory(new Callable() { + @Override + public Object call() throws Exception { + return new MyRx2Observer(); + } + }, "consumer", "Rx2Observable"); + + checkMemory(new Callable() { + @Override + public Object call() throws Exception { + return new io.reactivex.observers.TestObserver(); + } + }, "test-consumer", "Rx2Observable"); + + checkMemory(new Callable() { + @Override + public Object call() throws Exception { + return io.reactivex.Observable.just(1).subscribeWith(new MyRx2Observer()); + } + }, "just+consumer", "Rx2Observable"); + + checkMemory(new Callable() { + @Override + public Object call() throws Exception { + return io.reactivex.Observable.range(1, 10).subscribeWith(new MyRx2Observer()); + } + }, "range+consumer", "Rx2Observable"); + + checkMemory(new Callable() { + @Override + public Object call() throws Exception { + return io.reactivex.Observable.range(1, 10).map(new Function() { + @Override + public Object apply(Integer v) throws Exception { + return v; + } + }).subscribeWith(new MyRx2Observer()); + } + }, "range+map+consumer", "Rx2Observable"); + + checkMemory(new Callable() { + @Override + public Object call() throws Exception { + return io.reactivex.Observable.range(1, 10).map(new Function() { + @Override + public Object apply(Integer v) throws Exception { + return v; + } + }).filter(new Predicate() { + @Override + public boolean test(Object v) throws Exception { + return true; + } + }).subscribeWith(new MyRx2Observer()); + } + }, "range+map+filter+consumer", "Rx2Observable"); + + checkMemory(new Callable() { + @Override + public Object call() throws Exception { + return io.reactivex.Observable.range(1, 10).subscribeOn(io.reactivex.schedulers.Schedulers.computation()).subscribeWith(new MyRx2Observer()); + } + }, "range+subscribeOn+consumer", "Rx2Observable"); + + checkMemory(new Callable() { + @Override + public Object call() throws Exception { + return io.reactivex.Observable.range(1, 10).observeOn(io.reactivex.schedulers.Schedulers.computation()).subscribeWith(new MyRx2Observer()); + } + }, "range+observeOn+consumer", "Rx2Observable"); + + checkMemory(new Callable() { + @Override + public Object call() throws Exception { + return io.reactivex.Observable.range(1, 10).subscribeOn(io.reactivex.schedulers.Schedulers.computation()).observeOn(io.reactivex.schedulers.Schedulers.computation()).subscribeWith(new MyRx2Observer()); + } + }, "range+subscribeOn+observeOn+consumer", "Rx2Observable"); + + checkMemory(new Callable() { + @Override + public Object call() throws Exception { + return io.reactivex.subjects.AsyncSubject.create(); + } + }, "Async", "Rx2Observable"); + + checkMemory(new Callable() { + @Override + public Object call() throws Exception { + return io.reactivex.subjects.PublishSubject.create(); + } + }, "Publish", "Rx2Observable"); + + checkMemory(new Callable() { + @Override + public Object call() throws Exception { + return io.reactivex.subjects.ReplaySubject.create(); + } + }, "Replay", "Rx2Observable"); + + checkMemory(new Callable() { + @Override + public Object call() throws Exception { + return io.reactivex.subjects.BehaviorSubject.create(); + } + }, "Behavior", "Rx2Observable"); + + checkMemory(new Callable() { + @Override + public Object call() throws Exception { + return io.reactivex.subjects.UnicastSubject.create(); + } + }, "Unicast", "Rx2Observable"); + + checkMemory(new Callable() { + @Override + public Object call() throws Exception { + return io.reactivex.subjects.AsyncSubject.create().subscribeWith(new MyRx2Observer()); + } + }, "Async+consumer", "Rx2Observable"); + + checkMemory(new Callable() { + @Override + public Object call() throws Exception { + return io.reactivex.subjects.PublishSubject.create().subscribeWith(new MyRx2Observer()); + } + }, "Publish+consumer", "Rx2Observable"); + + checkMemory(new Callable() { + @Override + public Object call() throws Exception { + return io.reactivex.subjects.ReplaySubject.create().subscribeWith(new MyRx2Observer()); + } + }, "Replay+consumer", "Rx2Observable"); + + checkMemory(new Callable() { + @Override + public Object call() throws Exception { + return io.reactivex.subjects.BehaviorSubject.create().subscribeWith(new MyRx2Observer()); + } + }, "Behavior+consumer", "Rx2Observable"); + + checkMemory(new Callable() { + @Override + public Object call() throws Exception { + return io.reactivex.subjects.UnicastSubject.create().subscribeWith(new MyRx2Observer()); + } + }, "Unicast+consumer", "Rx2Observable"); + + // --------------------------------------------------------------------------------------------------------------------- + + checkMemory(new Callable() { + @Override + public Object call() throws Exception { + return io.reactivex.Flowable.just(1); + } + }, "just", "Rx2Flowable"); + + checkMemory(new Callable() { + @Override + public Object call() throws Exception { + return io.reactivex.Flowable.range(1, 10); + } + }, "range", "Rx2Flowable"); + + checkMemory(new Callable() { + @Override + public Object call() throws Exception { + return io.reactivex.Flowable.empty(); + } + }, "empty", "Rx2Flowable"); + + checkMemory(new Callable() { + @Override + public Object call() throws Exception { + return io.reactivex.Flowable.empty(); + } + }, "empty", "Rx2Flowable", 10000000); + + checkMemory(new Callable() { + @Override + public Object call() throws Exception { + return io.reactivex.Flowable.fromCallable(new Callable() { + @Override + public Object call() throws Exception { + return 1; + } + }); + } + }, "fromCallable", "Rx2Flowable"); + + checkMemory(new Callable() { + @Override + public Object call() throws Exception { + return new MyRx2Subscriber(); + } + }, "consumer", "Rx2Flowable"); + + checkMemory(new Callable() { + @Override + public Object call() throws Exception { + return new io.reactivex.observers.TestObserver(); + } + }, "test-consumer", "Rx2Flowable"); + + checkMemory(new Callable() { + @Override + public Object call() throws Exception { + return io.reactivex.Flowable.just(1).subscribeWith(new MyRx2Subscriber()); + } + }, "just+consumer", "Rx2Flowable"); + + checkMemory(new Callable() { + @Override + public Object call() throws Exception { + return io.reactivex.Flowable.range(1, 10).subscribeWith(new MyRx2Subscriber()); + } + }, "range+consumer", "Rx2Flowable"); + + checkMemory(new Callable() { + @Override + public Object call() throws Exception { + return io.reactivex.Flowable.range(1, 10).map(new Function() { + @Override + public Object apply(Integer v) throws Exception { + return v; + } + }).subscribeWith(new MyRx2Subscriber()); + } + }, "range+map+consumer", "Rx2Flowable"); + + checkMemory(new Callable() { + @Override + public Object call() throws Exception { + return io.reactivex.Flowable.range(1, 10).map(new Function() { + @Override + public Object apply(Integer v) throws Exception { + return v; + } + }).filter(new Predicate() { + @Override + public boolean test(Object v) throws Exception { + return true; + } + }).subscribeWith(new MyRx2Subscriber()); + } + }, "range+map+filter+consumer", "Rx2Flowable"); + + checkMemory(new Callable() { + @Override + public Object call() throws Exception { + return io.reactivex.Flowable.range(1, 10).subscribeOn(io.reactivex.schedulers.Schedulers.computation()).subscribeWith(new MyRx2Subscriber()); + } + }, "range+subscribeOn+consumer", "Rx2Flowable"); + + checkMemory(new Callable() { + @Override + public Object call() throws Exception { + return io.reactivex.Flowable.range(1, 10).observeOn(io.reactivex.schedulers.Schedulers.computation()).subscribeWith(new MyRx2Subscriber()); + } + }, "range+observeOn+consumer", "Rx2Flowable"); + + checkMemory(new Callable() { + @Override + public Object call() throws Exception { + return io.reactivex.Flowable.range(1, 10).subscribeOn(io.reactivex.schedulers.Schedulers.computation()).observeOn(io.reactivex.schedulers.Schedulers.computation()).subscribeWith(new MyRx2Subscriber()); + } + }, "range+subscribeOn+observeOn+consumer", "Rx2Flowable"); + + checkMemory(new Callable() { + @Override + public Object call() throws Exception { + return io.reactivex.processors.AsyncProcessor.create(); + } + }, "Async", "Rx2Flowable"); + + checkMemory(new Callable() { + @Override + public Object call() throws Exception { + return io.reactivex.processors.PublishProcessor.create(); + } + }, "Publish", "Rx2Flowable"); + + checkMemory(new Callable() { + @Override + public Object call() throws Exception { + return io.reactivex.processors.ReplayProcessor.create(); + } + }, "Replay", "Rx2Flowable"); + + checkMemory(new Callable() { + @Override + public Object call() throws Exception { + return io.reactivex.processors.BehaviorProcessor.create(); + } + }, "Behavior", "Rx2Flowable"); + + checkMemory(new Callable() { + @Override + public Object call() throws Exception { + return io.reactivex.processors.UnicastProcessor.create(); + } + }, "Unicast", "Rx2Flowable"); + + checkMemory(new Callable() { + @Override + public Object call() throws Exception { + return io.reactivex.processors.AsyncProcessor.create().subscribeWith(new MyRx2Subscriber()); + } + }, "Async+consumer", "Rx2Flowable"); + + checkMemory(new Callable() { + @Override + public Object call() throws Exception { + return io.reactivex.processors.PublishProcessor.create().subscribeWith(new MyRx2Subscriber()); + } + }, "Publish+consumer", "Rx2Flowable"); + + checkMemory(new Callable() { + @Override + public Object call() throws Exception { + return io.reactivex.processors.ReplayProcessor.create().subscribeWith(new MyRx2Subscriber()); + } + }, "Replay+consumer", "Rx2Flowable"); + + checkMemory(new Callable() { + @Override + public Object call() throws Exception { + return io.reactivex.processors.BehaviorProcessor.create().subscribeWith(new MyRx2Subscriber()); + } + }, "Behavior+consumer", "Rx2Flowable"); + + checkMemory(new Callable() { + @Override + public Object call() throws Exception { + return io.reactivex.processors.UnicastProcessor.create().subscribeWith(new MyRx2Subscriber()); + } + }, "Unicast+consumer", "Rx2Flowable"); + + // --------------------------------------------------------------------------------------------------------------------- + } +} diff --git a/src/jmh/java/io/reactivex/xmapz/FlowableConcatMapCompletablePerf.java b/src/jmh/java/io/reactivex/xmapz/FlowableConcatMapCompletablePerf.java new file mode 100644 index 0000000000..ab23c7e252 --- /dev/null +++ b/src/jmh/java/io/reactivex/xmapz/FlowableConcatMapCompletablePerf.java @@ -0,0 +1,88 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.xmapz; + +import java.util.Arrays; +import java.util.concurrent.TimeUnit; + +import org.openjdk.jmh.annotations.*; +import org.openjdk.jmh.infra.Blackhole; +import org.reactivestreams.Publisher; + +import io.reactivex.*; +import io.reactivex.functions.Function; + +@BenchmarkMode(Mode.Throughput) +@Warmup(iterations = 5) +@Measurement(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS) +@OutputTimeUnit(TimeUnit.SECONDS) +@Fork(value = 1) +@State(Scope.Thread) +public class FlowableConcatMapCompletablePerf { + @Param({ "1", "10", "100", "1000", "10000", "100000", "1000000" }) + public int count; + + Flowable flowableConvert; + + Completable flowableDedicated; + + Flowable flowablePlain; + + @Setup + public void setup() { + Integer[] sourceArray = new Integer[count]; + Arrays.fill(sourceArray, 777); + + Flowable source = Flowable.fromArray(sourceArray); + + flowablePlain = source.concatMap(new Function>() { + @Override + public Publisher apply(Integer v) + throws Exception { + return Flowable.empty(); + } + }); + + flowableConvert = source.concatMap(new Function>() { + @Override + public Publisher apply(Integer v) + throws Exception { + return Completable.complete().toFlowable(); + } + }); + + flowableDedicated = source.concatMapCompletable(new Function() { + @Override + public Completable apply(Integer v) + throws Exception { + return Completable.complete(); + } + }); + } + + @Benchmark + public Object flowablePlain(Blackhole bh) { + return flowablePlain.subscribeWith(new PerfConsumer(bh)); + } + + @Benchmark + public Object flowableConvert(Blackhole bh) { + return flowableConvert.subscribeWith(new PerfConsumer(bh)); + } + + @Benchmark + public Object flowableDedicated(Blackhole bh) { + return flowableDedicated.subscribeWith(new PerfConsumer(bh)); + } +} diff --git a/src/jmh/java/io/reactivex/xmapz/FlowableConcatMapMaybeEmptyPerf.java b/src/jmh/java/io/reactivex/xmapz/FlowableConcatMapMaybeEmptyPerf.java new file mode 100644 index 0000000000..b439425bd0 --- /dev/null +++ b/src/jmh/java/io/reactivex/xmapz/FlowableConcatMapMaybeEmptyPerf.java @@ -0,0 +1,88 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.xmapz; + +import java.util.Arrays; +import java.util.concurrent.TimeUnit; + +import org.openjdk.jmh.annotations.*; +import org.openjdk.jmh.infra.Blackhole; +import org.reactivestreams.Publisher; + +import io.reactivex.*; +import io.reactivex.functions.Function; + +@BenchmarkMode(Mode.Throughput) +@Warmup(iterations = 5) +@Measurement(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS) +@OutputTimeUnit(TimeUnit.SECONDS) +@Fork(value = 1) +@State(Scope.Thread) +public class FlowableConcatMapMaybeEmptyPerf { + @Param({ "1", "10", "100", "1000", "10000", "100000", "1000000" }) + public int count; + + Flowable concatMapToFlowableEmpty; + + Flowable flowableDedicated; + + Flowable flowablePlain; + + @Setup + public void setup() { + Integer[] sourceArray = new Integer[count]; + Arrays.fill(sourceArray, 777); + + Flowable source = Flowable.fromArray(sourceArray); + + flowablePlain = source.concatMap(new Function>() { + @Override + public Publisher apply(Integer v) + throws Exception { + return Flowable.empty(); + } + }); + + concatMapToFlowableEmpty = source.concatMap(new Function>() { + @Override + public Publisher apply(Integer v) + throws Exception { + return Maybe.empty().toFlowable(); + } + }); + + flowableDedicated = source.concatMapMaybe(new Function>() { + @Override + public Maybe apply(Integer v) + throws Exception { + return Maybe.empty(); + } + }); + } + + @Benchmark + public Object flowablePlain(Blackhole bh) { + return flowablePlain.subscribeWith(new PerfConsumer(bh)); + } + + @Benchmark + public Object flowableConvert(Blackhole bh) { + return concatMapToFlowableEmpty.subscribeWith(new PerfConsumer(bh)); + } + + @Benchmark + public Object flowableDedicated(Blackhole bh) { + return flowableDedicated.subscribeWith(new PerfConsumer(bh)); + } +} diff --git a/src/jmh/java/io/reactivex/xmapz/FlowableConcatMapMaybePerf.java b/src/jmh/java/io/reactivex/xmapz/FlowableConcatMapMaybePerf.java new file mode 100644 index 0000000000..7f891292d2 --- /dev/null +++ b/src/jmh/java/io/reactivex/xmapz/FlowableConcatMapMaybePerf.java @@ -0,0 +1,88 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.xmapz; + +import java.util.Arrays; +import java.util.concurrent.TimeUnit; + +import org.openjdk.jmh.annotations.*; +import org.openjdk.jmh.infra.Blackhole; +import org.reactivestreams.Publisher; + +import io.reactivex.*; +import io.reactivex.functions.Function; + +@BenchmarkMode(Mode.Throughput) +@Warmup(iterations = 5) +@Measurement(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS) +@OutputTimeUnit(TimeUnit.SECONDS) +@Fork(value = 1) +@State(Scope.Thread) +public class FlowableConcatMapMaybePerf { + @Param({ "1", "10", "100", "1000", "10000", "100000", "1000000" }) + public int count; + + Flowable flowableConvert; + + Flowable flowableDedicated; + + Flowable flowablePlain; + + @Setup + public void setup() { + Integer[] sourceArray = new Integer[count]; + Arrays.fill(sourceArray, 777); + + Flowable source = Flowable.fromArray(sourceArray); + + flowablePlain = source.concatMap(new Function>() { + @Override + public Publisher apply(Integer v) + throws Exception { + return Flowable.just(v); + } + }); + + flowableConvert = source.concatMap(new Function>() { + @Override + public Publisher apply(Integer v) + throws Exception { + return Maybe.just(v).toFlowable(); + } + }); + + flowableDedicated = source.concatMapMaybe(new Function>() { + @Override + public Maybe apply(Integer v) + throws Exception { + return Maybe.just(v); + } + }); + } + + @Benchmark + public Object flowablePlain(Blackhole bh) { + return flowablePlain.subscribeWith(new PerfConsumer(bh)); + } + + @Benchmark + public Object flowableConvert(Blackhole bh) { + return flowableConvert.subscribeWith(new PerfConsumer(bh)); + } + + @Benchmark + public Object flowableDedicated(Blackhole bh) { + return flowableDedicated.subscribeWith(new PerfConsumer(bh)); + } +} diff --git a/src/jmh/java/io/reactivex/xmapz/FlowableConcatMapSinglePerf.java b/src/jmh/java/io/reactivex/xmapz/FlowableConcatMapSinglePerf.java new file mode 100644 index 0000000000..69f0b314fe --- /dev/null +++ b/src/jmh/java/io/reactivex/xmapz/FlowableConcatMapSinglePerf.java @@ -0,0 +1,88 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.xmapz; + +import java.util.Arrays; +import java.util.concurrent.TimeUnit; + +import org.openjdk.jmh.annotations.*; +import org.openjdk.jmh.infra.Blackhole; +import org.reactivestreams.Publisher; + +import io.reactivex.*; +import io.reactivex.functions.Function; + +@BenchmarkMode(Mode.Throughput) +@Warmup(iterations = 5) +@Measurement(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS) +@OutputTimeUnit(TimeUnit.SECONDS) +@Fork(value = 1) +@State(Scope.Thread) +public class FlowableConcatMapSinglePerf { + @Param({ "1", "10", "100", "1000", "10000", "100000", "1000000" }) + public int count; + + Flowable flowableConvert; + + Flowable flowableDedicated; + + Flowable flowablePlain; + + @Setup + public void setup() { + Integer[] sourceArray = new Integer[count]; + Arrays.fill(sourceArray, 777); + + Flowable source = Flowable.fromArray(sourceArray); + + flowablePlain = source.concatMap(new Function>() { + @Override + public Publisher apply(Integer v) + throws Exception { + return Flowable.just(v); + } + }); + + flowableConvert = source.concatMap(new Function>() { + @Override + public Publisher apply(Integer v) + throws Exception { + return Single.just(v).toFlowable(); + } + }); + + flowableDedicated = source.concatMapSingle(new Function>() { + @Override + public Single apply(Integer v) + throws Exception { + return Single.just(v); + } + }); + } + + @Benchmark + public Object flowablePlain(Blackhole bh) { + return flowablePlain.subscribeWith(new PerfConsumer(bh)); + } + + @Benchmark + public Object flowableConvert(Blackhole bh) { + return flowableConvert.subscribeWith(new PerfConsumer(bh)); + } + + @Benchmark + public Object flowableDedicated(Blackhole bh) { + return flowableDedicated.subscribeWith(new PerfConsumer(bh)); + } +} diff --git a/src/jmh/java/io/reactivex/xmapz/FlowableFlatMapCompletablePerf.java b/src/jmh/java/io/reactivex/xmapz/FlowableFlatMapCompletablePerf.java new file mode 100644 index 0000000000..4b9f37664a --- /dev/null +++ b/src/jmh/java/io/reactivex/xmapz/FlowableFlatMapCompletablePerf.java @@ -0,0 +1,88 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.xmapz; + +import java.util.Arrays; +import java.util.concurrent.TimeUnit; + +import org.openjdk.jmh.annotations.*; +import org.openjdk.jmh.infra.Blackhole; +import org.reactivestreams.Publisher; + +import io.reactivex.*; +import io.reactivex.functions.Function; + +@BenchmarkMode(Mode.Throughput) +@Warmup(iterations = 5) +@Measurement(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS) +@OutputTimeUnit(TimeUnit.SECONDS) +@Fork(value = 1) +@State(Scope.Thread) +public class FlowableFlatMapCompletablePerf { + @Param({ "1", "10", "100", "1000", "10000", "100000", "1000000" }) + public int count; + + Flowable flowableConvert; + + Completable flowableDedicated; + + Flowable flowablePlain; + + @Setup + public void setup() { + Integer[] sourceArray = new Integer[count]; + Arrays.fill(sourceArray, 777); + + Flowable source = Flowable.fromArray(sourceArray); + + flowablePlain = source.flatMap(new Function>() { + @Override + public Publisher apply(Integer v) + throws Exception { + return Flowable.empty(); + } + }); + + flowableConvert = source.flatMap(new Function>() { + @Override + public Publisher apply(Integer v) + throws Exception { + return Completable.complete().toFlowable(); + } + }); + + flowableDedicated = source.flatMapCompletable(new Function() { + @Override + public Completable apply(Integer v) + throws Exception { + return Completable.complete(); + } + }); + } + + @Benchmark + public Object flowablePlain(Blackhole bh) { + return flowablePlain.subscribeWith(new PerfConsumer(bh)); + } + + @Benchmark + public Object flowableConvert(Blackhole bh) { + return flowableConvert.subscribeWith(new PerfConsumer(bh)); + } + + @Benchmark + public Object flowableDedicated(Blackhole bh) { + return flowableDedicated.subscribeWith(new PerfConsumer(bh)); + } +} diff --git a/src/jmh/java/io/reactivex/xmapz/FlowableFlatMapMaybeEmptyPerf.java b/src/jmh/java/io/reactivex/xmapz/FlowableFlatMapMaybeEmptyPerf.java new file mode 100644 index 0000000000..bf2ad4396e --- /dev/null +++ b/src/jmh/java/io/reactivex/xmapz/FlowableFlatMapMaybeEmptyPerf.java @@ -0,0 +1,88 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.xmapz; + +import java.util.Arrays; +import java.util.concurrent.TimeUnit; + +import org.openjdk.jmh.annotations.*; +import org.openjdk.jmh.infra.Blackhole; +import org.reactivestreams.Publisher; + +import io.reactivex.*; +import io.reactivex.functions.Function; + +@BenchmarkMode(Mode.Throughput) +@Warmup(iterations = 5) +@Measurement(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS) +@OutputTimeUnit(TimeUnit.SECONDS) +@Fork(value = 1) +@State(Scope.Thread) +public class FlowableFlatMapMaybeEmptyPerf { + @Param({ "1", "10", "100", "1000", "10000", "100000", "1000000" }) + public int count; + + Flowable flowableConvert; + + Flowable flowableDedicated; + + Flowable flowablePlain; + + @Setup + public void setup() { + Integer[] sourceArray = new Integer[count]; + Arrays.fill(sourceArray, 777); + + Flowable source = Flowable.fromArray(sourceArray); + + flowablePlain = source.flatMap(new Function>() { + @Override + public Publisher apply(Integer v) + throws Exception { + return Flowable.empty(); + } + }); + + flowableConvert = source.flatMap(new Function>() { + @Override + public Publisher apply(Integer v) + throws Exception { + return Maybe.empty().toFlowable(); + } + }); + + flowableDedicated = source.flatMapMaybe(new Function>() { + @Override + public Maybe apply(Integer v) + throws Exception { + return Maybe.empty(); + } + }); + } + + @Benchmark + public Object flowablePlain(Blackhole bh) { + return flowablePlain.subscribeWith(new PerfConsumer(bh)); + } + + @Benchmark + public Object flowableConvert(Blackhole bh) { + return flowableConvert.subscribeWith(new PerfConsumer(bh)); + } + + @Benchmark + public Object flowableDedicated(Blackhole bh) { + return flowableDedicated.subscribeWith(new PerfConsumer(bh)); + } +} diff --git a/src/jmh/java/io/reactivex/xmapz/FlowableFlatMapMaybePerf.java b/src/jmh/java/io/reactivex/xmapz/FlowableFlatMapMaybePerf.java new file mode 100644 index 0000000000..37c970d6cc --- /dev/null +++ b/src/jmh/java/io/reactivex/xmapz/FlowableFlatMapMaybePerf.java @@ -0,0 +1,88 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.xmapz; + +import java.util.Arrays; +import java.util.concurrent.TimeUnit; + +import org.openjdk.jmh.annotations.*; +import org.openjdk.jmh.infra.Blackhole; +import org.reactivestreams.Publisher; + +import io.reactivex.*; +import io.reactivex.functions.Function; + +@BenchmarkMode(Mode.Throughput) +@Warmup(iterations = 5) +@Measurement(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS) +@OutputTimeUnit(TimeUnit.SECONDS) +@Fork(value = 1) +@State(Scope.Thread) +public class FlowableFlatMapMaybePerf { + @Param({ "1", "10", "100", "1000", "10000", "100000", "1000000" }) + public int count; + + Flowable flowableConvert; + + Flowable flowableDedicated; + + Flowable flowablePlain; + + @Setup + public void setup() { + Integer[] sourceArray = new Integer[count]; + Arrays.fill(sourceArray, 777); + + Flowable source = Flowable.fromArray(sourceArray); + + flowablePlain = source.flatMap(new Function>() { + @Override + public Publisher apply(Integer v) + throws Exception { + return Flowable.just(v); + } + }); + + flowableConvert = source.flatMap(new Function>() { + @Override + public Publisher apply(Integer v) + throws Exception { + return Maybe.just(v).toFlowable(); + } + }); + + flowableDedicated = source.flatMapMaybe(new Function>() { + @Override + public Maybe apply(Integer v) + throws Exception { + return Maybe.just(v); + } + }); + } + + @Benchmark + public Object flowablePlain(Blackhole bh) { + return flowablePlain.subscribeWith(new PerfConsumer(bh)); + } + + @Benchmark + public Object flowableConvert(Blackhole bh) { + return flowableConvert.subscribeWith(new PerfConsumer(bh)); + } + + @Benchmark + public Object flowableDedicated(Blackhole bh) { + return flowableDedicated.subscribeWith(new PerfConsumer(bh)); + } +} diff --git a/src/jmh/java/io/reactivex/xmapz/FlowableFlatMapSinglePerf.java b/src/jmh/java/io/reactivex/xmapz/FlowableFlatMapSinglePerf.java new file mode 100644 index 0000000000..e4405932f8 --- /dev/null +++ b/src/jmh/java/io/reactivex/xmapz/FlowableFlatMapSinglePerf.java @@ -0,0 +1,88 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.xmapz; + +import java.util.Arrays; +import java.util.concurrent.TimeUnit; + +import org.openjdk.jmh.annotations.*; +import org.openjdk.jmh.infra.Blackhole; +import org.reactivestreams.Publisher; + +import io.reactivex.*; +import io.reactivex.functions.Function; + +@BenchmarkMode(Mode.Throughput) +@Warmup(iterations = 5) +@Measurement(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS) +@OutputTimeUnit(TimeUnit.SECONDS) +@Fork(value = 1) +@State(Scope.Thread) +public class FlowableFlatMapSinglePerf { + @Param({ "1", "10", "100", "1000", "10000", "100000", "1000000" }) + public int count; + + Flowable flowableConvert; + + Flowable flowableDedicated; + + Flowable flowablePlain; + + @Setup + public void setup() { + Integer[] sourceArray = new Integer[count]; + Arrays.fill(sourceArray, 777); + + Flowable source = Flowable.fromArray(sourceArray); + + flowablePlain = source.flatMap(new Function>() { + @Override + public Publisher apply(Integer v) + throws Exception { + return Flowable.just(v); + } + }); + + flowableConvert = source.flatMap(new Function>() { + @Override + public Publisher apply(Integer v) + throws Exception { + return Single.just(v).toFlowable(); + } + }); + + flowableDedicated = source.flatMapSingle(new Function>() { + @Override + public Single apply(Integer v) + throws Exception { + return Single.just(v); + } + }); + } + + @Benchmark + public Object flowablePlain(Blackhole bh) { + return flowablePlain.subscribeWith(new PerfConsumer(bh)); + } + + @Benchmark + public Object flowableConvert(Blackhole bh) { + return flowableConvert.subscribeWith(new PerfConsumer(bh)); + } + + @Benchmark + public Object flowableDedicated(Blackhole bh) { + return flowableDedicated.subscribeWith(new PerfConsumer(bh)); + } +} diff --git a/src/jmh/java/io/reactivex/xmapz/FlowableSwitchMapCompletablePerf.java b/src/jmh/java/io/reactivex/xmapz/FlowableSwitchMapCompletablePerf.java new file mode 100644 index 0000000000..d0c3041101 --- /dev/null +++ b/src/jmh/java/io/reactivex/xmapz/FlowableSwitchMapCompletablePerf.java @@ -0,0 +1,88 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.xmapz; + +import java.util.Arrays; +import java.util.concurrent.TimeUnit; + +import org.openjdk.jmh.annotations.*; +import org.openjdk.jmh.infra.Blackhole; +import org.reactivestreams.Publisher; + +import io.reactivex.*; +import io.reactivex.functions.Function; + +@BenchmarkMode(Mode.Throughput) +@Warmup(iterations = 5) +@Measurement(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS) +@OutputTimeUnit(TimeUnit.SECONDS) +@Fork(value = 1) +@State(Scope.Thread) +public class FlowableSwitchMapCompletablePerf { + @Param({ "1", "10", "100", "1000", "10000", "100000", "1000000" }) + public int count; + + Flowable flowableConvert; + + Completable flowableDedicated; + + Flowable flowablePlain; + + @Setup + public void setup() { + Integer[] sourceArray = new Integer[count]; + Arrays.fill(sourceArray, 777); + + Flowable source = Flowable.fromArray(sourceArray); + + flowablePlain = source.switchMap(new Function>() { + @Override + public Publisher apply(Integer v) + throws Exception { + return Flowable.empty(); + } + }); + + flowableConvert = source.switchMap(new Function>() { + @Override + public Publisher apply(Integer v) + throws Exception { + return Completable.complete().toFlowable(); + } + }); + + flowableDedicated = source.switchMapCompletable(new Function() { + @Override + public Completable apply(Integer v) + throws Exception { + return Completable.complete(); + } + }); + } + + @Benchmark + public Object flowablePlain(Blackhole bh) { + return flowablePlain.subscribeWith(new PerfConsumer(bh)); + } + + @Benchmark + public Object flowableConvert(Blackhole bh) { + return flowableConvert.subscribeWith(new PerfConsumer(bh)); + } + + @Benchmark + public Object flowableDedicated(Blackhole bh) { + return flowableDedicated.subscribeWith(new PerfConsumer(bh)); + } +} diff --git a/src/jmh/java/io/reactivex/xmapz/FlowableSwitchMapMaybeEmptyPerf.java b/src/jmh/java/io/reactivex/xmapz/FlowableSwitchMapMaybeEmptyPerf.java new file mode 100644 index 0000000000..217d7bdeba --- /dev/null +++ b/src/jmh/java/io/reactivex/xmapz/FlowableSwitchMapMaybeEmptyPerf.java @@ -0,0 +1,88 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.xmapz; + +import java.util.Arrays; +import java.util.concurrent.TimeUnit; + +import org.openjdk.jmh.annotations.*; +import org.openjdk.jmh.infra.Blackhole; +import org.reactivestreams.Publisher; + +import io.reactivex.*; +import io.reactivex.functions.Function; + +@BenchmarkMode(Mode.Throughput) +@Warmup(iterations = 5) +@Measurement(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS) +@OutputTimeUnit(TimeUnit.SECONDS) +@Fork(value = 1) +@State(Scope.Thread) +public class FlowableSwitchMapMaybeEmptyPerf { + @Param({ "1", "10", "100", "1000", "10000", "100000", "1000000" }) + public int count; + + Flowable flowableConvert; + + Flowable flowableDedicated; + + Flowable flowablePlain; + + @Setup + public void setup() { + Integer[] sourceArray = new Integer[count]; + Arrays.fill(sourceArray, 777); + + Flowable source = Flowable.fromArray(sourceArray); + + flowablePlain = source.switchMap(new Function>() { + @Override + public Publisher apply(Integer v) + throws Exception { + return Flowable.empty(); + } + }); + + flowableConvert = source.switchMap(new Function>() { + @Override + public Publisher apply(Integer v) + throws Exception { + return Maybe.empty().toFlowable(); + } + }); + + flowableDedicated = source.switchMapMaybe(new Function>() { + @Override + public Maybe apply(Integer v) + throws Exception { + return Maybe.empty(); + } + }); + } + + @Benchmark + public Object flowablePlain(Blackhole bh) { + return flowablePlain.subscribeWith(new PerfConsumer(bh)); + } + + @Benchmark + public Object flowableConvert(Blackhole bh) { + return flowableConvert.subscribeWith(new PerfConsumer(bh)); + } + + @Benchmark + public Object flowableDedicated(Blackhole bh) { + return flowableDedicated.subscribeWith(new PerfConsumer(bh)); + } +} diff --git a/src/jmh/java/io/reactivex/xmapz/FlowableSwitchMapMaybePerf.java b/src/jmh/java/io/reactivex/xmapz/FlowableSwitchMapMaybePerf.java new file mode 100644 index 0000000000..f00690b313 --- /dev/null +++ b/src/jmh/java/io/reactivex/xmapz/FlowableSwitchMapMaybePerf.java @@ -0,0 +1,88 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.xmapz; + +import java.util.Arrays; +import java.util.concurrent.TimeUnit; + +import org.openjdk.jmh.annotations.*; +import org.openjdk.jmh.infra.Blackhole; +import org.reactivestreams.Publisher; + +import io.reactivex.*; +import io.reactivex.functions.Function; + +@BenchmarkMode(Mode.Throughput) +@Warmup(iterations = 5) +@Measurement(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS) +@OutputTimeUnit(TimeUnit.SECONDS) +@Fork(value = 1) +@State(Scope.Thread) +public class FlowableSwitchMapMaybePerf { + @Param({ "1", "10", "100", "1000", "10000", "100000", "1000000" }) + public int count; + + Flowable flowableConvert; + + Flowable flowableDedicated; + + Flowable flowablePlain; + + @Setup + public void setup() { + Integer[] sourceArray = new Integer[count]; + Arrays.fill(sourceArray, 777); + + Flowable source = Flowable.fromArray(sourceArray); + + flowablePlain = source.switchMap(new Function>() { + @Override + public Publisher apply(Integer v) + throws Exception { + return Flowable.just(v); + } + }); + + flowableConvert = source.switchMap(new Function>() { + @Override + public Publisher apply(Integer v) + throws Exception { + return Maybe.just(v).toFlowable(); + } + }); + + flowableDedicated = source.switchMapMaybe(new Function>() { + @Override + public Maybe apply(Integer v) + throws Exception { + return Maybe.just(v); + } + }); + } + + @Benchmark + public Object flowablePlain(Blackhole bh) { + return flowablePlain.subscribeWith(new PerfConsumer(bh)); + } + + @Benchmark + public Object flowableConvert(Blackhole bh) { + return flowableConvert.subscribeWith(new PerfConsumer(bh)); + } + + @Benchmark + public Object flowableDedicated(Blackhole bh) { + return flowableDedicated.subscribeWith(new PerfConsumer(bh)); + } +} diff --git a/src/jmh/java/io/reactivex/xmapz/FlowableSwitchMapSinglePerf.java b/src/jmh/java/io/reactivex/xmapz/FlowableSwitchMapSinglePerf.java new file mode 100644 index 0000000000..ad5b3209f0 --- /dev/null +++ b/src/jmh/java/io/reactivex/xmapz/FlowableSwitchMapSinglePerf.java @@ -0,0 +1,88 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.xmapz; + +import java.util.Arrays; +import java.util.concurrent.TimeUnit; + +import org.openjdk.jmh.annotations.*; +import org.openjdk.jmh.infra.Blackhole; +import org.reactivestreams.Publisher; + +import io.reactivex.*; +import io.reactivex.functions.Function; + +@BenchmarkMode(Mode.Throughput) +@Warmup(iterations = 5) +@Measurement(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS) +@OutputTimeUnit(TimeUnit.SECONDS) +@Fork(value = 1) +@State(Scope.Thread) +public class FlowableSwitchMapSinglePerf { + @Param({ "1", "10", "100", "1000", "10000", "100000", "1000000" }) + public int count; + + Flowable flowableConvert; + + Flowable flowableDedicated; + + Flowable flowablePlain; + + @Setup + public void setup() { + Integer[] sourceArray = new Integer[count]; + Arrays.fill(sourceArray, 777); + + Flowable source = Flowable.fromArray(sourceArray); + + flowablePlain = source.switchMap(new Function>() { + @Override + public Publisher apply(Integer v) + throws Exception { + return Flowable.just(v); + } + }); + + flowableConvert = source.switchMap(new Function>() { + @Override + public Publisher apply(Integer v) + throws Exception { + return Single.just(v).toFlowable(); + } + }); + + flowableDedicated = source.switchMapSingle(new Function>() { + @Override + public Single apply(Integer v) + throws Exception { + return Single.just(v); + } + }); + } + + @Benchmark + public Object flowablePlain(Blackhole bh) { + return flowablePlain.subscribeWith(new PerfConsumer(bh)); + } + + @Benchmark + public Object flowableConvert(Blackhole bh) { + return flowableConvert.subscribeWith(new PerfConsumer(bh)); + } + + @Benchmark + public Object flowableDedicated(Blackhole bh) { + return flowableDedicated.subscribeWith(new PerfConsumer(bh)); + } +} diff --git a/src/jmh/java/io/reactivex/xmapz/ObservableConcatMapCompletablePerf.java b/src/jmh/java/io/reactivex/xmapz/ObservableConcatMapCompletablePerf.java new file mode 100644 index 0000000000..18382f6d55 --- /dev/null +++ b/src/jmh/java/io/reactivex/xmapz/ObservableConcatMapCompletablePerf.java @@ -0,0 +1,87 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.xmapz; + +import java.util.Arrays; +import java.util.concurrent.TimeUnit; + +import org.openjdk.jmh.annotations.*; +import org.openjdk.jmh.infra.Blackhole; + +import io.reactivex.*; +import io.reactivex.functions.Function; + +@BenchmarkMode(Mode.Throughput) +@Warmup(iterations = 5) +@Measurement(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS) +@OutputTimeUnit(TimeUnit.SECONDS) +@Fork(value = 1) +@State(Scope.Thread) +public class ObservableConcatMapCompletablePerf { + @Param({ "1", "10", "100", "1000", "10000", "100000", "1000000" }) + public int count; + + Observable observableConvert; + + Completable observableDedicated; + + Observable observablePlain; + + @Setup + public void setup() { + Integer[] sourceArray = new Integer[count]; + Arrays.fill(sourceArray, 777); + + Observable source = Observable.fromArray(sourceArray); + + observablePlain = source.concatMap(new Function>() { + @Override + public Observable apply(Integer v) + throws Exception { + return Observable.empty(); + } + }); + + observableConvert = source.concatMap(new Function>() { + @Override + public Observable apply(Integer v) + throws Exception { + return Completable.complete().toObservable(); + } + }); + + observableDedicated = source.concatMapCompletable(new Function() { + @Override + public Completable apply(Integer v) + throws Exception { + return Completable.complete(); + } + }); + } + + @Benchmark + public Object observablePlain(Blackhole bh) { + return observablePlain.subscribeWith(new PerfConsumer(bh)); + } + + @Benchmark + public Object observableConvert(Blackhole bh) { + return observableConvert.subscribeWith(new PerfConsumer(bh)); + } + + @Benchmark + public Object observableDedicated(Blackhole bh) { + return observableDedicated.subscribeWith(new PerfConsumer(bh)); + } +} diff --git a/src/jmh/java/io/reactivex/xmapz/ObservableConcatMapMaybeEmptyPerf.java b/src/jmh/java/io/reactivex/xmapz/ObservableConcatMapMaybeEmptyPerf.java new file mode 100644 index 0000000000..bc5752eeff --- /dev/null +++ b/src/jmh/java/io/reactivex/xmapz/ObservableConcatMapMaybeEmptyPerf.java @@ -0,0 +1,87 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.xmapz; + +import java.util.Arrays; +import java.util.concurrent.TimeUnit; + +import org.openjdk.jmh.annotations.*; +import org.openjdk.jmh.infra.Blackhole; + +import io.reactivex.*; +import io.reactivex.functions.Function; + +@BenchmarkMode(Mode.Throughput) +@Warmup(iterations = 5) +@Measurement(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS) +@OutputTimeUnit(TimeUnit.SECONDS) +@Fork(value = 1) +@State(Scope.Thread) +public class ObservableConcatMapMaybeEmptyPerf { + @Param({ "1", "10", "100", "1000", "10000", "100000", "1000000" }) + public int count; + + Observable concatMapToObservableEmpty; + + Observable observableDedicated; + + Observable observablePlain; + + @Setup + public void setup() { + Integer[] sourceArray = new Integer[count]; + Arrays.fill(sourceArray, 777); + + Observable source = Observable.fromArray(sourceArray); + + observablePlain = source.concatMap(new Function>() { + @Override + public Observable apply(Integer v) + throws Exception { + return Observable.empty(); + } + }); + + concatMapToObservableEmpty = source.concatMap(new Function>() { + @Override + public Observable apply(Integer v) + throws Exception { + return Maybe.empty().toObservable(); + } + }); + + observableDedicated = source.concatMapMaybe(new Function>() { + @Override + public Maybe apply(Integer v) + throws Exception { + return Maybe.empty(); + } + }); + } + + @Benchmark + public Object observablePlain(Blackhole bh) { + return observablePlain.subscribeWith(new PerfConsumer(bh)); + } + + @Benchmark + public Object observableConvert(Blackhole bh) { + return concatMapToObservableEmpty.subscribeWith(new PerfConsumer(bh)); + } + + @Benchmark + public Object observableDedicated(Blackhole bh) { + return observableDedicated.subscribeWith(new PerfConsumer(bh)); + } +} diff --git a/src/jmh/java/io/reactivex/xmapz/ObservableConcatMapMaybePerf.java b/src/jmh/java/io/reactivex/xmapz/ObservableConcatMapMaybePerf.java new file mode 100644 index 0000000000..e85e2708e4 --- /dev/null +++ b/src/jmh/java/io/reactivex/xmapz/ObservableConcatMapMaybePerf.java @@ -0,0 +1,87 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.xmapz; + +import java.util.Arrays; +import java.util.concurrent.TimeUnit; + +import org.openjdk.jmh.annotations.*; +import org.openjdk.jmh.infra.Blackhole; + +import io.reactivex.*; +import io.reactivex.functions.Function; + +@BenchmarkMode(Mode.Throughput) +@Warmup(iterations = 5) +@Measurement(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS) +@OutputTimeUnit(TimeUnit.SECONDS) +@Fork(value = 1) +@State(Scope.Thread) +public class ObservableConcatMapMaybePerf { + @Param({ "1", "10", "100", "1000", "10000", "100000", "1000000" }) + public int count; + + Observable observableConvert; + + Observable observableDedicated; + + Observable observablePlain; + + @Setup + public void setup() { + Integer[] sourceArray = new Integer[count]; + Arrays.fill(sourceArray, 777); + + Observable source = Observable.fromArray(sourceArray); + + observablePlain = source.concatMap(new Function>() { + @Override + public Observable apply(Integer v) + throws Exception { + return Observable.just(v); + } + }); + + observableConvert = source.concatMap(new Function>() { + @Override + public Observable apply(Integer v) + throws Exception { + return Maybe.just(v).toObservable(); + } + }); + + observableDedicated = source.concatMapMaybe(new Function>() { + @Override + public Maybe apply(Integer v) + throws Exception { + return Maybe.just(v); + } + }); + } + + @Benchmark + public Object observablePlain(Blackhole bh) { + return observablePlain.subscribeWith(new PerfConsumer(bh)); + } + + @Benchmark + public Object observableConvert(Blackhole bh) { + return observableConvert.subscribeWith(new PerfConsumer(bh)); + } + + @Benchmark + public Object observableDedicated(Blackhole bh) { + return observableDedicated.subscribeWith(new PerfConsumer(bh)); + } +} diff --git a/src/jmh/java/io/reactivex/xmapz/ObservableConcatMapSinglePerf.java b/src/jmh/java/io/reactivex/xmapz/ObservableConcatMapSinglePerf.java new file mode 100644 index 0000000000..090e1fde96 --- /dev/null +++ b/src/jmh/java/io/reactivex/xmapz/ObservableConcatMapSinglePerf.java @@ -0,0 +1,87 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.xmapz; + +import java.util.Arrays; +import java.util.concurrent.TimeUnit; + +import org.openjdk.jmh.annotations.*; +import org.openjdk.jmh.infra.Blackhole; + +import io.reactivex.*; +import io.reactivex.functions.Function; + +@BenchmarkMode(Mode.Throughput) +@Warmup(iterations = 5) +@Measurement(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS) +@OutputTimeUnit(TimeUnit.SECONDS) +@Fork(value = 1) +@State(Scope.Thread) +public class ObservableConcatMapSinglePerf { + @Param({ "1", "10", "100", "1000", "10000", "100000", "1000000" }) + public int count; + + Observable observableConvert; + + Observable observableDedicated; + + Observable observablePlain; + + @Setup + public void setup() { + Integer[] sourceArray = new Integer[count]; + Arrays.fill(sourceArray, 777); + + Observable source = Observable.fromArray(sourceArray); + + observablePlain = source.concatMap(new Function>() { + @Override + public Observable apply(Integer v) + throws Exception { + return Observable.just(v); + } + }); + + observableConvert = source.concatMap(new Function>() { + @Override + public Observable apply(Integer v) + throws Exception { + return Single.just(v).toObservable(); + } + }); + + observableDedicated = source.concatMapSingle(new Function>() { + @Override + public Single apply(Integer v) + throws Exception { + return Single.just(v); + } + }); + } + + @Benchmark + public Object observablePlain(Blackhole bh) { + return observablePlain.subscribeWith(new PerfConsumer(bh)); + } + + @Benchmark + public Object observableConvert(Blackhole bh) { + return observableConvert.subscribeWith(new PerfConsumer(bh)); + } + + @Benchmark + public Object observableDedicated(Blackhole bh) { + return observableDedicated.subscribeWith(new PerfConsumer(bh)); + } +} diff --git a/src/jmh/java/io/reactivex/xmapz/ObservableFlatMapCompletablePerf.java b/src/jmh/java/io/reactivex/xmapz/ObservableFlatMapCompletablePerf.java new file mode 100644 index 0000000000..10bbed676e --- /dev/null +++ b/src/jmh/java/io/reactivex/xmapz/ObservableFlatMapCompletablePerf.java @@ -0,0 +1,87 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.xmapz; + +import java.util.Arrays; +import java.util.concurrent.TimeUnit; + +import org.openjdk.jmh.annotations.*; +import org.openjdk.jmh.infra.Blackhole; + +import io.reactivex.*; +import io.reactivex.functions.Function; + +@BenchmarkMode(Mode.Throughput) +@Warmup(iterations = 5) +@Measurement(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS) +@OutputTimeUnit(TimeUnit.SECONDS) +@Fork(value = 1) +@State(Scope.Thread) +public class ObservableFlatMapCompletablePerf { + @Param({ "1", "10", "100", "1000", "10000", "100000", "1000000" }) + public int count; + + Observable observableConvert; + + Completable observableDedicated; + + Observable observablePlain; + + @Setup + public void setup() { + Integer[] sourceArray = new Integer[count]; + Arrays.fill(sourceArray, 777); + + Observable source = Observable.fromArray(sourceArray); + + observablePlain = source.flatMap(new Function>() { + @Override + public Observable apply(Integer v) + throws Exception { + return Observable.empty(); + } + }); + + observableConvert = source.flatMap(new Function>() { + @Override + public Observable apply(Integer v) + throws Exception { + return Completable.complete().toObservable(); + } + }); + + observableDedicated = source.flatMapCompletable(new Function() { + @Override + public Completable apply(Integer v) + throws Exception { + return Completable.complete(); + } + }); + } + + @Benchmark + public Object observablePlain(Blackhole bh) { + return observablePlain.subscribeWith(new PerfConsumer(bh)); + } + + @Benchmark + public Object observableConvert(Blackhole bh) { + return observableConvert.subscribeWith(new PerfConsumer(bh)); + } + + @Benchmark + public Object observableDedicated(Blackhole bh) { + return observableDedicated.subscribeWith(new PerfConsumer(bh)); + } +} diff --git a/src/jmh/java/io/reactivex/xmapz/ObservableFlatMapMaybeEmptyPerf.java b/src/jmh/java/io/reactivex/xmapz/ObservableFlatMapMaybeEmptyPerf.java new file mode 100644 index 0000000000..021ae89332 --- /dev/null +++ b/src/jmh/java/io/reactivex/xmapz/ObservableFlatMapMaybeEmptyPerf.java @@ -0,0 +1,87 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.xmapz; + +import java.util.Arrays; +import java.util.concurrent.TimeUnit; + +import org.openjdk.jmh.annotations.*; +import org.openjdk.jmh.infra.Blackhole; + +import io.reactivex.*; +import io.reactivex.functions.Function; + +@BenchmarkMode(Mode.Throughput) +@Warmup(iterations = 5) +@Measurement(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS) +@OutputTimeUnit(TimeUnit.SECONDS) +@Fork(value = 1) +@State(Scope.Thread) +public class ObservableFlatMapMaybeEmptyPerf { + @Param({ "1", "10", "100", "1000", "10000", "100000", "1000000" }) + public int count; + + Observable observableConvert; + + Observable observableDedicated; + + Observable observablePlain; + + @Setup + public void setup() { + Integer[] sourceArray = new Integer[count]; + Arrays.fill(sourceArray, 777); + + Observable source = Observable.fromArray(sourceArray); + + observablePlain = source.flatMap(new Function>() { + @Override + public Observable apply(Integer v) + throws Exception { + return Observable.empty(); + } + }); + + observableConvert = source.flatMap(new Function>() { + @Override + public Observable apply(Integer v) + throws Exception { + return Maybe.empty().toObservable(); + } + }); + + observableDedicated = source.flatMapMaybe(new Function>() { + @Override + public Maybe apply(Integer v) + throws Exception { + return Maybe.empty(); + } + }); + } + + @Benchmark + public Object observablePlain(Blackhole bh) { + return observablePlain.subscribeWith(new PerfConsumer(bh)); + } + + @Benchmark + public Object observableConvert(Blackhole bh) { + return observableConvert.subscribeWith(new PerfConsumer(bh)); + } + + @Benchmark + public Object observableDedicated(Blackhole bh) { + return observableDedicated.subscribeWith(new PerfConsumer(bh)); + } +} diff --git a/src/jmh/java/io/reactivex/xmapz/ObservableFlatMapMaybePerf.java b/src/jmh/java/io/reactivex/xmapz/ObservableFlatMapMaybePerf.java new file mode 100644 index 0000000000..2c1eaccb00 --- /dev/null +++ b/src/jmh/java/io/reactivex/xmapz/ObservableFlatMapMaybePerf.java @@ -0,0 +1,87 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.xmapz; + +import java.util.Arrays; +import java.util.concurrent.TimeUnit; + +import org.openjdk.jmh.annotations.*; +import org.openjdk.jmh.infra.Blackhole; + +import io.reactivex.*; +import io.reactivex.functions.Function; + +@BenchmarkMode(Mode.Throughput) +@Warmup(iterations = 5) +@Measurement(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS) +@OutputTimeUnit(TimeUnit.SECONDS) +@Fork(value = 1) +@State(Scope.Thread) +public class ObservableFlatMapMaybePerf { + @Param({ "1", "10", "100", "1000", "10000", "100000", "1000000" }) + public int count; + + Observable observableConvert; + + Observable observableDedicated; + + Observable observablePlain; + + @Setup + public void setup() { + Integer[] sourceArray = new Integer[count]; + Arrays.fill(sourceArray, 777); + + Observable source = Observable.fromArray(sourceArray); + + observablePlain = source.flatMap(new Function>() { + @Override + public Observable apply(Integer v) + throws Exception { + return Observable.just(v); + } + }); + + observableConvert = source.flatMap(new Function>() { + @Override + public Observable apply(Integer v) + throws Exception { + return Maybe.just(v).toObservable(); + } + }); + + observableDedicated = source.flatMapMaybe(new Function>() { + @Override + public Maybe apply(Integer v) + throws Exception { + return Maybe.just(v); + } + }); + } + + @Benchmark + public Object observablePlain(Blackhole bh) { + return observablePlain.subscribeWith(new PerfConsumer(bh)); + } + + @Benchmark + public Object observableConvert(Blackhole bh) { + return observableConvert.subscribeWith(new PerfConsumer(bh)); + } + + @Benchmark + public Object observableDedicated(Blackhole bh) { + return observableDedicated.subscribeWith(new PerfConsumer(bh)); + } +} diff --git a/src/jmh/java/io/reactivex/xmapz/ObservableFlatMapSinglePerf.java b/src/jmh/java/io/reactivex/xmapz/ObservableFlatMapSinglePerf.java new file mode 100644 index 0000000000..46cd554792 --- /dev/null +++ b/src/jmh/java/io/reactivex/xmapz/ObservableFlatMapSinglePerf.java @@ -0,0 +1,87 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.xmapz; + +import java.util.Arrays; +import java.util.concurrent.TimeUnit; + +import org.openjdk.jmh.annotations.*; +import org.openjdk.jmh.infra.Blackhole; + +import io.reactivex.*; +import io.reactivex.functions.Function; + +@BenchmarkMode(Mode.Throughput) +@Warmup(iterations = 5) +@Measurement(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS) +@OutputTimeUnit(TimeUnit.SECONDS) +@Fork(value = 1) +@State(Scope.Thread) +public class ObservableFlatMapSinglePerf { + @Param({ "1", "10", "100", "1000", "10000", "100000", "1000000" }) + public int count; + + Observable observableConvert; + + Observable observableDedicated; + + Observable observablePlain; + + @Setup + public void setup() { + Integer[] sourceArray = new Integer[count]; + Arrays.fill(sourceArray, 777); + + Observable source = Observable.fromArray(sourceArray); + + observablePlain = source.flatMap(new Function>() { + @Override + public Observable apply(Integer v) + throws Exception { + return Observable.just(v); + } + }); + + observableConvert = source.flatMap(new Function>() { + @Override + public Observable apply(Integer v) + throws Exception { + return Single.just(v).toObservable(); + } + }); + + observableDedicated = source.flatMapSingle(new Function>() { + @Override + public Single apply(Integer v) + throws Exception { + return Single.just(v); + } + }); + } + + @Benchmark + public Object observablePlain(Blackhole bh) { + return observablePlain.subscribeWith(new PerfConsumer(bh)); + } + + @Benchmark + public Object observableConvert(Blackhole bh) { + return observableConvert.subscribeWith(new PerfConsumer(bh)); + } + + @Benchmark + public Object observableDedicated(Blackhole bh) { + return observableDedicated.subscribeWith(new PerfConsumer(bh)); + } +} diff --git a/src/jmh/java/io/reactivex/xmapz/ObservableSwitchMapCompletablePerf.java b/src/jmh/java/io/reactivex/xmapz/ObservableSwitchMapCompletablePerf.java new file mode 100644 index 0000000000..e5a6c0336f --- /dev/null +++ b/src/jmh/java/io/reactivex/xmapz/ObservableSwitchMapCompletablePerf.java @@ -0,0 +1,87 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.xmapz; + +import java.util.Arrays; +import java.util.concurrent.TimeUnit; + +import org.openjdk.jmh.annotations.*; +import org.openjdk.jmh.infra.Blackhole; + +import io.reactivex.*; +import io.reactivex.functions.Function; + +@BenchmarkMode(Mode.Throughput) +@Warmup(iterations = 5) +@Measurement(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS) +@OutputTimeUnit(TimeUnit.SECONDS) +@Fork(value = 1) +@State(Scope.Thread) +public class ObservableSwitchMapCompletablePerf { + @Param({ "1", "10", "100", "1000", "10000", "100000", "1000000" }) + public int count; + + Observable switchMapToObservableEmpty; + + Completable switchMapCompletableEmpty; + + Observable observablePlain; + + @Setup + public void setup() { + Integer[] sourceArray = new Integer[count]; + Arrays.fill(sourceArray, 777); + + Observable source = Observable.fromArray(sourceArray); + + observablePlain = source.switchMap(new Function>() { + @Override + public Observable apply(Integer v) + throws Exception { + return Observable.empty(); + } + }); + + switchMapToObservableEmpty = source.switchMap(new Function>() { + @Override + public Observable apply(Integer v) + throws Exception { + return Completable.complete().toObservable(); + } + }); + + switchMapCompletableEmpty = source.switchMapCompletable(new Function() { + @Override + public Completable apply(Integer v) + throws Exception { + return Completable.complete(); + } + }); + } + + @Benchmark + public Object observablePlain(Blackhole bh) { + return observablePlain.subscribeWith(new PerfConsumer(bh)); + } + + @Benchmark + public Object switchMapToObservableEmpty(Blackhole bh) { + return switchMapToObservableEmpty.subscribeWith(new PerfConsumer(bh)); + } + + @Benchmark + public Object switchMapCompletableEmpty(Blackhole bh) { + return switchMapCompletableEmpty.subscribeWith(new PerfConsumer(bh)); + } +} diff --git a/src/jmh/java/io/reactivex/xmapz/ObservableSwitchMapMaybeEmptyPerf.java b/src/jmh/java/io/reactivex/xmapz/ObservableSwitchMapMaybeEmptyPerf.java new file mode 100644 index 0000000000..77657c69cb --- /dev/null +++ b/src/jmh/java/io/reactivex/xmapz/ObservableSwitchMapMaybeEmptyPerf.java @@ -0,0 +1,87 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.xmapz; + +import java.util.Arrays; +import java.util.concurrent.TimeUnit; + +import org.openjdk.jmh.annotations.*; +import org.openjdk.jmh.infra.Blackhole; + +import io.reactivex.*; +import io.reactivex.functions.Function; + +@BenchmarkMode(Mode.Throughput) +@Warmup(iterations = 5) +@Measurement(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS) +@OutputTimeUnit(TimeUnit.SECONDS) +@Fork(value = 1) +@State(Scope.Thread) +public class ObservableSwitchMapMaybeEmptyPerf { + @Param({ "1", "10", "100", "1000", "10000", "100000", "1000000" }) + public int count; + + Observable observableConvert; + + Observable observableDedicated; + + Observable observablePlain; + + @Setup + public void setup() { + Integer[] sourceArray = new Integer[count]; + Arrays.fill(sourceArray, 777); + + Observable source = Observable.fromArray(sourceArray); + + observablePlain = source.switchMap(new Function>() { + @Override + public Observable apply(Integer v) + throws Exception { + return Observable.empty(); + } + }); + + observableConvert = source.switchMap(new Function>() { + @Override + public Observable apply(Integer v) + throws Exception { + return Maybe.empty().toObservable(); + } + }); + + observableDedicated = source.switchMapMaybe(new Function>() { + @Override + public Maybe apply(Integer v) + throws Exception { + return Maybe.empty(); + } + }); + } + + @Benchmark + public Object observablePlain(Blackhole bh) { + return observablePlain.subscribeWith(new PerfConsumer(bh)); + } + + @Benchmark + public Object observableConvert(Blackhole bh) { + return observableConvert.subscribeWith(new PerfConsumer(bh)); + } + + @Benchmark + public Object observableDedicated(Blackhole bh) { + return observableDedicated.subscribeWith(new PerfConsumer(bh)); + } +} diff --git a/src/jmh/java/io/reactivex/xmapz/ObservableSwitchMapMaybePerf.java b/src/jmh/java/io/reactivex/xmapz/ObservableSwitchMapMaybePerf.java new file mode 100644 index 0000000000..5e80b84e36 --- /dev/null +++ b/src/jmh/java/io/reactivex/xmapz/ObservableSwitchMapMaybePerf.java @@ -0,0 +1,87 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.xmapz; + +import java.util.Arrays; +import java.util.concurrent.TimeUnit; + +import org.openjdk.jmh.annotations.*; +import org.openjdk.jmh.infra.Blackhole; + +import io.reactivex.*; +import io.reactivex.functions.Function; + +@BenchmarkMode(Mode.Throughput) +@Warmup(iterations = 5) +@Measurement(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS) +@OutputTimeUnit(TimeUnit.SECONDS) +@Fork(value = 1) +@State(Scope.Thread) +public class ObservableSwitchMapMaybePerf { + @Param({ "1", "10", "100", "1000", "10000", "100000", "1000000" }) + public int count; + + Observable observableConvert; + + Observable observableDedicated; + + Observable observablePlain; + + @Setup + public void setup() { + Integer[] sourceArray = new Integer[count]; + Arrays.fill(sourceArray, 777); + + Observable source = Observable.fromArray(sourceArray); + + observablePlain = source.switchMap(new Function>() { + @Override + public Observable apply(Integer v) + throws Exception { + return Observable.just(v); + } + }); + + observableConvert = source.switchMap(new Function>() { + @Override + public Observable apply(Integer v) + throws Exception { + return Maybe.just(v).toObservable(); + } + }); + + observableDedicated = source.switchMapMaybe(new Function>() { + @Override + public Maybe apply(Integer v) + throws Exception { + return Maybe.just(v); + } + }); + } + + @Benchmark + public Object observablePlain(Blackhole bh) { + return observablePlain.subscribeWith(new PerfConsumer(bh)); + } + + @Benchmark + public Object observableConvert(Blackhole bh) { + return observableConvert.subscribeWith(new PerfConsumer(bh)); + } + + @Benchmark + public Object observableDedicated(Blackhole bh) { + return observableDedicated.subscribeWith(new PerfConsumer(bh)); + } +} diff --git a/src/jmh/java/io/reactivex/xmapz/ObservableSwitchMapSinglePerf.java b/src/jmh/java/io/reactivex/xmapz/ObservableSwitchMapSinglePerf.java new file mode 100644 index 0000000000..fcca79aa8b --- /dev/null +++ b/src/jmh/java/io/reactivex/xmapz/ObservableSwitchMapSinglePerf.java @@ -0,0 +1,87 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.xmapz; + +import java.util.Arrays; +import java.util.concurrent.TimeUnit; + +import org.openjdk.jmh.annotations.*; +import org.openjdk.jmh.infra.Blackhole; + +import io.reactivex.*; +import io.reactivex.functions.Function; + +@BenchmarkMode(Mode.Throughput) +@Warmup(iterations = 5) +@Measurement(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS) +@OutputTimeUnit(TimeUnit.SECONDS) +@Fork(value = 1) +@State(Scope.Thread) +public class ObservableSwitchMapSinglePerf { + @Param({ "1", "10", "100", "1000", "10000", "100000", "1000000" }) + public int count; + + Observable observableConvert; + + Observable observableDedicated; + + Observable observablePlain; + + @Setup + public void setup() { + Integer[] sourceArray = new Integer[count]; + Arrays.fill(sourceArray, 777); + + Observable source = Observable.fromArray(sourceArray); + + observablePlain = source.switchMap(new Function>() { + @Override + public Observable apply(Integer v) + throws Exception { + return Observable.just(v); + } + }); + + observableConvert = source.switchMap(new Function>() { + @Override + public Observable apply(Integer v) + throws Exception { + return Single.just(v).toObservable(); + } + }); + + observableDedicated = source.switchMapSingle(new Function>() { + @Override + public Single apply(Integer v) + throws Exception { + return Single.just(v); + } + }); + } + + @Benchmark + public Object observablePlain(Blackhole bh) { + return observablePlain.subscribeWith(new PerfConsumer(bh)); + } + + @Benchmark + public Object observableConvert(Blackhole bh) { + return observableConvert.subscribeWith(new PerfConsumer(bh)); + } + + @Benchmark + public Object observableDedicated(Blackhole bh) { + return observableDedicated.subscribeWith(new PerfConsumer(bh)); + } +}