diff --git a/src/main/java/rx/Observable.java b/src/main/java/rx/Observable.java index 444f20a4d8..453916d5a6 100644 --- a/src/main/java/rx/Observable.java +++ b/src/main/java/rx/Observable.java @@ -1243,7 +1243,14 @@ public final static Observable from(Iterable iterable) { * @see ReactiveX operators documentation: From */ public final static Observable from(T[] array) { - return from(Arrays.asList(array)); + int n = array.length; + if (n == 0) { + return empty(); + } else + if (n == 1) { + return just(array[0]); + } + return create(new OnSubscribeFromArray(array)); } /** @@ -1423,7 +1430,7 @@ public final static Observable just(final T value) { // suppress unchecked because we are using varargs inside the method @SuppressWarnings("unchecked") public final static Observable just(T t1, T t2) { - return from(Arrays.asList(t1, t2)); + return from((T[])new Object[] { t1, t2 }); } /** @@ -1449,7 +1456,7 @@ public final static Observable just(T t1, T t2) { // suppress unchecked because we are using varargs inside the method @SuppressWarnings("unchecked") public final static Observable just(T t1, T t2, T t3) { - return from(Arrays.asList(t1, t2, t3)); + return from((T[])new Object[] { t1, t2, t3 }); } /** @@ -1477,7 +1484,7 @@ public final static Observable just(T t1, T t2, T t3) { // suppress unchecked because we are using varargs inside the method @SuppressWarnings("unchecked") public final static Observable just(T t1, T t2, T t3, T t4) { - return from(Arrays.asList(t1, t2, t3, t4)); + return from((T[])new Object[] { t1, t2, t3, t4 }); } /** @@ -1507,7 +1514,7 @@ public final static Observable just(T t1, T t2, T t3, T t4) { // suppress unchecked because we are using varargs inside the method @SuppressWarnings("unchecked") public final static Observable just(T t1, T t2, T t3, T t4, T t5) { - return from(Arrays.asList(t1, t2, t3, t4, t5)); + return from((T[])new Object[] { t1, t2, t3, t4, t5 }); } /** @@ -1539,7 +1546,7 @@ public final static Observable just(T t1, T t2, T t3, T t4, T t5) { // suppress unchecked because we are using varargs inside the method @SuppressWarnings("unchecked") public final static Observable just(T t1, T t2, T t3, T t4, T t5, T t6) { - return from(Arrays.asList(t1, t2, t3, t4, t5, t6)); + return from((T[])new Object[] { t1, t2, t3, t4, t5, t6 }); } /** @@ -1573,7 +1580,7 @@ public final static Observable just(T t1, T t2, T t3, T t4, T t5, T t6) { // suppress unchecked because we are using varargs inside the method @SuppressWarnings("unchecked") public final static Observable just(T t1, T t2, T t3, T t4, T t5, T t6, T t7) { - return from(Arrays.asList(t1, t2, t3, t4, t5, t6, t7)); + return from((T[])new Object[] { t1, t2, t3, t4, t5, t6, t7 }); } /** @@ -1609,7 +1616,7 @@ public final static Observable just(T t1, T t2, T t3, T t4, T t5, T t6, T // suppress unchecked because we are using varargs inside the method @SuppressWarnings("unchecked") public final static Observable just(T t1, T t2, T t3, T t4, T t5, T t6, T t7, T t8) { - return from(Arrays.asList(t1, t2, t3, t4, t5, t6, t7, t8)); + return from((T[])new Object[] { t1, t2, t3, t4, t5, t6, t7, t8 }); } /** @@ -1647,7 +1654,7 @@ public final static Observable just(T t1, T t2, T t3, T t4, T t5, T t6, T // suppress unchecked because we are using varargs inside the method @SuppressWarnings("unchecked") public final static Observable just(T t1, T t2, T t3, T t4, T t5, T t6, T t7, T t8, T t9) { - return from(Arrays.asList(t1, t2, t3, t4, t5, t6, t7, t8, t9)); + return from((T[])new Object[] { t1, t2, t3, t4, t5, t6, t7, t8, t9 }); } /** @@ -1687,7 +1694,7 @@ public final static Observable just(T t1, T t2, T t3, T t4, T t5, T t6, T // suppress unchecked because we are using varargs inside the method @SuppressWarnings("unchecked") public final static Observable just(T t1, T t2, T t3, T t4, T t5, T t6, T t7, T t8, T t9, T t10) { - return from(Arrays.asList(t1, t2, t3, t4, t5, t6, t7, t8, t9, t10)); + return from((T[])new Object[] { t1, t2, t3, t4, t5, t6, t7, t8, t9, t10 }); } /** @@ -1821,7 +1828,7 @@ public final static Observable merge(Observable Observable merge(Observable t1, Observable t2) { - return merge(from(Arrays.asList(t1, t2))); + return merge(new Observable[] { t1, t2 }); } /** @@ -1847,7 +1854,7 @@ public final static Observable merge(Observable t1, Observab */ @SuppressWarnings("unchecked") public final static Observable merge(Observable t1, Observable t2, Observable t3) { - return merge(from(Arrays.asList(t1, t2, t3))); + return merge(new Observable[] { t1, t2, t3 }); } /** @@ -1875,7 +1882,7 @@ public final static Observable merge(Observable t1, Observab */ @SuppressWarnings("unchecked") public final static Observable merge(Observable t1, Observable t2, Observable t3, Observable t4) { - return merge(from(Arrays.asList(t1, t2, t3, t4))); + return merge(new Observable[] { t1, t2, t3, t4 }); } /** @@ -1905,7 +1912,7 @@ public final static Observable merge(Observable t1, Observab */ @SuppressWarnings("unchecked") public final static Observable merge(Observable t1, Observable t2, Observable t3, Observable t4, Observable t5) { - return merge(from(Arrays.asList(t1, t2, t3, t4, t5))); + return merge(new Observable[] { t1, t2, t3, t4, t5 }); } /** @@ -1937,7 +1944,7 @@ public final static Observable merge(Observable t1, Observab */ @SuppressWarnings("unchecked") public final static Observable merge(Observable t1, Observable t2, Observable t3, Observable t4, Observable t5, Observable t6) { - return merge(from(Arrays.asList(t1, t2, t3, t4, t5, t6))); + return merge(new Observable[] { t1, t2, t3, t4, t5, t6 }); } /** @@ -1971,7 +1978,7 @@ public final static Observable merge(Observable t1, Observab */ @SuppressWarnings("unchecked") public final static Observable merge(Observable t1, Observable t2, Observable t3, Observable t4, Observable t5, Observable t6, Observable t7) { - return merge(from(Arrays.asList(t1, t2, t3, t4, t5, t6, t7))); + return merge(new Observable[] { t1, t2, t3, t4, t5, t6, t7 }); } /** @@ -2007,7 +2014,7 @@ public final static Observable merge(Observable t1, Observab */ @SuppressWarnings("unchecked") public final static Observable merge(Observable t1, Observable t2, Observable t3, Observable t4, Observable t5, Observable t6, Observable t7, Observable t8) { - return merge(from(Arrays.asList(t1, t2, t3, t4, t5, t6, t7, t8))); + return merge(new Observable[] { t1, t2, t3, t4, t5, t6, t7, t8 }); } /** @@ -2045,7 +2052,7 @@ public final static Observable merge(Observable t1, Observab */ @SuppressWarnings("unchecked") public final static Observable merge(Observable t1, Observable t2, Observable t3, Observable t4, Observable t5, Observable t6, Observable t7, Observable t8, Observable t9) { - return merge(from(Arrays.asList(t1, t2, t3, t4, t5, t6, t7, t8, t9))); + return merge(new Observable[] { t1, t2, t3, t4, t5, t6, t7, t8, t9 }); } /** diff --git a/src/main/java/rx/internal/operators/OnSubscribeFromArray.java b/src/main/java/rx/internal/operators/OnSubscribeFromArray.java new file mode 100644 index 0000000000..623dcaa65f --- /dev/null +++ b/src/main/java/rx/internal/operators/OnSubscribeFromArray.java @@ -0,0 +1,128 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package rx.internal.operators; + +import java.util.concurrent.atomic.AtomicLong; + +import rx.*; +import rx.Observable.OnSubscribe; + +public final class OnSubscribeFromArray implements OnSubscribe { + final T[] array; + public OnSubscribeFromArray(T[] array) { + this.array = array; + } + + @Override + public void call(Subscriber child) { + child.setProducer(new FromArrayProducer(child, array)); + } + + static final class FromArrayProducer + extends AtomicLong + implements Producer { + /** */ + private static final long serialVersionUID = 3534218984725836979L; + + final Subscriber child; + final T[] array; + + int index; + + public FromArrayProducer(Subscriber child, T[] array) { + this.child = child; + this.array = array; + } + + @Override + public void request(long n) { + if (n < 0) { + throw new IllegalArgumentException("n >= 0 required but it was " + n); + } + if (n == Long.MAX_VALUE) { + if (BackpressureUtils.getAndAddRequest(this, n) == 0) { + fastPath(); + } + } else + if (n != 0) { + if (BackpressureUtils.getAndAddRequest(this, n) == 0) { + slowPath(n); + } + } + } + + void fastPath() { + final Subscriber child = this.child; + + for (T t : array) { + if (child.isUnsubscribed()) { + return; + } + + child.onNext(t); + } + + if (child.isUnsubscribed()) { + return; + } + child.onCompleted(); + } + + void slowPath(long r) { + final Subscriber child = this.child; + final T[] array = this.array; + final int n = array.length; + + long e = 0L; + int i = index; + + for (;;) { + + while (r != 0L && i != n) { + if (child.isUnsubscribed()) { + return; + } + + child.onNext(array[i]); + + i++; + + if (i == n) { + if (!child.isUnsubscribed()) { + child.onCompleted(); + } + return; + } + + r--; + e--; + } + + r = get() + e; + + if (r == 0L) { + index = i; + r = addAndGet(e); + if (r == 0L) { + return; + } + e = 0L; + } + } + } + } +} diff --git a/src/perf/java/rx/operators/FromComparison.java b/src/perf/java/rx/operators/FromComparison.java new file mode 100644 index 0000000000..7a7a12545d --- /dev/null +++ b/src/perf/java/rx/operators/FromComparison.java @@ -0,0 +1,123 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package rx.operators; + +import java.util.Arrays; +import java.util.concurrent.TimeUnit; + +import org.openjdk.jmh.annotations.*; +import org.openjdk.jmh.infra.Blackhole; + +import rx.*; +import rx.internal.operators.*; + +/** + * Benchmark typical atomic operations on volatile fields and AtomicXYZ classes. + *

+ * gradlew benchmarks "-Pjmh=-f 1 -tu s -bm thrpt -wi 5 -i 5 -r 1 .*FromComparison.*" + *

+ * gradlew benchmarks "-Pjmh=-f 1 -tu ns -bm avgt -wi 5 -i 5 -r 1 .*FromComparison.*" + */ +@BenchmarkMode(Mode.Throughput) +@OutputTimeUnit(TimeUnit.SECONDS) +@State(Scope.Thread) +public class FromComparison { + @Param({ "1", "10", "100", "1000", "1000000" }) + public int times; + + Observable iterableSource; + + Observable arraySource; + + @Setup + public void setup() { + Integer[] array = new Integer[times]; + + Arrays.fill(array, 1); + + iterableSource = Observable.create(new OnSubscribeFromIterable(Arrays.asList(array))); + arraySource = Observable.create(new OnSubscribeFromArray(array)); + } + + @Benchmark + public void fastpathIterable(Blackhole bh) { + iterableSource.subscribe(new RequestingSubscriber(bh, Long.MAX_VALUE)); + } + + @Benchmark + public void fastpathArray(Blackhole bh) { + arraySource.subscribe(new RequestingSubscriber(bh, Long.MAX_VALUE)); + } + + @Benchmark + public void slowpathIterable(Blackhole bh) { + iterableSource.subscribe(new RequestingSubscriber(bh, times + 1)); + } + + @Benchmark + public void slowpathArray(Blackhole bh) { + arraySource.subscribe(new RequestingSubscriber(bh, times + 1)); + } + + @Benchmark + public void slowpathIterable2(Blackhole bh) { + iterableSource.subscribe(new RequestingSubscriber(bh, 128)); + } + + @Benchmark + public void slowpathArray2(Blackhole bh) { + arraySource.subscribe(new RequestingSubscriber(bh, 128)); + } + + + static final class RequestingSubscriber extends Subscriber { + final Blackhole bh; + final long limit; + long received; + Producer p; + + public RequestingSubscriber(Blackhole bh, long limit) { + this.bh = bh; + this.limit = limit; + } + + @Override + public void onNext(T t) { + bh.consume(t); + if (++received >= limit) { + received = 0L; + p.request(limit); + } + } + + @Override + public void onError(Throwable e) { + e.printStackTrace(); + } + + @Override + public void onCompleted() { + + } + + @Override + public void setProducer(Producer p) { + this.p = p; + p.request(limit); + } + } +} diff --git a/src/test/java/rx/internal/operators/OnSubscribeFromArrayTest.java b/src/test/java/rx/internal/operators/OnSubscribeFromArrayTest.java new file mode 100644 index 0000000000..3b7ec5220b --- /dev/null +++ b/src/test/java/rx/internal/operators/OnSubscribeFromArrayTest.java @@ -0,0 +1,67 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package rx.internal.operators; + +import org.junit.Test; + +import rx.Observable; +import rx.observers.TestSubscriber; + +public class OnSubscribeFromArrayTest { + + Observable create(int n) { + Integer[] array = new Integer[n]; + for (int i = 0; i < n; i++) { + array[i] = i; + } + return Observable.create(new OnSubscribeFromArray(array)); + } + @Test + public void simple() { + TestSubscriber ts = new TestSubscriber(); + + create(1000).subscribe(ts); + + ts.assertNoErrors(); + ts.assertValueCount(1000); + ts.assertCompleted(); + } + + @Test + public void backpressure() { + TestSubscriber ts = TestSubscriber.create(0); + + create(1000).subscribe(ts); + + ts.assertNoErrors(); + ts.assertNoValues(); + ts.assertNotCompleted(); + + ts.requestMore(10); + + ts.assertNoErrors(); + ts.assertValueCount(10); + ts.assertNotCompleted(); + + ts.requestMore(1000); + + ts.assertNoErrors(); + ts.assertValueCount(1000); + ts.assertCompleted(); + } + +}