diff --git a/src/main/java/rx/Observable.java b/src/main/java/rx/Observable.java index 5978412fc7..6df23ff316 100644 --- a/src/main/java/rx/Observable.java +++ b/src/main/java/rx/Observable.java @@ -5481,7 +5481,7 @@ public final Observable switchIfEmpty(Observable alternate) { if (alternate == null) { throw new NullPointerException("alternate is null"); } - return lift(new OperatorSwitchIfEmpty(alternate)); + return unsafeCreate(new OnSubscribeSwitchIfEmpty(this, alternate)); } /** diff --git a/src/main/java/rx/internal/operators/OperatorSwitchIfEmpty.java b/src/main/java/rx/internal/operators/OnSubscribeSwitchIfEmpty.java similarity index 70% rename from src/main/java/rx/internal/operators/OperatorSwitchIfEmpty.java rename to src/main/java/rx/internal/operators/OnSubscribeSwitchIfEmpty.java index f3ead2c604..db02dfbff9 100644 --- a/src/main/java/rx/internal/operators/OperatorSwitchIfEmpty.java +++ b/src/main/java/rx/internal/operators/OnSubscribeSwitchIfEmpty.java @@ -16,6 +16,8 @@ package rx.internal.operators; +import java.util.concurrent.atomic.AtomicInteger; + import rx.*; import rx.internal.producers.ProducerArbiter; import rx.subscriptions.SerialSubscription; @@ -26,22 +28,28 @@ * empty, the results of the given Observable will be emitted. * @param the value type */ -public final class OperatorSwitchIfEmpty implements Observable.Operator { - private final Observable alternate; +public final class OnSubscribeSwitchIfEmpty implements Observable.OnSubscribe { + + final Observable source; - public OperatorSwitchIfEmpty(Observable alternate) { + final Observable alternate; + + public OnSubscribeSwitchIfEmpty(Observable source, Observable alternate) { + this.source = source; this.alternate = alternate; } @Override - public Subscriber call(Subscriber child) { + public void call(Subscriber child) { final SerialSubscription serial = new SerialSubscription(); ProducerArbiter arbiter = new ProducerArbiter(); final ParentSubscriber parent = new ParentSubscriber(child, serial, arbiter, alternate); + serial.set(parent); child.add(serial); child.setProducer(arbiter); - return parent; + + parent.subscribe(source); } static final class ParentSubscriber extends Subscriber { @@ -52,11 +60,15 @@ static final class ParentSubscriber extends Subscriber { private final ProducerArbiter arbiter; private final Observable alternate; + final AtomicInteger wip; + volatile boolean active; + ParentSubscriber(Subscriber child, final SerialSubscription serial, ProducerArbiter arbiter, Observable alternate) { this.child = child; this.serial = serial; this.arbiter = arbiter; this.alternate = alternate; + this.wip = new AtomicInteger(); } @Override @@ -69,14 +81,33 @@ public void onCompleted() { if (!empty) { child.onCompleted(); } else if (!child.isUnsubscribed()) { - subscribeToAlternate(); + active = false; + subscribe(null); } } - private void subscribeToAlternate() { - AlternateSubscriber as = new AlternateSubscriber(child, arbiter); - serial.set(as); - alternate.unsafeSubscribe(as); + void subscribe(Observable source) { + if (wip.getAndIncrement() == 0) { + do { + if (child.isUnsubscribed()) { + break; + } + + if (!active) { + if (source == null) { + AlternateSubscriber as = new AlternateSubscriber(child, arbiter); + serial.set(as); + active = true; + alternate.unsafeSubscribe(as); + } else { + active = true; + source.unsafeSubscribe(this); + source = null; + } + } + + } while (wip.decrementAndGet() != 0); + } } @Override diff --git a/src/test/java/rx/internal/operators/OperatorSwitchIfEmptyTest.java b/src/test/java/rx/internal/operators/OperatorSwitchIfEmptyTest.java index 5108f4001d..235cac495b 100644 --- a/src/test/java/rx/internal/operators/OperatorSwitchIfEmptyTest.java +++ b/src/test/java/rx/internal/operators/OperatorSwitchIfEmptyTest.java @@ -17,16 +17,15 @@ import static org.junit.Assert.*; -import java.util.*; +import java.util.Arrays; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import org.junit.Test; import rx.*; -import rx.Observable; import rx.Observable.OnSubscribe; -import rx.functions.Action0; +import rx.functions.*; import rx.observers.TestSubscriber; import rx.schedulers.Schedulers; import rx.subscriptions.Subscriptions; @@ -212,4 +211,29 @@ public void call() { public void testAlternateNull() { Observable.just(1).switchIfEmpty(null); } + + Observable recursiveSwitch(final int level) { + if (level == 100) { + return Observable.just(Thread.currentThread().getStackTrace()); + } + return Observable.empty().switchIfEmpty(Observable.defer(new Func0>() { + @Override + public Observable call() { + return recursiveSwitch(level + 1); + } + })); + } + + @Test + public void stackDepth() { + StackTraceElement[] trace = recursiveSwitch(0) + .toBlocking().last(); + + if (trace.length > 1000 || trace.length < 100) { + for (StackTraceElement ste : trace) { + System.out.println(ste); + } + fail("Stack too deep: " + trace.length); + } + } } \ No newline at end of file