Skip to content

Commit 7efa92e

Browse files
author
Aaron Tull
committed
Implemented Observable#toCompletable
1 parent 7ba9067 commit 7efa92e

File tree

3 files changed

+193
-0
lines changed

3 files changed

+193
-0
lines changed

src/main/java/rx/Observable.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -232,6 +232,26 @@ public Single<T> toSingle() {
232232
return new Single<T>(OnSubscribeSingle.create(this));
233233
}
234234

235+
/**
236+
* Returns a Completable that emits the single item emitted by the source Observable, if that Observable
237+
* emits only a single item. This operator discards all onNext emissions. When onCompleted is called on the
238+
* source observable then the Completable will call onCompleted. Error terminal events are propagated.
239+
* <p>
240+
* <img width="640" height="295" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.toCompletable.png" alt="">
241+
* <dl>
242+
* <dt><b>Scheduler:</b></dt>
243+
* <dd>{@code toCompletable} does not operate by default on a particular {@link Scheduler}.</dd>
244+
* </dl>
245+
*
246+
* @return a Completable that calls onCompleted on it's subscriber when the source Observable calls onCompleted
247+
* @see <a href="http://reactivex.io/documentation/completable.html">ReactiveX documentation: Completable</a>
248+
* @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number)
249+
*/
250+
@Experimental
251+
public Completable toCompletable() {
252+
return new Completable(OnSubscribeCompletable.create(this));
253+
}
254+
235255

236256
/* *********************************************************************************************************
237257
* Operators Below Here
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.internal.operators;
17+
18+
import rx.Completable.CompletableOnSubscribe;
19+
import rx.Completable.CompletableSubscriber;
20+
import rx.Observable;
21+
import rx.Single;
22+
import rx.SingleSubscriber;
23+
import rx.Subscriber;
24+
25+
import java.util.NoSuchElementException;
26+
27+
/**
28+
* Allows conversion of an Observable to a Completable that will call onCompleted after an
29+
* onCompleted event is emitted by the source observable. Also forwards errors as appropriate.
30+
*/
31+
public class OnSubscribeCompletable<T> implements CompletableOnSubscribe {
32+
33+
private final Observable<T> observable;
34+
35+
public OnSubscribeCompletable(Observable<T> observable) {
36+
this.observable = observable;
37+
}
38+
39+
@Override
40+
public void call(final CompletableSubscriber child) {
41+
Subscriber<T> parent = new Subscriber<T>() {
42+
private boolean alreadyCompleted = false;
43+
44+
@Override
45+
public void onStart() {
46+
request(Long.MAX_VALUE);
47+
}
48+
49+
@Override
50+
public void onCompleted() {
51+
if (!alreadyCompleted) {
52+
alreadyCompleted = true;
53+
child.onCompleted();
54+
}
55+
}
56+
57+
@Override
58+
public void onError(Throwable e) {
59+
child.onError(e);
60+
unsubscribe();
61+
}
62+
63+
@Override
64+
public void onNext(T t) {
65+
// do nothing at all
66+
}
67+
};
68+
child.onSubscribe(parent);
69+
observable.unsafeSubscribe(parent);
70+
}
71+
72+
public static <T> OnSubscribeCompletable<T> create(Observable<T> observable) {
73+
return new OnSubscribeCompletable<T>(observable);
74+
}
75+
}
Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.internal.operators;
17+
18+
import static org.junit.Assert.assertFalse;
19+
20+
import java.util.concurrent.atomic.AtomicBoolean;
21+
22+
import org.junit.Test;
23+
24+
import rx.Completable;
25+
import rx.Observable;
26+
import rx.functions.Action0;
27+
import rx.observers.TestSubscriber;
28+
29+
public class OnSubscribeCompletableTest {
30+
31+
@Test
32+
public void testJustSingleItemObservable() {
33+
TestSubscriber<String> subscriber = TestSubscriber.create();
34+
Completable cmp = Observable.just("Hello World!").toCompletable();
35+
cmp.subscribe(subscriber);
36+
37+
subscriber.assertNoValues();
38+
subscriber.assertCompleted();
39+
subscriber.assertNoErrors();
40+
}
41+
42+
@Test
43+
public void testErrorObservable() {
44+
TestSubscriber<String> subscriber = TestSubscriber.create();
45+
IllegalArgumentException error = new IllegalArgumentException("Error");
46+
Completable cmp = Observable.<String>error(error).toCompletable();
47+
cmp.subscribe(subscriber);
48+
49+
subscriber.assertError(error);
50+
subscriber.assertNoValues();
51+
}
52+
53+
@Test
54+
public void testJustTwoEmissionsObservableThrowsError() {
55+
TestSubscriber<String> subscriber = TestSubscriber.create();
56+
Completable cmp = Observable.just("First", "Second").toCompletable();
57+
cmp.subscribe(subscriber);
58+
59+
subscriber.assertNoErrors();
60+
subscriber.assertNoValues();
61+
}
62+
63+
@Test
64+
public void testEmptyObservable() {
65+
TestSubscriber<String> subscriber = TestSubscriber.create();
66+
Completable cmp = Observable.<String>empty().toCompletable();
67+
cmp.subscribe(subscriber);
68+
69+
subscriber.assertNoErrors();
70+
subscriber.assertNoValues();
71+
subscriber.assertCompleted();
72+
}
73+
74+
@Test
75+
public void testNeverObservable() {
76+
TestSubscriber<String> subscriber = TestSubscriber.create();
77+
Completable cmp = Observable.<String>never().toCompletable();
78+
cmp.subscribe(subscriber);
79+
80+
subscriber.assertNoTerminalEvent();
81+
subscriber.assertNoValues();
82+
}
83+
84+
@Test
85+
public void testShouldUseUnsafeSubscribeInternallyNotSubscribe() {
86+
TestSubscriber<String> subscriber = TestSubscriber.create();
87+
final AtomicBoolean unsubscribed = new AtomicBoolean(false);
88+
Completable cmp = Observable.just("Hello World!").doOnUnsubscribe(new Action0() {
89+
90+
@Override
91+
public void call() {
92+
unsubscribed.set(true);
93+
}}).toCompletable();
94+
cmp.subscribe(subscriber);
95+
subscriber.assertCompleted();
96+
assertFalse(unsubscribed.get());
97+
}
98+
}

0 commit comments

Comments
 (0)