diff --git a/src/main/java/rx/Single.java b/src/main/java/rx/Single.java
index 2d332e5de8..4d5ac57232 100644
--- a/src/main/java/rx/Single.java
+++ b/src/main/java/rx/Single.java
@@ -37,6 +37,8 @@
import rx.annotations.Beta;
import rx.internal.operators.*;
import rx.internal.producers.SingleDelayedProducer;
+import rx.internal.util.ScalarSynchronousSingle;
+import rx.internal.util.UtilityFunctions;
import rx.singles.BlockingSingle;
import rx.observers.SafeSubscriber;
import rx.plugins.*;
@@ -656,15 +658,7 @@ public void call(SingleSubscriber super T> singleSubscriber) {
* @see ReactiveX operators documentation: Just
*/
public final static Single just(final T value) {
- // TODO add similar optimization as ScalarSynchronousObservable
- return Single.create(new OnSubscribe() {
-
- @Override
- public void call(SingleSubscriber super T> te) {
- te.onSuccess(value);
- }
-
- });
+ return ScalarSynchronousSingle.create(value);
}
/**
@@ -685,6 +679,9 @@ public void call(SingleSubscriber super T> te) {
* @see ReactiveX operators documentation: Merge
*/
public final static Single merge(final Single extends Single extends T>> source) {
+ if (source.getClass() == ScalarSynchronousSingle.class) {
+ return ((ScalarSynchronousSingle)source).scalarFlatMap((Func1) UtilityFunctions.identity());
+ }
return Single.create(new OnSubscribe() {
@Override
@@ -1258,6 +1255,9 @@ public final Observable concatWith(Single extends T> t1) {
* @see ReactiveX operators documentation: FlatMap
*/
public final Single flatMap(final Func1 super T, ? extends Single extends R>> func) {
+ if (getClass() == ScalarSynchronousSingle.class) {
+ return ((ScalarSynchronousSingle)this).scalarFlatMap(func);
+ }
return merge(map(func));
}
@@ -1340,6 +1340,9 @@ public final Observable mergeWith(Single extends T> t1) {
* @see #subscribeOn
*/
public final Single observeOn(Scheduler scheduler) {
+ if (this instanceof ScalarSynchronousSingle) {
+ return ((ScalarSynchronousSingle)this).scalarScheduleOn(scheduler);
+ }
return lift(new OperatorObserveOn(scheduler));
}
@@ -1699,6 +1702,9 @@ public void onNext(T t) {
* @see #observeOn
*/
public final Single subscribeOn(Scheduler scheduler) {
+ if (this instanceof ScalarSynchronousSingle) {
+ return ((ScalarSynchronousSingle)this).scalarScheduleOn(scheduler);
+ }
return nest().lift(new OperatorSubscribeOn(scheduler));
}
diff --git a/src/main/java/rx/internal/util/ScalarSynchronousSingle.java b/src/main/java/rx/internal/util/ScalarSynchronousSingle.java
new file mode 100644
index 0000000000..55650d84c6
--- /dev/null
+++ b/src/main/java/rx/internal/util/ScalarSynchronousSingle.java
@@ -0,0 +1,156 @@
+/**
+ * Copyright 2014 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package rx.internal.util;
+
+import rx.Scheduler;
+import rx.Scheduler.Worker;
+import rx.Single;
+import rx.SingleSubscriber;
+import rx.Subscriber;
+import rx.functions.Action0;
+import rx.functions.Func1;
+import rx.internal.schedulers.EventLoopsScheduler;
+
+public final class ScalarSynchronousSingle extends Single {
+
+ public static final ScalarSynchronousSingle create(T t) {
+ return new ScalarSynchronousSingle(t);
+ }
+
+ final T t;
+
+ protected ScalarSynchronousSingle(final T t) {
+ super(new OnSubscribe() {
+
+ @Override
+ public void call(SingleSubscriber super T> te) {
+ te.onSuccess(t);
+ }
+
+ });
+ this.t = t;
+ }
+
+ public T get() {
+ return t;
+ }
+
+ /**
+ * Customized observeOn/subscribeOn implementation which emits the scalar
+ * value directly or with less overhead on the specified scheduler.
+ *
+ * @param scheduler the target scheduler
+ * @return the new observable
+ */
+ public Single scalarScheduleOn(Scheduler scheduler) {
+ if (scheduler instanceof EventLoopsScheduler) {
+ EventLoopsScheduler es = (EventLoopsScheduler) scheduler;
+ return create(new DirectScheduledEmission(es, t));
+ }
+ return create(new NormalScheduledEmission(scheduler, t));
+ }
+
+ /**
+ * Optimized observeOn for scalar value observed on the EventLoopsScheduler.
+ */
+ static final class DirectScheduledEmission implements OnSubscribe {
+ private final EventLoopsScheduler es;
+ private final T value;
+
+ DirectScheduledEmission(EventLoopsScheduler es, T value) {
+ this.es = es;
+ this.value = value;
+ }
+
+ @Override
+ public void call(SingleSubscriber super T> singleSubscriber) {
+ singleSubscriber.add(es.scheduleDirect(new ScalarSynchronousSingleAction(singleSubscriber, value)));
+ }
+ }
+
+ /**
+ * Emits a scalar value on a general scheduler.
+ */
+ static final class NormalScheduledEmission implements OnSubscribe {
+ private final Scheduler scheduler;
+ private final T value;
+
+ NormalScheduledEmission(Scheduler scheduler, T value) {
+ this.scheduler = scheduler;
+ this.value = value;
+ }
+
+ @Override
+ public void call(SingleSubscriber super T> singleSubscriber) {
+ Worker worker = scheduler.createWorker();
+ singleSubscriber.add(worker);
+ worker.schedule(new ScalarSynchronousSingleAction(singleSubscriber, value));
+ }
+ }
+
+ /**
+ * Action that emits a single value when called.
+ */
+ static final class ScalarSynchronousSingleAction implements Action0 {
+ private final SingleSubscriber super T> subscriber;
+ private final T value;
+
+ ScalarSynchronousSingleAction(SingleSubscriber super T> subscriber,
+ T value) {
+ this.subscriber = subscriber;
+ this.value = value;
+ }
+
+ @Override
+ public void call() {
+ try {
+ subscriber.onSuccess(value);
+ } catch (Throwable t) {
+ subscriber.onError(t);
+ return;
+ }
+ }
+ }
+
+ public Single scalarFlatMap(final Func1 super T, ? extends Single extends R>> func) {
+ return create(new OnSubscribe() {
+ @Override
+ public void call(final SingleSubscriber super R> child) {
+
+ Single extends R> o = func.call(t);
+ if (o.getClass() == ScalarSynchronousSingle.class) {
+ child.onSuccess(((ScalarSynchronousSingle extends R>) o).t);
+ } else {
+ o.unsafeSubscribe(new Subscriber() {
+ @Override
+ public void onCompleted() {
+ }
+
+ @Override
+ public void onError(Throwable e) {
+ child.onError(e);
+ }
+
+ @Override
+ public void onNext(R r) {
+ child.onSuccess(r);
+ }
+ });
+ }
+ }
+ });
+ }
+}