diff --git a/src/main/java/rx/Completable.java b/src/main/java/rx/Completable.java index f2e752c2b2..357969565d 100644 --- a/src/main/java/rx/Completable.java +++ b/src/main/java/rx/Completable.java @@ -1114,6 +1114,24 @@ public final Observable andThen(Observable next) { requireNonNull(next); return next.delaySubscription(toObservable()); } + + /** + * Returns a Single which will subscribe to this Completable and once that is completed then + * will subscribe to the {@code next} Single. An error event from this Completable will be + * propagated to the downstream subscriber and will result in skipping the subscription of the + * Single. + *
+ *
Scheduler:
+ *
{@code andThen} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param next the Single to subscribe after this Completable is completed, not null + * @return Single that composes this Completable and next + */ + public final Single andThen(Single next) { + requireNonNull(next); + return next.delaySubscription(toObservable()); + } /** * Concatenates this Completable with another Completable. diff --git a/src/main/java/rx/Single.java b/src/main/java/rx/Single.java index f5e1156acc..a798073ff9 100644 --- a/src/main/java/rx/Single.java +++ b/src/main/java/rx/Single.java @@ -27,7 +27,6 @@ import rx.internal.util.UtilityFunctions; import rx.observers.SafeSubscriber; import rx.observers.SerializedSubscriber; -import rx.plugins.RxJavaObservableExecutionHook; import rx.plugins.RxJavaPlugins; import rx.plugins.RxJavaSingleExecutionHook; import rx.schedulers.Schedulers; @@ -2671,4 +2670,28 @@ public static Single using( return create(new SingleOnSubscribeUsing(resourceFactory, singleFactory, disposeAction, disposeEagerly)); } + /** + * Returns a Single that delays the subscription to this Single + * until the Observable completes. In case the {@code onError} of the supplied observer throws, + * the exception will be propagated to the downstream subscriber + * and will result in skipping the subscription of this Single. + * + *

+ *

+ *
Scheduler:
+ *
This method does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param other the Observable that should trigger the subscription + * to this Single. + * @return a Single that delays the subscription to this Single + * until the Observable emits an element or completes normally. + */ + @Experimental + public final Single delaySubscription(Observable other) { + if (other == null) { + throw new NullPointerException(); + } + return create(new SingleOnSubscribeDelaySubscriptionOther(this, other)); + } } diff --git a/src/main/java/rx/internal/operators/SingleOnSubscribeDelaySubscriptionOther.java b/src/main/java/rx/internal/operators/SingleOnSubscribeDelaySubscriptionOther.java new file mode 100644 index 0000000000..efd9cf517b --- /dev/null +++ b/src/main/java/rx/internal/operators/SingleOnSubscribeDelaySubscriptionOther.java @@ -0,0 +1,91 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package rx.internal.operators; + +import rx.Observable; +import rx.Single; +import rx.SingleSubscriber; +import rx.Subscriber; +import rx.plugins.RxJavaPlugins; +import rx.subscriptions.SerialSubscription; + +/** + * Delays the subscription to the Single until the Observable + * fires an event or completes. + * + * @param the Single value type + */ +public final class SingleOnSubscribeDelaySubscriptionOther implements Single.OnSubscribe { + final Single main; + final Observable other; + + public SingleOnSubscribeDelaySubscriptionOther(Single main, Observable other) { + this.main = main; + this.other = other; + } + + @Override + public void call(final SingleSubscriber subscriber) { + final SingleSubscriber child = new SingleSubscriber() { + @Override + public void onSuccess(T value) { + subscriber.onSuccess(value); + } + + @Override + public void onError(Throwable error) { + subscriber.onError(error); + } + }; + + final SerialSubscription serial = new SerialSubscription(); + subscriber.add(serial); + + Subscriber otherSubscriber = new Subscriber() { + boolean done; + @Override + public void onNext(Object t) { + onCompleted(); + } + + @Override + public void onError(Throwable e) { + if (done) { + RxJavaPlugins.getInstance().getErrorHandler().handleError(e); + return; + } + done = true; + child.onError(e); + } + + @Override + public void onCompleted() { + if (done) { + return; + } + done = true; + serial.set(child); + + main.subscribe(child); + } + }; + + serial.set(otherSubscriber); + + other.subscribe(otherSubscriber); + } +} diff --git a/src/test/java/rx/CompletableTest.java b/src/test/java/rx/CompletableTest.java index 894c72109f..2493da2356 100644 --- a/src/test/java/rx/CompletableTest.java +++ b/src/test/java/rx/CompletableTest.java @@ -418,6 +418,61 @@ public void andThenSubscribeOn() { ts.assertCompleted(); ts.assertNoErrors(); } + + @Test + public void andThenSingle() { + TestSubscriber ts = new TestSubscriber(0); + Completable.complete().andThen(Single.just("foo")).subscribe(ts); + ts.requestMore(1); + ts.assertValue("foo"); + ts.assertCompleted(); + ts.assertNoErrors(); + ts.assertUnsubscribed(); + } + + @Test + public void andThenSingleNever() { + TestSubscriber ts = new TestSubscriber(0); + Completable.never().andThen(Single.just("foo")).subscribe(ts); + ts.requestMore(1); + ts.assertNoValues(); + ts.assertNoTerminalEvent(); + } + + @Test + public void andThenSingleError() { + TestSubscriber ts = new TestSubscriber(0); + final AtomicBoolean hasRun = new AtomicBoolean(false); + final Exception e = new Exception(); + Completable.error(e) + .andThen(Single.create(new Single.OnSubscribe() { + @Override + public void call(SingleSubscriber s) { + hasRun.set(true); + s.onSuccess("foo"); + } + })) + .subscribe(ts); + ts.assertNoValues(); + ts.assertError(e); + ts.assertUnsubscribed(); + Assert.assertFalse("Should not have subscribed to single when completable errors", hasRun.get()); + } + + @Test + public void andThenSingleSubscribeOn() { + TestSubscriber ts = new TestSubscriber(0); + TestScheduler scheduler = new TestScheduler(); + Completable.complete().andThen(Single.just("foo").delay(1, TimeUnit.SECONDS, scheduler)).subscribe(ts); + ts.requestMore(1); + ts.assertNoValues(); + ts.assertNoTerminalEvent(); + scheduler.advanceTimeBy(1, TimeUnit.SECONDS); + ts.assertValue("foo"); + ts.assertCompleted(); + ts.assertNoErrors(); + ts.assertUnsubscribed(); + } @Test(expected = NullPointerException.class) public void createNull() { diff --git a/src/test/java/rx/internal/operators/SingleOnSubscribeDelaySubscriptionOtherTest.java b/src/test/java/rx/internal/operators/SingleOnSubscribeDelaySubscriptionOtherTest.java new file mode 100644 index 0000000000..c17f3b6e1f --- /dev/null +++ b/src/test/java/rx/internal/operators/SingleOnSubscribeDelaySubscriptionOtherTest.java @@ -0,0 +1,112 @@ +package rx.internal.operators; + +import org.junit.Assert; +import org.junit.Test; +import rx.Single; +import rx.exceptions.TestException; +import rx.functions.Action0; +import rx.observers.TestSubscriber; +import rx.subjects.PublishSubject; + +import java.util.concurrent.atomic.AtomicInteger; + +public class SingleOnSubscribeDelaySubscriptionOtherTest { + @Test + public void noPrematureSubscription() { + PublishSubject other = PublishSubject.create(); + + TestSubscriber ts = TestSubscriber.create(); + + final AtomicInteger subscribed = new AtomicInteger(); + + Single.just(1) + .doOnSubscribe(new Action0() { + @Override + public void call() { + subscribed.getAndIncrement(); + } + }) + .delaySubscription(other) + .subscribe(ts); + + ts.assertNotCompleted(); + ts.assertNoErrors(); + ts.assertNoValues(); + + Assert.assertEquals("Premature subscription", 0, subscribed.get()); + + other.onNext(1); + + Assert.assertEquals("No subscription", 1, subscribed.get()); + + ts.assertValue(1); + ts.assertNoErrors(); + ts.assertCompleted(); + } + + @Test + public void noPrematureSubscriptionToError() { + PublishSubject other = PublishSubject.create(); + + TestSubscriber ts = TestSubscriber.create(); + + final AtomicInteger subscribed = new AtomicInteger(); + + Single.error(new TestException()) + .doOnSubscribe(new Action0() { + @Override + public void call() { + subscribed.getAndIncrement(); + } + }) + .delaySubscription(other) + .subscribe(ts); + + ts.assertNotCompleted(); + ts.assertNoErrors(); + ts.assertNoValues(); + + Assert.assertEquals("Premature subscription", 0, subscribed.get()); + + other.onNext(1); + + Assert.assertEquals("No subscription", 1, subscribed.get()); + + ts.assertNoValues(); + ts.assertNotCompleted(); + ts.assertError(TestException.class); + } + + @Test + public void noSubscriptionIfOtherErrors() { + PublishSubject other = PublishSubject.create(); + + TestSubscriber ts = TestSubscriber.create(); + + final AtomicInteger subscribed = new AtomicInteger(); + + Single.error(new TestException()) + .doOnSubscribe(new Action0() { + @Override + public void call() { + subscribed.getAndIncrement(); + } + }) + .delaySubscription(other) + .subscribe(ts); + + ts.assertNotCompleted(); + ts.assertNoErrors(); + ts.assertNoValues(); + + Assert.assertEquals("Premature subscription", 0, subscribed.get()); + + other.onError(new TestException()); + + Assert.assertEquals("Premature subscription", 0, subscribed.get()); + + ts.assertNoValues(); + ts.assertNotCompleted(); + ts.assertError(TestException.class); + } +}