Skip to content

Commit 7f6644c

Browse files
committed
Merge pull request #3799 from prt2121/pt/andThenSingle
1.x: Add Completable.andThen(Single)
2 parents 08f5860 + db2e232 commit 7f6644c

File tree

5 files changed

+300
-1
lines changed

5 files changed

+300
-1
lines changed

src/main/java/rx/Completable.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1114,6 +1114,24 @@ public final <T> Observable<T> andThen(Observable<T> next) {
11141114
requireNonNull(next);
11151115
return next.delaySubscription(toObservable());
11161116
}
1117+
1118+
/**
1119+
* Returns a Single which will subscribe to this Completable and once that is completed then
1120+
* will subscribe to the {@code next} Single. An error event from this Completable will be
1121+
* propagated to the downstream subscriber and will result in skipping the subscription of the
1122+
* Single.
1123+
* <dl>
1124+
* <dt><b>Scheduler:</b></dt>
1125+
* <dd>{@code andThen} does not operate by default on a particular {@link Scheduler}.</dd>
1126+
* </dl>
1127+
*
1128+
* @param next the Single to subscribe after this Completable is completed, not null
1129+
* @return Single that composes this Completable and next
1130+
*/
1131+
public final <T> Single<T> andThen(Single<T> next) {
1132+
requireNonNull(next);
1133+
return next.delaySubscription(toObservable());
1134+
}
11171135

11181136
/**
11191137
* Concatenates this Completable with another Completable.

src/main/java/rx/Single.java

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727
import rx.internal.util.UtilityFunctions;
2828
import rx.observers.SafeSubscriber;
2929
import rx.observers.SerializedSubscriber;
30-
import rx.plugins.RxJavaObservableExecutionHook;
3130
import rx.plugins.RxJavaPlugins;
3231
import rx.plugins.RxJavaSingleExecutionHook;
3332
import rx.schedulers.Schedulers;
@@ -2671,4 +2670,28 @@ public static <T, Resource> Single<T> using(
26712670
return create(new SingleOnSubscribeUsing<T, Resource>(resourceFactory, singleFactory, disposeAction, disposeEagerly));
26722671
}
26732672

2673+
/**
2674+
* Returns a Single that delays the subscription to this Single
2675+
* until the Observable completes. In case the {@code onError} of the supplied observer throws,
2676+
* the exception will be propagated to the downstream subscriber
2677+
* and will result in skipping the subscription of this Single.
2678+
*
2679+
* <p>
2680+
* <dl>
2681+
* <dt><b>Scheduler:</b></dt>
2682+
* <dd>This method does not operate by default on a particular {@link Scheduler}.</dd>
2683+
* </dl>
2684+
*
2685+
* @param other the Observable that should trigger the subscription
2686+
* to this Single.
2687+
* @return a Single that delays the subscription to this Single
2688+
* until the Observable emits an element or completes normally.
2689+
*/
2690+
@Experimental
2691+
public final Single<T> delaySubscription(Observable<?> other) {
2692+
if (other == null) {
2693+
throw new NullPointerException();
2694+
}
2695+
return create(new SingleOnSubscribeDelaySubscriptionOther<T>(this, other));
2696+
}
26742697
}
Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
17+
package rx.internal.operators;
18+
19+
import rx.Observable;
20+
import rx.Single;
21+
import rx.SingleSubscriber;
22+
import rx.Subscriber;
23+
import rx.plugins.RxJavaPlugins;
24+
import rx.subscriptions.SerialSubscription;
25+
26+
/**
27+
* Delays the subscription to the Single until the Observable
28+
* fires an event or completes.
29+
*
30+
* @param <T> the Single value type
31+
*/
32+
public final class SingleOnSubscribeDelaySubscriptionOther<T> implements Single.OnSubscribe<T> {
33+
final Single<? extends T> main;
34+
final Observable<?> other;
35+
36+
public SingleOnSubscribeDelaySubscriptionOther(Single<? extends T> main, Observable<?> other) {
37+
this.main = main;
38+
this.other = other;
39+
}
40+
41+
@Override
42+
public void call(final SingleSubscriber<? super T> subscriber) {
43+
final SingleSubscriber<T> child = new SingleSubscriber<T>() {
44+
@Override
45+
public void onSuccess(T value) {
46+
subscriber.onSuccess(value);
47+
}
48+
49+
@Override
50+
public void onError(Throwable error) {
51+
subscriber.onError(error);
52+
}
53+
};
54+
55+
final SerialSubscription serial = new SerialSubscription();
56+
subscriber.add(serial);
57+
58+
Subscriber<Object> otherSubscriber = new Subscriber<Object>() {
59+
boolean done;
60+
@Override
61+
public void onNext(Object t) {
62+
onCompleted();
63+
}
64+
65+
@Override
66+
public void onError(Throwable e) {
67+
if (done) {
68+
RxJavaPlugins.getInstance().getErrorHandler().handleError(e);
69+
return;
70+
}
71+
done = true;
72+
child.onError(e);
73+
}
74+
75+
@Override
76+
public void onCompleted() {
77+
if (done) {
78+
return;
79+
}
80+
done = true;
81+
serial.set(child);
82+
83+
main.subscribe(child);
84+
}
85+
};
86+
87+
serial.set(otherSubscriber);
88+
89+
other.subscribe(otherSubscriber);
90+
}
91+
}

src/test/java/rx/CompletableTest.java

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -418,6 +418,61 @@ public void andThenSubscribeOn() {
418418
ts.assertCompleted();
419419
ts.assertNoErrors();
420420
}
421+
422+
@Test
423+
public void andThenSingle() {
424+
TestSubscriber<String> ts = new TestSubscriber<String>(0);
425+
Completable.complete().andThen(Single.just("foo")).subscribe(ts);
426+
ts.requestMore(1);
427+
ts.assertValue("foo");
428+
ts.assertCompleted();
429+
ts.assertNoErrors();
430+
ts.assertUnsubscribed();
431+
}
432+
433+
@Test
434+
public void andThenSingleNever() {
435+
TestSubscriber<String> ts = new TestSubscriber<String>(0);
436+
Completable.never().andThen(Single.just("foo")).subscribe(ts);
437+
ts.requestMore(1);
438+
ts.assertNoValues();
439+
ts.assertNoTerminalEvent();
440+
}
441+
442+
@Test
443+
public void andThenSingleError() {
444+
TestSubscriber<String> ts = new TestSubscriber<String>(0);
445+
final AtomicBoolean hasRun = new AtomicBoolean(false);
446+
final Exception e = new Exception();
447+
Completable.error(e)
448+
.andThen(Single.<String>create(new Single.OnSubscribe<String>() {
449+
@Override
450+
public void call(SingleSubscriber<? super String> s) {
451+
hasRun.set(true);
452+
s.onSuccess("foo");
453+
}
454+
}))
455+
.subscribe(ts);
456+
ts.assertNoValues();
457+
ts.assertError(e);
458+
ts.assertUnsubscribed();
459+
Assert.assertFalse("Should not have subscribed to single when completable errors", hasRun.get());
460+
}
461+
462+
@Test
463+
public void andThenSingleSubscribeOn() {
464+
TestSubscriber<String> ts = new TestSubscriber<String>(0);
465+
TestScheduler scheduler = new TestScheduler();
466+
Completable.complete().andThen(Single.just("foo").delay(1, TimeUnit.SECONDS, scheduler)).subscribe(ts);
467+
ts.requestMore(1);
468+
ts.assertNoValues();
469+
ts.assertNoTerminalEvent();
470+
scheduler.advanceTimeBy(1, TimeUnit.SECONDS);
471+
ts.assertValue("foo");
472+
ts.assertCompleted();
473+
ts.assertNoErrors();
474+
ts.assertUnsubscribed();
475+
}
421476

422477
@Test(expected = NullPointerException.class)
423478
public void createNull() {
Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
package rx.internal.operators;
2+
3+
import org.junit.Assert;
4+
import org.junit.Test;
5+
import rx.Single;
6+
import rx.exceptions.TestException;
7+
import rx.functions.Action0;
8+
import rx.observers.TestSubscriber;
9+
import rx.subjects.PublishSubject;
10+
11+
import java.util.concurrent.atomic.AtomicInteger;
12+
13+
public class SingleOnSubscribeDelaySubscriptionOtherTest {
14+
@Test
15+
public void noPrematureSubscription() {
16+
PublishSubject<Object> other = PublishSubject.create();
17+
18+
TestSubscriber<Integer> ts = TestSubscriber.create();
19+
20+
final AtomicInteger subscribed = new AtomicInteger();
21+
22+
Single.just(1)
23+
.doOnSubscribe(new Action0() {
24+
@Override
25+
public void call() {
26+
subscribed.getAndIncrement();
27+
}
28+
})
29+
.delaySubscription(other)
30+
.subscribe(ts);
31+
32+
ts.assertNotCompleted();
33+
ts.assertNoErrors();
34+
ts.assertNoValues();
35+
36+
Assert.assertEquals("Premature subscription", 0, subscribed.get());
37+
38+
other.onNext(1);
39+
40+
Assert.assertEquals("No subscription", 1, subscribed.get());
41+
42+
ts.assertValue(1);
43+
ts.assertNoErrors();
44+
ts.assertCompleted();
45+
}
46+
47+
@Test
48+
public void noPrematureSubscriptionToError() {
49+
PublishSubject<Object> other = PublishSubject.create();
50+
51+
TestSubscriber<Integer> ts = TestSubscriber.create();
52+
53+
final AtomicInteger subscribed = new AtomicInteger();
54+
55+
Single.<Integer>error(new TestException())
56+
.doOnSubscribe(new Action0() {
57+
@Override
58+
public void call() {
59+
subscribed.getAndIncrement();
60+
}
61+
})
62+
.delaySubscription(other)
63+
.subscribe(ts);
64+
65+
ts.assertNotCompleted();
66+
ts.assertNoErrors();
67+
ts.assertNoValues();
68+
69+
Assert.assertEquals("Premature subscription", 0, subscribed.get());
70+
71+
other.onNext(1);
72+
73+
Assert.assertEquals("No subscription", 1, subscribed.get());
74+
75+
ts.assertNoValues();
76+
ts.assertNotCompleted();
77+
ts.assertError(TestException.class);
78+
}
79+
80+
@Test
81+
public void noSubscriptionIfOtherErrors() {
82+
PublishSubject<Object> other = PublishSubject.create();
83+
84+
TestSubscriber<Integer> ts = TestSubscriber.create();
85+
86+
final AtomicInteger subscribed = new AtomicInteger();
87+
88+
Single.<Integer>error(new TestException())
89+
.doOnSubscribe(new Action0() {
90+
@Override
91+
public void call() {
92+
subscribed.getAndIncrement();
93+
}
94+
})
95+
.delaySubscription(other)
96+
.subscribe(ts);
97+
98+
ts.assertNotCompleted();
99+
ts.assertNoErrors();
100+
ts.assertNoValues();
101+
102+
Assert.assertEquals("Premature subscription", 0, subscribed.get());
103+
104+
other.onError(new TestException());
105+
106+
Assert.assertEquals("Premature subscription", 0, subscribed.get());
107+
108+
ts.assertNoValues();
109+
ts.assertNotCompleted();
110+
ts.assertError(TestException.class);
111+
}
112+
}

0 commit comments

Comments
 (0)