Skip to content

Commit bcdfb13

Browse files
vanniktechakarnokd
authored andcommitted
2.x: Add Completable.hide() (#4973)
1 parent a3ccbf9 commit bcdfb13

File tree

4 files changed

+204
-0
lines changed

4 files changed

+204
-0
lines changed

src/main/java/io/reactivex/Completable.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1586,6 +1586,24 @@ public final <T> Flowable<T> startWith(Publisher<T> other) {
15861586
return this.<T>toFlowable().startWith(other);
15871587
}
15881588

1589+
/**
1590+
* Hides the identity of this Completable and its Disposable.
1591+
* <p>Allows preventing certain identity-based
1592+
* optimizations (fusion).
1593+
* <dl>
1594+
* <dt><b>Scheduler:</b></dt>
1595+
* <dd>{@code hide} does not operate by default on a particular {@link Scheduler}.</dd>
1596+
* </dl>
1597+
* @return the new Completable instance
1598+
* @since 2.0.5 - experimental
1599+
*/
1600+
@Experimental
1601+
@CheckReturnValue
1602+
@SchedulerSupport(SchedulerSupport.NONE)
1603+
public final Completable hide() {
1604+
return RxJavaPlugins.onAssembly(new CompletableHide(this));
1605+
}
1606+
15891607
/**
15901608
* Subscribes to this CompletableConsumable and returns a Disposable which can be used to cancel
15911609
* the subscription.
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
/**
2+
* Copyright 2016 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.internal.operators.completable;
15+
16+
import io.reactivex.Completable;
17+
import io.reactivex.CompletableObserver;
18+
import io.reactivex.CompletableSource;
19+
import io.reactivex.disposables.Disposable;
20+
import io.reactivex.internal.disposables.DisposableHelper;
21+
22+
/**
23+
* Hides the identity of the upstream Completable and its Disposable sent through onSubscribe.
24+
*/
25+
public final class CompletableHide extends Completable {
26+
27+
final CompletableSource source;
28+
29+
public CompletableHide(CompletableSource source) {
30+
this.source = source;
31+
}
32+
33+
@Override
34+
protected void subscribeActual(CompletableObserver observer) {
35+
source.subscribe(new HideCompletableObserver(observer));
36+
}
37+
38+
static final class HideCompletableObserver implements CompletableObserver, Disposable {
39+
40+
final CompletableObserver actual;
41+
42+
Disposable d;
43+
44+
HideCompletableObserver(CompletableObserver actual) {
45+
this.actual = actual;
46+
}
47+
48+
@Override
49+
public void dispose() {
50+
d.dispose();
51+
d = DisposableHelper.DISPOSED;
52+
}
53+
54+
@Override
55+
public boolean isDisposed() {
56+
return d.isDisposed();
57+
}
58+
59+
@Override
60+
public void onSubscribe(Disposable d) {
61+
if (DisposableHelper.validate(this.d, d)) {
62+
this.d = d;
63+
64+
actual.onSubscribe(this);
65+
}
66+
}
67+
68+
@Override
69+
public void onError(Throwable e) {
70+
actual.onError(e);
71+
}
72+
73+
@Override
74+
public void onComplete() {
75+
actual.onComplete();
76+
}
77+
}
78+
}

src/test/java/io/reactivex/TestHelper.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import io.reactivex.functions.*;
3333
import io.reactivex.internal.functions.ObjectHelper;
3434
import io.reactivex.internal.fuseable.*;
35+
import io.reactivex.internal.operators.completable.CompletableToFlowable;
3536
import io.reactivex.internal.operators.maybe.MaybeToFlowable;
3637
import io.reactivex.internal.operators.single.SingleToFlowable;
3738
import io.reactivex.internal.subscriptions.BooleanSubscription;
@@ -1962,6 +1963,28 @@ public static <T, U> void checkDisposedMaybe(Function<Maybe<T>, ? extends MaybeS
19621963
assertFalse("Dispose not propagated!", pp.hasSubscribers());
19631964
}
19641965

1966+
/**
1967+
* Check if the operator applied to a Completable source propagates dispose properly.
1968+
* @param composer the function to apply an operator to the provided Completable source
1969+
*/
1970+
public static void checkDisposedCompletable(Function<Completable, ? extends CompletableSource> composer) {
1971+
PublishProcessor<Integer> pp = PublishProcessor.create();
1972+
1973+
TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
1974+
1975+
try {
1976+
new CompletableToFlowable<Integer>(composer.apply(pp.ignoreElements())).subscribe(ts);
1977+
} catch (Throwable ex) {
1978+
throw ExceptionHelper.wrapOrThrow(ex);
1979+
}
1980+
1981+
assertTrue("Not subscribed to source!", pp.hasSubscribers());
1982+
1983+
ts.cancel();
1984+
1985+
assertFalse("Dispose not propagated!", pp.hasSubscribers());
1986+
}
1987+
19651988
/**
19661989
* Check if the operator applied to a Maybe source propagates dispose properly.
19671990
* @param <T> the source value type
Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
/**
2+
* Copyright 2016 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.internal.operators.completable;
15+
16+
import io.reactivex.Completable;
17+
import io.reactivex.CompletableSource;
18+
import io.reactivex.TestHelper;
19+
import io.reactivex.exceptions.TestException;
20+
import io.reactivex.functions.Function;
21+
import io.reactivex.processors.PublishProcessor;
22+
import io.reactivex.subjects.CompletableSubject;
23+
import org.junit.Test;
24+
25+
import static org.junit.Assert.assertFalse;
26+
27+
public class CompletableHideTest {
28+
29+
@Test
30+
public void never() {
31+
Completable.never()
32+
.hide()
33+
.test()
34+
.assertNotComplete()
35+
.assertNoErrors();
36+
}
37+
38+
@Test
39+
public void complete() {
40+
Completable.complete()
41+
.hide()
42+
.test()
43+
.assertResult();
44+
}
45+
46+
@Test
47+
public void error() {
48+
Completable.error(new TestException())
49+
.hide()
50+
.test()
51+
.assertFailure(TestException.class);
52+
}
53+
54+
@Test
55+
public void hidden() {
56+
assertFalse(CompletableSubject.create().hide() instanceof CompletableSubject);
57+
}
58+
59+
@Test
60+
public void dispose() {
61+
TestHelper.checkDisposedCompletable(new Function<Completable, CompletableSource>() {
62+
@Override
63+
public CompletableSource apply(Completable m) throws Exception {
64+
return m.hide();
65+
}
66+
});
67+
}
68+
69+
@Test
70+
public void isDisposed() {
71+
PublishProcessor<Integer> pp = PublishProcessor.create();
72+
73+
TestHelper.checkDisposed(pp.ignoreElements().hide());
74+
}
75+
76+
@Test
77+
public void doubleOnSubscribe() {
78+
TestHelper.checkDoubleOnSubscribeCompletable(new Function<Completable, Completable>() {
79+
@Override
80+
public Completable apply(Completable f) throws Exception {
81+
return f.hide();
82+
}
83+
});
84+
}
85+
}

0 commit comments

Comments
 (0)