From acaf42e94eead32c1137b1d72b1ec7c84fb0fb6c Mon Sep 17 00:00:00 2001 From: Aaron Tull Date: Wed, 9 Dec 2015 12:41:53 -0800 Subject: [PATCH] Implemented Completable#andThen(Observable) --- src/main/java/rx/Completable.java | 15 +++++++ src/test/java/rx/CompletableTest.java | 59 +++++++++++++++++++++++++++ 2 files changed, 74 insertions(+) diff --git a/src/main/java/rx/Completable.java b/src/main/java/rx/Completable.java index aa222b9e8e..80a1ea5b25 100644 --- a/src/main/java/rx/Completable.java +++ b/src/main/java/rx/Completable.java @@ -1080,6 +1080,21 @@ public final Completable compose(CompletableTransformer transformer) { return to(transformer); } + /** + * Returns an Observable which will subscribe to this Completable and once that is completed then + * will subscribe to the {@code next} Observable. An error event from this Completable will be + * propagated to the downstream subscriber and will result in skipping the subscription of the + * Observable. + * + * @param next the Observable to subscribe after this Completable is completed, not null + * @return Observable that composes this Completable and next + * @throws NullPointerException if next is null + */ + public final Observable andThen(Observable next) { + requireNonNull(next); + return next.delaySubscription(toObservable()); + } + /** * Concatenates this Completable with another Completable. * @param other the other Completable, not null diff --git a/src/test/java/rx/CompletableTest.java b/src/test/java/rx/CompletableTest.java index 09d71a9ff7..1fca0df0e0 100644 --- a/src/test/java/rx/CompletableTest.java +++ b/src/test/java/rx/CompletableTest.java @@ -23,6 +23,7 @@ import org.junit.*; import rx.Completable.*; +import rx.Observable.OnSubscribe; import rx.exceptions.*; import rx.functions.*; import rx.observers.TestSubscriber; @@ -357,6 +358,64 @@ public void call(Long v) { Assert.assertEquals(Arrays.asList(5L, 1L, 1L, 1L, 1L, 1L, 1L, 1L, 1L, 1L, 1L), requested); } + @Test + public void andThen() { + TestSubscriber ts = new TestSubscriber(0); + Completable.complete().andThen(Observable.just("foo")).subscribe(ts); + ts.requestMore(1); + ts.assertValue("foo"); + ts.assertCompleted(); + ts.assertNoErrors(); + } + + @Test + public void andThenNever() { + TestSubscriber ts = new TestSubscriber(0); + Completable.never().andThen(Observable.just("foo")).subscribe(ts); + ts.requestMore(1); + ts.assertNoValues(); + ts.assertNoTerminalEvent(); + } + + @Test + public void andThenError() { + TestSubscriber ts = new TestSubscriber(0); + final AtomicBoolean hasRun = new AtomicBoolean(false); + final Exception e = new Exception(); + Completable.create(new CompletableOnSubscribe() { + @Override + public void call(CompletableSubscriber cs) { + cs.onError(e); + } + }) + .andThen(Observable.create(new OnSubscribe() { + @Override + public void call(Subscriber s) { + hasRun.set(true); + s.onNext("foo"); + s.onCompleted(); + } + })) + .subscribe(ts); + ts.assertNoValues(); + ts.assertError(e); + Assert.assertFalse("Should not have subscribed to observable when completable errors", hasRun.get()); + } + + @Test + public void andThenSubscribeOn() { + TestSubscriber ts = new TestSubscriber(0); + TestScheduler scheduler = new TestScheduler(); + Completable.complete().andThen(Observable.just("foo").delay(1, TimeUnit.SECONDS, scheduler)).subscribe(ts); + ts.requestMore(1); + ts.assertNoValues(); + ts.assertNoTerminalEvent(); + scheduler.advanceTimeBy(1, TimeUnit.SECONDS); + ts.assertValue("foo"); + ts.assertCompleted(); + ts.assertNoErrors(); + } + @Test(expected = NullPointerException.class) public void createNull() { Completable.create(null);