diff --git a/src/main/java/rx/Single.java b/src/main/java/rx/Single.java
index 5ffbee393b..4f72edddf7 100644
--- a/src/main/java/rx/Single.java
+++ b/src/main/java/rx/Single.java
@@ -37,6 +37,8 @@
import rx.annotations.Beta;
import rx.internal.operators.*;
import rx.internal.producers.SingleDelayedProducer;
+import rx.internal.util.ScalarSynchronousSingle;
+import rx.internal.util.UtilityFunctions;
import rx.singles.BlockingSingle;
import rx.observers.SafeSubscriber;
import rx.plugins.*;
@@ -654,15 +656,7 @@ public void call(SingleSubscriber super T> singleSubscriber) {
* @see ReactiveX operators documentation: Just
*/
public static Single just(final T value) {
- // TODO add similar optimization as ScalarSynchronousObservable
- return Single.create(new OnSubscribe() {
-
- @Override
- public void call(SingleSubscriber super T> te) {
- te.onSuccess(value);
- }
-
- });
+ return ScalarSynchronousSingle.create(value);
}
/**
@@ -683,6 +677,9 @@ public void call(SingleSubscriber super T> te) {
* @see ReactiveX operators documentation: Merge
*/
public static Single merge(final Single extends Single extends T>> source) {
+ if (source instanceof ScalarSynchronousSingle) {
+ return ((ScalarSynchronousSingle) source).scalarFlatMap((Func1) UtilityFunctions.identity());
+ }
return Single.create(new OnSubscribe() {
@Override
@@ -1296,6 +1293,9 @@ public final Observable concatWith(Single extends T> t1) {
* @see ReactiveX operators documentation: FlatMap
*/
public final Single flatMap(final Func1 super T, ? extends Single extends R>> func) {
+ if (this instanceof ScalarSynchronousSingle) {
+ return ((ScalarSynchronousSingle) this).scalarFlatMap(func);
+ }
return merge(map(func));
}
@@ -1378,6 +1378,9 @@ public final Observable mergeWith(Single extends T> t1) {
* @see #subscribeOn
*/
public final Single observeOn(Scheduler scheduler) {
+ if (this instanceof ScalarSynchronousSingle) {
+ return ((ScalarSynchronousSingle)this).scalarScheduleOn(scheduler);
+ }
return lift(new OperatorObserveOn(scheduler));
}
@@ -1737,6 +1740,9 @@ public void onNext(T t) {
* @see #observeOn
*/
public final Single subscribeOn(final Scheduler scheduler) {
+ if (this instanceof ScalarSynchronousSingle) {
+ return ((ScalarSynchronousSingle)this).scalarScheduleOn(scheduler);
+ }
return create(new OnSubscribe() {
@Override
public void call(final SingleSubscriber super T> t) {
@@ -1772,7 +1778,7 @@ public void onError(Throwable error) {
}
});
}
- });
+ });
}
/**
diff --git a/src/main/java/rx/internal/util/ScalarSynchronousSingle.java b/src/main/java/rx/internal/util/ScalarSynchronousSingle.java
new file mode 100644
index 0000000000..83b7d456a1
--- /dev/null
+++ b/src/main/java/rx/internal/util/ScalarSynchronousSingle.java
@@ -0,0 +1,157 @@
+/**
+ * Copyright 2014 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package rx.internal.util;
+
+import rx.Scheduler;
+import rx.Scheduler.Worker;
+import rx.Single;
+import rx.SingleSubscriber;
+import rx.Subscriber;
+import rx.functions.Action0;
+import rx.functions.Func1;
+import rx.internal.schedulers.EventLoopsScheduler;
+
+public final class ScalarSynchronousSingle extends Single {
+
+ public static final ScalarSynchronousSingle create(T t) {
+ return new ScalarSynchronousSingle(t);
+ }
+
+ final T value;
+
+ protected ScalarSynchronousSingle(final T t) {
+ super(new OnSubscribe() {
+
+ @Override
+ public void call(SingleSubscriber super T> te) {
+ te.onSuccess(t);
+ }
+
+ });
+ this.value = t;
+ }
+
+ public T get() {
+ return value;
+ }
+
+ /**
+ * Customized observeOn/subscribeOn implementation which emits the scalar
+ * value directly or with less overhead on the specified scheduler.
+ *
+ * @param scheduler the target scheduler
+ * @return the new observable
+ */
+ public Single scalarScheduleOn(Scheduler scheduler) {
+ if (scheduler instanceof EventLoopsScheduler) {
+ EventLoopsScheduler es = (EventLoopsScheduler) scheduler;
+ return create(new DirectScheduledEmission(es, value));
+ }
+ return create(new NormalScheduledEmission(scheduler, value));
+ }
+
+ /**
+ * Optimized observeOn for scalar value observed on the EventLoopsScheduler.
+ */
+ static final class DirectScheduledEmission implements OnSubscribe {
+ private final EventLoopsScheduler es;
+ private final T value;
+
+ DirectScheduledEmission(EventLoopsScheduler es, T value) {
+ this.es = es;
+ this.value = value;
+ }
+
+ @Override
+ public void call(SingleSubscriber super T> singleSubscriber) {
+ singleSubscriber.add(es.scheduleDirect(new ScalarSynchronousSingleAction(singleSubscriber, value)));
+ }
+ }
+
+ /**
+ * Emits a scalar value on a general scheduler.
+ */
+ static final class NormalScheduledEmission implements OnSubscribe {
+ private final Scheduler scheduler;
+ private final T value;
+
+ NormalScheduledEmission(Scheduler scheduler, T value) {
+ this.scheduler = scheduler;
+ this.value = value;
+ }
+
+ @Override
+ public void call(SingleSubscriber super T> singleSubscriber) {
+ Worker worker = scheduler.createWorker();
+ singleSubscriber.add(worker);
+ worker.schedule(new ScalarSynchronousSingleAction(singleSubscriber, value));
+ }
+ }
+
+ /**
+ * Action that emits a single value when called.
+ */
+ static final class ScalarSynchronousSingleAction implements Action0 {
+ private final SingleSubscriber super T> subscriber;
+ private final T value;
+
+ ScalarSynchronousSingleAction(SingleSubscriber super T> subscriber,
+ T value) {
+ this.subscriber = subscriber;
+ this.value = value;
+ }
+
+ @Override
+ public void call() {
+ try {
+ subscriber.onSuccess(value);
+ } catch (Throwable t) {
+ subscriber.onError(t);
+ }
+ }
+ }
+
+ public Single scalarFlatMap(final Func1 super T, ? extends Single extends R>> func) {
+ return create(new OnSubscribe() {
+ @Override
+ public void call(final SingleSubscriber super R> child) {
+
+ Single extends R> o = func.call(value);
+ if (o instanceof ScalarSynchronousSingle) {
+ child.onSuccess(((ScalarSynchronousSingle extends R>) o).value);
+ } else {
+ Subscriber subscriber = new Subscriber() {
+ @Override
+ public void onCompleted() {
+ }
+
+ @Override
+ public void onError(Throwable e) {
+ child.onError(e);
+ }
+
+ @Override
+ public void onNext(R r) {
+ child.onSuccess(r);
+ }
+ };
+ child.add(subscriber);
+ o.unsafeSubscribe(subscriber);
+ }
+ }
+ });
+ }
+}
diff --git a/src/test/java/rx/SingleTest.java b/src/test/java/rx/SingleTest.java
index b29fcb01af..488a2c0d52 100644
--- a/src/test/java/rx/SingleTest.java
+++ b/src/test/java/rx/SingleTest.java
@@ -1,11 +1,11 @@
/**
* Copyright 2015 Netflix, Inc.
- *
+ *
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
diff --git a/src/test/java/rx/internal/util/ScalarSynchronousSingleTest.java b/src/test/java/rx/internal/util/ScalarSynchronousSingleTest.java
new file mode 100644
index 0000000000..61700af4d1
--- /dev/null
+++ b/src/test/java/rx/internal/util/ScalarSynchronousSingleTest.java
@@ -0,0 +1,285 @@
+/**
+ * Copyright 2014 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package rx.internal.util;
+
+import org.junit.Test;
+
+import rx.Single;
+import rx.SingleSubscriber;
+import rx.Subscription;
+import rx.exceptions.TestException;
+import rx.functions.Action0;
+import rx.functions.Func1;
+import rx.observers.TestSubscriber;
+import rx.schedulers.Schedulers;
+import rx.subscriptions.Subscriptions;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
+public class ScalarSynchronousSingleTest {
+ @Test
+ public void backPressure() {
+ TestSubscriber ts = TestSubscriber.create(0);
+
+ Single.just(1).subscribe(ts);
+
+ ts.assertNoValues();
+ ts.assertNoErrors();
+ ts.assertNotCompleted();
+
+ ts.requestMore(1);
+
+ ts.assertValue(1);
+ ts.assertCompleted();
+ ts.assertNoErrors();
+
+ ts.requestMore(1);
+
+ ts.assertValue(1);
+ ts.assertCompleted();
+ ts.assertNoErrors();
+ }
+
+ @Test(timeout = 1000)
+ public void backPressureSubscribeOn() throws Exception {
+ TestSubscriber ts = TestSubscriber.create(0);
+
+ Single.just(1).subscribeOn(Schedulers.computation()).subscribe(ts);
+
+ Thread.sleep(200);
+
+ ts.assertNoValues();
+ ts.assertNoErrors();
+ ts.assertNotCompleted();
+
+ ts.requestMore(1);
+
+ ts.awaitTerminalEvent();
+
+ ts.assertValue(1);
+ ts.assertCompleted();
+ ts.assertNoErrors();
+ }
+
+ @Test(timeout = 1000)
+ public void backPressureObserveOn() throws Exception {
+ TestSubscriber ts = TestSubscriber.create(0);
+
+ Single.just(1).observeOn(Schedulers.computation()).subscribe(ts);
+
+ Thread.sleep(200);
+
+ ts.assertNoValues();
+ ts.assertNoErrors();
+ ts.assertNotCompleted();
+
+ ts.requestMore(1);
+
+ ts.awaitTerminalEvent();
+
+ ts.assertValue(1);
+ ts.assertCompleted();
+ ts.assertNoErrors();
+ }
+
+ @Test(timeout = 1000)
+ public void backPressureSubscribeOn2() throws Exception {
+ TestSubscriber ts = TestSubscriber.create(0);
+
+ Single.just(1).subscribeOn(Schedulers.newThread()).subscribe(ts);
+
+ Thread.sleep(200);
+
+ ts.assertNoValues();
+ ts.assertNoErrors();
+ ts.assertNotCompleted();
+
+ ts.requestMore(1);
+
+ ts.awaitTerminalEvent();
+
+ ts.assertValue(1);
+ ts.assertCompleted();
+ ts.assertNoErrors();
+ }
+
+ @Test(timeout = 1000)
+ public void backPressureObserveOn2() throws Exception {
+ TestSubscriber ts = TestSubscriber.create(0);
+
+ Single.just(1).observeOn(Schedulers.newThread()).subscribe(ts);
+
+ Thread.sleep(200);
+
+ ts.assertNoValues();
+ ts.assertNoErrors();
+ ts.assertNotCompleted();
+
+ ts.requestMore(1);
+
+ ts.awaitTerminalEvent();
+
+ ts.assertValue(1);
+ ts.assertCompleted();
+ ts.assertNoErrors();
+ }
+
+ @Test
+ public void backPressureFlatMapJust() {
+ TestSubscriber ts = TestSubscriber.create(0);
+
+ Single.just(1).flatMap(new Func1>() {
+ @Override
+ public Single call(Integer v) {
+ return Single.just(String.valueOf(v));
+ }
+ }).subscribe(ts);
+
+ ts.assertNoValues();
+ ts.assertNoErrors();
+ ts.assertNotCompleted();
+
+ ts.requestMore(1);
+
+ ts.assertValue("1");
+ ts.assertCompleted();
+ ts.assertNoErrors();
+
+ ts.requestMore(1);
+
+ ts.assertValue("1");
+ ts.assertCompleted();
+ ts.assertNoErrors();
+ }
+
+ @Test
+ public void syncObserverNextThrows() {
+ TestSubscriber ts = new TestSubscriber() {
+ @Override
+ public void onNext(Integer t) {
+ throw new TestException();
+ }
+ };
+
+ Single.just(1).unsafeSubscribe(ts);
+
+ ts.assertNoValues();
+ ts.assertError(TestException.class);
+ ts.assertNotCompleted();
+ }
+
+ @Test
+ public void syncFlatMapJustObserverNextThrows() {
+ TestSubscriber ts = new TestSubscriber() {
+ @Override
+ public void onNext(Integer t) {
+ throw new TestException();
+ }
+ };
+
+ Single.just(1)
+ .flatMap(new Func1>() {
+ @Override
+ public Single call(Integer v) {
+ return Single.just(v);
+ }
+ })
+ .unsafeSubscribe(ts);
+
+ ts.assertNoValues();
+ ts.assertError(TestException.class);
+ ts.assertNotCompleted();
+ }
+
+ @Test(timeout = 1000)
+ public void asyncObserverNextThrows() {
+ TestSubscriber ts = new TestSubscriber() {
+ @Override
+ public void onNext(Integer t) {
+ throw new TestException();
+ }
+ };
+
+ Single.just(1).subscribeOn(Schedulers.computation()).unsafeSubscribe(ts);
+
+ ts.awaitTerminalEvent();
+ ts.assertNoValues();
+ ts.assertError(TestException.class);
+ ts.assertNotCompleted();
+ }
+
+ @Test
+ public void scalarFlatMap() {
+ final Action0 unSubscribe = mock(Action0.class);
+ Single s = Single.create(new Single.OnSubscribe() {
+ @Override
+ public void call(SingleSubscriber super Integer> subscriber) {
+ subscriber.add(Subscriptions.create(unSubscribe));
+ }
+ });
+ Subscription subscription = Single.merge(Single.just(s)).subscribe();
+ subscription.unsubscribe();
+ verify(unSubscribe).call();
+ }
+
+ @Test
+ public void scalarFlatMapError() {
+ final Throwable error = new IllegalStateException();
+ Single s = Single.just(1);
+ TestSubscriber testSubscriber = new TestSubscriber();
+ s.flatMap(new Func1>() {
+ @Override
+ public Single call(Integer integer) {
+ return Single.create(new Single.OnSubscribe() {
+ @Override
+ public void call(SingleSubscriber super String> singleSubscriber) {
+ singleSubscriber.onError(error);
+ }
+ });
+ }
+ }).subscribe(testSubscriber);
+ testSubscriber.assertNoValues();
+ testSubscriber.assertError(error);
+ }
+
+ @Test
+ public void scalarFlatMapSuccess() {
+ Single s = Single.just(1);
+ TestSubscriber testSubscriber = new TestSubscriber();
+ s.flatMap(new Func1>() {
+ @Override
+ public Single call(final Integer integer) {
+ return Single.create(new Single.OnSubscribe() {
+ @Override
+ public void call(SingleSubscriber super String> singleSubscriber) {
+ singleSubscriber.onSuccess(String.valueOf(integer));
+ }
+ });
+ }
+ }).subscribe(testSubscriber);
+ testSubscriber.assertNoErrors();
+ testSubscriber.assertValue("1");
+ }
+
+ @Test
+ public void getValue() {
+ Single s = Single.just(1);
+ assertEquals(1, ((ScalarSynchronousSingle) s).get());
+ }
+}
\ No newline at end of file