diff --git a/src/main/java/io/reactivex/Completable.java b/src/main/java/io/reactivex/Completable.java index c07ba8ae6a..bc9cd271dc 100644 --- a/src/main/java/io/reactivex/Completable.java +++ b/src/main/java/io/reactivex/Completable.java @@ -2031,6 +2031,32 @@ public final Completable subscribeOn(final Scheduler scheduler) { return RxJavaPlugins.onAssembly(new CompletableSubscribeOn(this, scheduler)); } + /** + * Terminates the downstream if this or the other {@code Completable} + * terminates (wins the termination race) while disposing the connection to the losing source. + *

+ * + *

+ *
Scheduler:
+ *
{@code takeUntil} does not operate by default on a particular {@link Scheduler}.
+ *
Error handling:
+ *
If both this and the other sources signal an error, only one of the errors + * is signaled to the downstream and the other error is signaled to the global + * error handler via {@link RxJavaPlugins#onError(Throwable)}.
+ *
+ * @param other the other completable source to observe for the terminal signals + * @return the new Completable instance + * @since 2.1.17 - experimental + */ + @CheckReturnValue + @Experimental + @SchedulerSupport(SchedulerSupport.NONE) + public final Completable takeUntil(CompletableSource other) { + ObjectHelper.requireNonNull(other, "other is null"); + + return RxJavaPlugins.onAssembly(new CompletableTakeUntilCompletable(this, other)); + } + /** * Returns a Completable that runs this Completable and emits a TimeoutException in case * this Completable doesn't complete within the given time. diff --git a/src/main/java/io/reactivex/internal/operators/completable/CompletableTakeUntilCompletable.java b/src/main/java/io/reactivex/internal/operators/completable/CompletableTakeUntilCompletable.java new file mode 100644 index 0000000000..1343d35b98 --- /dev/null +++ b/src/main/java/io/reactivex/internal/operators/completable/CompletableTakeUntilCompletable.java @@ -0,0 +1,145 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.completable; + +import java.util.concurrent.atomic.*; + +import io.reactivex.*; +import io.reactivex.annotations.Experimental; +import io.reactivex.disposables.Disposable; +import io.reactivex.internal.disposables.DisposableHelper; +import io.reactivex.plugins.RxJavaPlugins; + +/** + * Terminates the sequence if either the main or the other Completable terminate. + * @since 2.1.17 - experimental + */ +@Experimental +public final class CompletableTakeUntilCompletable extends Completable { + + final Completable source; + + final CompletableSource other; + + public CompletableTakeUntilCompletable(Completable source, + CompletableSource other) { + this.source = source; + this.other = other; + } + + @Override + protected void subscribeActual(CompletableObserver s) { + TakeUntilMainObserver parent = new TakeUntilMainObserver(s); + s.onSubscribe(parent); + + other.subscribe(parent.other); + source.subscribe(parent); + } + + static final class TakeUntilMainObserver extends AtomicReference + implements CompletableObserver, Disposable { + + private static final long serialVersionUID = 3533011714830024923L; + + final CompletableObserver downstream; + + final OtherObserver other; + + final AtomicBoolean once; + + TakeUntilMainObserver(CompletableObserver downstream) { + this.downstream = downstream; + this.other = new OtherObserver(this); + this.once = new AtomicBoolean(); + } + + @Override + public void dispose() { + if (once.compareAndSet(false, true)) { + DisposableHelper.dispose(this); + DisposableHelper.dispose(other); + } + } + + @Override + public boolean isDisposed() { + return once.get(); + } + + @Override + public void onSubscribe(Disposable d) { + DisposableHelper.setOnce(this, d); + } + + @Override + public void onComplete() { + if (once.compareAndSet(false, true)) { + DisposableHelper.dispose(other); + downstream.onComplete(); + } + } + + @Override + public void onError(Throwable e) { + if (once.compareAndSet(false, true)) { + DisposableHelper.dispose(other); + downstream.onError(e); + } else { + RxJavaPlugins.onError(e); + } + } + + void innerComplete() { + if (once.compareAndSet(false, true)) { + DisposableHelper.dispose(this); + downstream.onComplete(); + } + } + + void innerError(Throwable e) { + if (once.compareAndSet(false, true)) { + DisposableHelper.dispose(this); + downstream.onError(e); + } else { + RxJavaPlugins.onError(e); + } + } + + static final class OtherObserver extends AtomicReference + implements CompletableObserver { + + private static final long serialVersionUID = 5176264485428790318L; + final TakeUntilMainObserver parent; + + OtherObserver(TakeUntilMainObserver parent) { + this.parent = parent; + } + + @Override + public void onSubscribe(Disposable d) { + DisposableHelper.setOnce(this, d); + } + + @Override + public void onComplete() { + parent.innerComplete(); + } + + @Override + public void onError(Throwable e) { + parent.innerError(e); + } + } + } +} diff --git a/src/test/java/io/reactivex/internal/operators/completable/CompletableTakeUntilTest.java b/src/test/java/io/reactivex/internal/operators/completable/CompletableTakeUntilTest.java new file mode 100644 index 0000000000..586e0c3207 --- /dev/null +++ b/src/test/java/io/reactivex/internal/operators/completable/CompletableTakeUntilTest.java @@ -0,0 +1,235 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.completable; + +import static org.junit.Assert.*; + +import java.util.List; +import java.util.concurrent.atomic.AtomicReference; + +import org.junit.Test; + +import io.reactivex.*; +import io.reactivex.disposables.Disposables; +import io.reactivex.exceptions.TestException; +import io.reactivex.observers.TestObserver; +import io.reactivex.plugins.RxJavaPlugins; +import io.reactivex.subjects.CompletableSubject; + +public class CompletableTakeUntilTest { + + @Test + public void consumerDisposes() { + CompletableSubject cs1 = CompletableSubject.create(); + CompletableSubject cs2 = CompletableSubject.create(); + + TestObserver to = cs1.takeUntil(cs2).test(); + + to.assertEmpty(); + + assertTrue(cs1.hasObservers()); + assertTrue(cs2.hasObservers()); + + to.dispose(); + + assertFalse(cs1.hasObservers()); + assertFalse(cs2.hasObservers()); + } + + @Test + public void mainCompletes() { + CompletableSubject cs1 = CompletableSubject.create(); + CompletableSubject cs2 = CompletableSubject.create(); + + TestObserver to = cs1.takeUntil(cs2).test(); + + to.assertEmpty(); + + assertTrue(cs1.hasObservers()); + assertTrue(cs2.hasObservers()); + + cs1.onComplete(); + + assertFalse(cs1.hasObservers()); + assertFalse(cs2.hasObservers()); + + to.assertResult(); + } + + @Test + public void otherCompletes() { + CompletableSubject cs1 = CompletableSubject.create(); + CompletableSubject cs2 = CompletableSubject.create(); + + TestObserver to = cs1.takeUntil(cs2).test(); + + to.assertEmpty(); + + assertTrue(cs1.hasObservers()); + assertTrue(cs2.hasObservers()); + + cs2.onComplete(); + + assertFalse(cs1.hasObservers()); + assertFalse(cs2.hasObservers()); + + to.assertResult(); + } + + @Test + public void mainErrors() { + CompletableSubject cs1 = CompletableSubject.create(); + CompletableSubject cs2 = CompletableSubject.create(); + + TestObserver to = cs1.takeUntil(cs2).test(); + + to.assertEmpty(); + + assertTrue(cs1.hasObservers()); + assertTrue(cs2.hasObservers()); + + cs1.onError(new TestException()); + + assertFalse(cs1.hasObservers()); + assertFalse(cs2.hasObservers()); + + to.assertFailure(TestException.class); + } + + @Test + public void otherErrors() { + CompletableSubject cs1 = CompletableSubject.create(); + CompletableSubject cs2 = CompletableSubject.create(); + + TestObserver to = cs1.takeUntil(cs2).test(); + + to.assertEmpty(); + + assertTrue(cs1.hasObservers()); + assertTrue(cs2.hasObservers()); + + cs2.onError(new TestException()); + + assertFalse(cs1.hasObservers()); + assertFalse(cs2.hasObservers()); + + to.assertFailure(TestException.class); + } + + @Test + public void isDisposed() { + CompletableSubject cs1 = CompletableSubject.create(); + CompletableSubject cs2 = CompletableSubject.create(); + + TestHelper.checkDisposed(cs1.takeUntil(cs2)); + } + + @Test + public void mainErrorLate() { + + List errors = TestHelper.trackPluginErrors(); + try { + + new Completable() { + @Override + protected void subscribeActual(CompletableObserver s) { + s.onSubscribe(Disposables.empty()); + s.onError(new TestException()); + } + }.takeUntil(Completable.complete()) + .test() + .assertResult(); + + TestHelper.assertUndeliverable(errors, 0, TestException.class); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void mainCompleteLate() { + + List errors = TestHelper.trackPluginErrors(); + try { + + new Completable() { + @Override + protected void subscribeActual(CompletableObserver s) { + s.onSubscribe(Disposables.empty()); + s.onComplete(); + } + }.takeUntil(Completable.complete()) + .test() + .assertResult(); + + assertTrue(errors.isEmpty()); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void otherErrorLate() { + + List errors = TestHelper.trackPluginErrors(); + try { + + final AtomicReference ref = new AtomicReference(); + + Completable.complete() + .takeUntil(new Completable() { + @Override + protected void subscribeActual(CompletableObserver s) { + s.onSubscribe(Disposables.empty()); + ref.set(s); + } + }) + .test() + .assertResult(); + + ref.get().onError(new TestException()); + + TestHelper.assertUndeliverable(errors, 0, TestException.class); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void otherCompleteLate() { + + List errors = TestHelper.trackPluginErrors(); + try { + + final AtomicReference ref = new AtomicReference(); + + Completable.complete() + .takeUntil(new Completable() { + @Override + protected void subscribeActual(CompletableObserver s) { + s.onSubscribe(Disposables.empty()); + ref.set(s); + } + }) + .test() + .assertResult(); + + ref.get().onComplete(); + + assertTrue(errors.isEmpty()); + } finally { + RxJavaPlugins.reset(); + } + } +}