diff --git a/src/main/java/io/reactivex/Completable.java b/src/main/java/io/reactivex/Completable.java index ad832cfb89..23a3839cb7 100644 --- a/src/main/java/io/reactivex/Completable.java +++ b/src/main/java/io/reactivex/Completable.java @@ -25,8 +25,7 @@ import io.reactivex.internal.observers.*; import io.reactivex.internal.operators.completable.*; import io.reactivex.internal.operators.flowable.FlowableDelaySubscriptionOther; -import io.reactivex.internal.operators.maybe.MaybeFromCompletable; -import io.reactivex.internal.operators.maybe.MaybeDelayWithCompletable; +import io.reactivex.internal.operators.maybe.*; import io.reactivex.internal.operators.observable.ObservableDelaySubscriptionOther; import io.reactivex.internal.operators.single.SingleDelayWithCompletable; import io.reactivex.internal.util.ExceptionHelper; @@ -1171,6 +1170,27 @@ public final Completable doAfterTerminate(final Action onAfterTerminate) { onAfterTerminate, Functions.EMPTY_ACTION); } + /** + * Calls the specified action after this Completable signals onError or onComplete or gets disposed by + * the downstream. + *

In case of a race between a terminal event and a dispose call, the provided {@code onFinally} action + * is executed once per subscription. + *

Note that the {@code onFinally} action is shared between subscriptions and as such + * should be thread-safe. + *

+ *
Scheduler:
+ *
{@code doFinally} does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param onFinally the action called when this Completable terminates or gets cancelled + * @return the new Completable instance + * @since 2.0.1 - experimental + */ + @SchedulerSupport(SchedulerSupport.NONE) + @Experimental + public final Completable doFinally(Action onFinally) { + ObjectHelper.requireNonNull(onFinally, "onFinally is null"); + return RxJavaPlugins.onAssembly(new CompletableDoFinally(this, onFinally)); + } /** * Advanced use without safeguards: lifts a CompletableOperator diff --git a/src/main/java/io/reactivex/Flowable.java b/src/main/java/io/reactivex/Flowable.java index c892b467b5..8ed9ef7951 100644 --- a/src/main/java/io/reactivex/Flowable.java +++ b/src/main/java/io/reactivex/Flowable.java @@ -7328,7 +7328,7 @@ public final Flowable distinctUntilChanged(BiPredicate * Calls the specified action after this Flowable signals onError or onCompleted or gets cancelled by * the downstream. *

In case of a race between a terminal event and a cancellation, the provided {@code onFinally} action - * is executed at once per subscription. + * is executed once per subscription. *

Note that the {@code onFinally} action is shared between subscriptions and as such * should be thread-safe. *

diff --git a/src/main/java/io/reactivex/Maybe.java b/src/main/java/io/reactivex/Maybe.java index b478077987..fd92edd8e8 100644 --- a/src/main/java/io/reactivex/Maybe.java +++ b/src/main/java/io/reactivex/Maybe.java @@ -2302,6 +2302,28 @@ public final Maybe doAfterTerminate(Action onAfterTerminate) { )); } + /** + * Calls the specified action after this Maybe signals onSuccess, onError or onComplete or gets disposed by + * the downstream. + *

In case of a race between a terminal event and a dispose call, the provided {@code onFinally} action + * is executed once per subscription. + *

Note that the {@code onFinally} action is shared between subscriptions and as such + * should be thread-safe. + *

+ *
Scheduler:
+ *
{@code doFinally} does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param onFinally the action called when this Maybe terminates or gets cancelled + * @return the new Maybe instance + * @since 2.0.1 - experimental + */ + @SchedulerSupport(SchedulerSupport.NONE) + @Experimental + public final Maybe doFinally(Action onFinally) { + ObjectHelper.requireNonNull(onFinally, "onFinally is null"); + return RxJavaPlugins.onAssembly(new MaybeDoFinally(this, onFinally)); + } + /** * Calls the shared runnable if a MaybeObserver subscribed to the current Maybe * disposes the common Disposable it received via onSubscribe. diff --git a/src/main/java/io/reactivex/Observable.java b/src/main/java/io/reactivex/Observable.java index 5757b0a999..2db7d91319 100644 --- a/src/main/java/io/reactivex/Observable.java +++ b/src/main/java/io/reactivex/Observable.java @@ -16,7 +16,6 @@ import java.util.*; import java.util.concurrent.*; -import io.reactivex.internal.operators.flowable.FlowableOnBackpressureError; import org.reactivestreams.Publisher; import io.reactivex.annotations.*; @@ -26,7 +25,7 @@ import io.reactivex.internal.functions.*; import io.reactivex.internal.fuseable.ScalarCallable; import io.reactivex.internal.observers.*; -import io.reactivex.internal.operators.flowable.FlowableFromObservable; +import io.reactivex.internal.operators.flowable.*; import io.reactivex.internal.operators.observable.*; import io.reactivex.internal.util.*; import io.reactivex.observables.*; @@ -6440,6 +6439,30 @@ public final Observable doAfterTerminate(Action onFinally) { return doOnEach(Functions.emptyConsumer(), Functions.emptyConsumer(), Functions.EMPTY_ACTION, onFinally); } + /** + * Calls the specified action after this Observable signals onError or onCompleted or gets disposed by + * the downstream. + *

In case of a race between a terminal event and a dispose call, the provided {@code onFinally} action + * is executed once per subscription. + *

Note that the {@code onFinally} action is shared between subscriptions and as such + * should be thread-safe. + *

+ *
Scheduler:
+ *
{@code doFinally} does not operate by default on a particular {@link Scheduler}.
+ * Operator-fusion: + *
This operator supports boundary-limited synchronous or asynchronous queue-fusion.
+ *
+ * @param onFinally the action called when this Observable terminates or gets cancelled + * @return the new Observable instance + * @since 2.0.1 - experimental + */ + @SchedulerSupport(SchedulerSupport.NONE) + @Experimental + public final Observable doFinally(Action onFinally) { + ObjectHelper.requireNonNull(onFinally, "onFinally is null"); + return RxJavaPlugins.onAssembly(new ObservableDoFinally(this, onFinally)); + } + /** * Calls the unsubscribe {@code Action} if the downstream disposes the sequence. *

diff --git a/src/main/java/io/reactivex/Single.java b/src/main/java/io/reactivex/Single.java index c9dd7a43de..e9b371a179 100644 --- a/src/main/java/io/reactivex/Single.java +++ b/src/main/java/io/reactivex/Single.java @@ -1716,6 +1716,28 @@ public final Single delaySubscription(long time, TimeUnit unit, Scheduler return delaySubscription(Observable.timer(time, unit, scheduler)); } + /** + * Calls the specified action after this Single signals onSuccess or onError or gets disposed by + * the downstream. + *

In case of a race between a terminal event and a dispose call, the provided {@code onFinally} action + * is executed once per subscription. + *

Note that the {@code onFinally} action is shared between subscriptions and as such + * should be thread-safe. + *

+ *
Scheduler:
+ *
{@code doFinally} does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param onFinally the action called when this Single terminates or gets cancelled + * @return the new Single instance + * @since 2.0.1 - experimental + */ + @SchedulerSupport(SchedulerSupport.NONE) + @Experimental + public final Single doFinally(Action onFinally) { + ObjectHelper.requireNonNull(onFinally, "onFinally is null"); + return RxJavaPlugins.onAssembly(new SingleDoFinally(this, onFinally)); + } + /** * Calls the shared consumer with the Disposable sent through the onSubscribe for each * SingleObserver that subscribes to the current Single. diff --git a/src/main/java/io/reactivex/internal/operators/completable/CompletableDoFinally.java b/src/main/java/io/reactivex/internal/operators/completable/CompletableDoFinally.java new file mode 100644 index 0000000000..5838fff433 --- /dev/null +++ b/src/main/java/io/reactivex/internal/operators/completable/CompletableDoFinally.java @@ -0,0 +1,106 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.completable; + +import java.util.concurrent.atomic.AtomicInteger; + +import io.reactivex.*; +import io.reactivex.annotations.Experimental; +import io.reactivex.disposables.Disposable; +import io.reactivex.exceptions.Exceptions; +import io.reactivex.functions.Action; +import io.reactivex.internal.disposables.DisposableHelper; +import io.reactivex.plugins.RxJavaPlugins; + +/** + * Execute an action after an onError, onComplete or a dispose event. + * + * @since 2.0.1 - experimental + */ +@Experimental +public final class CompletableDoFinally extends Completable { + + final CompletableSource source; + + final Action onFinally; + + public CompletableDoFinally(CompletableSource source, Action onFinally) { + this.source = source; + this.onFinally = onFinally; + } + + @Override + protected void subscribeActual(CompletableObserver s) { + source.subscribe(new DoFinallyObserver(s, onFinally)); + } + + static final class DoFinallyObserver extends AtomicInteger implements CompletableObserver, Disposable { + + private static final long serialVersionUID = 4109457741734051389L; + + final CompletableObserver actual; + + final Action onFinally; + + Disposable d; + + DoFinallyObserver(CompletableObserver actual, Action onFinally) { + this.actual = actual; + this.onFinally = onFinally; + } + + @Override + public void onSubscribe(Disposable d) { + if (DisposableHelper.validate(this.d, d)) { + this.d = d; + + actual.onSubscribe(this); + } + } + + @Override + public void onError(Throwable t) { + actual.onError(t); + runFinally(); + } + + @Override + public void onComplete() { + actual.onComplete(); + runFinally(); + } + + @Override + public void dispose() { + d.dispose(); + runFinally(); + } + + @Override + public boolean isDisposed() { + return d.isDisposed(); + } + + void runFinally() { + if (compareAndSet(0, 1)) { + try { + onFinally.run(); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + RxJavaPlugins.onError(ex); + } + } + } + } +} diff --git a/src/main/java/io/reactivex/internal/operators/maybe/MaybeDoFinally.java b/src/main/java/io/reactivex/internal/operators/maybe/MaybeDoFinally.java new file mode 100644 index 0000000000..3a5d6176b0 --- /dev/null +++ b/src/main/java/io/reactivex/internal/operators/maybe/MaybeDoFinally.java @@ -0,0 +1,111 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.maybe; + +import java.util.concurrent.atomic.AtomicInteger; + +import io.reactivex.*; +import io.reactivex.annotations.Experimental; +import io.reactivex.disposables.Disposable; +import io.reactivex.exceptions.Exceptions; +import io.reactivex.functions.Action; +import io.reactivex.internal.disposables.DisposableHelper; +import io.reactivex.plugins.RxJavaPlugins; + +/** + * Execute an action after an onSuccess, onError, onComplete or a dispose event. + * + * @param the value type + * @since 2.0.1 - experimental + */ +@Experimental +public final class MaybeDoFinally extends AbstractMaybeWithUpstream { + + final Action onFinally; + + public MaybeDoFinally(MaybeSource source, Action onFinally) { + super(source); + this.onFinally = onFinally; + } + + @Override + protected void subscribeActual(MaybeObserver s) { + source.subscribe(new DoFinallyObserver(s, onFinally)); + } + + static final class DoFinallyObserver extends AtomicInteger implements MaybeObserver, Disposable { + + private static final long serialVersionUID = 4109457741734051389L; + + final MaybeObserver actual; + + final Action onFinally; + + Disposable d; + + DoFinallyObserver(MaybeObserver actual, Action onFinally) { + this.actual = actual; + this.onFinally = onFinally; + } + + @Override + public void onSubscribe(Disposable d) { + if (DisposableHelper.validate(this.d, d)) { + this.d = d; + + actual.onSubscribe(this); + } + } + + @Override + public void onSuccess(T t) { + actual.onSuccess(t); + runFinally(); + } + + @Override + public void onError(Throwable t) { + actual.onError(t); + runFinally(); + } + + @Override + public void onComplete() { + actual.onComplete(); + runFinally(); + } + + @Override + public void dispose() { + d.dispose(); + runFinally(); + } + + @Override + public boolean isDisposed() { + return d.isDisposed(); + } + + void runFinally() { + if (compareAndSet(0, 1)) { + try { + onFinally.run(); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + RxJavaPlugins.onError(ex); + } + } + } + } +} diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableDoFinally.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableDoFinally.java new file mode 100644 index 0000000000..2887d871d2 --- /dev/null +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableDoFinally.java @@ -0,0 +1,150 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.observable; + +import io.reactivex.*; +import io.reactivex.annotations.Experimental; +import io.reactivex.disposables.Disposable; +import io.reactivex.exceptions.Exceptions; +import io.reactivex.functions.Action; +import io.reactivex.internal.disposables.DisposableHelper; +import io.reactivex.internal.fuseable.*; +import io.reactivex.internal.observers.BasicIntQueueDisposable; +import io.reactivex.plugins.RxJavaPlugins; + +/** + * Execute an action after an onError, onComplete or a dispose event. + * + * @param the value type + * @since 2.0.1 - experimental + */ +@Experimental +public final class ObservableDoFinally extends AbstractObservableWithUpstream { + + final Action onFinally; + + public ObservableDoFinally(ObservableSource source, Action onFinally) { + super(source); + this.onFinally = onFinally; + } + + @Override + protected void subscribeActual(Observer s) { + source.subscribe(new DoFinallyObserver(s, onFinally)); + } + + static final class DoFinallyObserver extends BasicIntQueueDisposable implements Observer { + + private static final long serialVersionUID = 4109457741734051389L; + + final Observer actual; + + final Action onFinally; + + Disposable d; + + QueueDisposable qd; + + boolean syncFused; + + DoFinallyObserver(Observer actual, Action onFinally) { + this.actual = actual; + this.onFinally = onFinally; + } + + @SuppressWarnings("unchecked") + @Override + public void onSubscribe(Disposable d) { + if (DisposableHelper.validate(this.d, d)) { + this.d = d; + if (d instanceof QueueDisposable) { + this.qd = (QueueDisposable)d; + } + + actual.onSubscribe(this); + } + } + + @Override + public void onNext(T t) { + actual.onNext(t); + } + + @Override + public void onError(Throwable t) { + actual.onError(t); + runFinally(); + } + + @Override + public void onComplete() { + actual.onComplete(); + runFinally(); + } + + @Override + public void dispose() { + d.dispose(); + runFinally(); + } + + @Override + public boolean isDisposed() { + return d.isDisposed(); + } + + @Override + public int requestFusion(int mode) { + QueueDisposable qd = this.qd; + if (qd != null && (mode & BOUNDARY) == 0) { + int m = qd.requestFusion(mode); + if (m != NONE) { + syncFused = m == SYNC; + } + return m; + } + return NONE; + } + + @Override + public void clear() { + qd.clear(); + } + + @Override + public boolean isEmpty() { + return qd.isEmpty(); + } + + @Override + public T poll() throws Exception { + T v = qd.poll(); + if (v == null && syncFused) { + runFinally(); + } + return v; + } + + void runFinally() { + if (compareAndSet(0, 1)) { + try { + onFinally.run(); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + RxJavaPlugins.onError(ex); + } + } + } + } +} diff --git a/src/main/java/io/reactivex/internal/operators/single/SingleDoFinally.java b/src/main/java/io/reactivex/internal/operators/single/SingleDoFinally.java new file mode 100644 index 0000000000..d24c546de2 --- /dev/null +++ b/src/main/java/io/reactivex/internal/operators/single/SingleDoFinally.java @@ -0,0 +1,107 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.single; + +import java.util.concurrent.atomic.AtomicInteger; + +import io.reactivex.*; +import io.reactivex.annotations.Experimental; +import io.reactivex.disposables.Disposable; +import io.reactivex.exceptions.Exceptions; +import io.reactivex.functions.Action; +import io.reactivex.internal.disposables.DisposableHelper; +import io.reactivex.plugins.RxJavaPlugins; + +/** + * Execute an action after an onSuccess, onError or a dispose event. + * + * @param the value type + * @since 2.0.1 - experimental + */ +@Experimental +public final class SingleDoFinally extends Single { + + final SingleSource source; + + final Action onFinally; + + public SingleDoFinally(SingleSource source, Action onFinally) { + this.source = source; + this.onFinally = onFinally; + } + + @Override + protected void subscribeActual(SingleObserver s) { + source.subscribe(new DoFinallyObserver(s, onFinally)); + } + + static final class DoFinallyObserver extends AtomicInteger implements SingleObserver, Disposable { + + private static final long serialVersionUID = 4109457741734051389L; + + final SingleObserver actual; + + final Action onFinally; + + Disposable d; + + DoFinallyObserver(SingleObserver actual, Action onFinally) { + this.actual = actual; + this.onFinally = onFinally; + } + + @Override + public void onSubscribe(Disposable d) { + if (DisposableHelper.validate(this.d, d)) { + this.d = d; + + actual.onSubscribe(this); + } + } + + @Override + public void onSuccess(T t) { + actual.onSuccess(t); + runFinally(); + } + + @Override + public void onError(Throwable t) { + actual.onError(t); + runFinally(); + } + + @Override + public void dispose() { + d.dispose(); + runFinally(); + } + + @Override + public boolean isDisposed() { + return d.isDisposed(); + } + + void runFinally() { + if (compareAndSet(0, 1)) { + try { + onFinally.run(); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + RxJavaPlugins.onError(ex); + } + } + } + } +} diff --git a/src/test/java/io/reactivex/MaybeNo2Dot0Since.java b/src/test/java/io/reactivex/MaybeNo2Dot0Since.java index bef986dfc3..53ac073fcf 100644 --- a/src/test/java/io/reactivex/MaybeNo2Dot0Since.java +++ b/src/test/java/io/reactivex/MaybeNo2Dot0Since.java @@ -84,7 +84,7 @@ public void noSince20InMaybe() throws Exception { } if (classDefPassed) { - if (line.contains("@since") && line.contains("2.0")) { + if (line.contains("@since") && line.contains("2.0") && !line.contains("2.0.")) { b.append("java.lang.RuntimeException: @since 2.0 found").append("\r\n") .append(" at io.reactivex.Maybe (Maybe.java:").append(ln).append(")\r\n\r\n"); ; diff --git a/src/test/java/io/reactivex/internal/operators/completable/CompletableDoFinallyTest.java b/src/test/java/io/reactivex/internal/operators/completable/CompletableDoFinallyTest.java new file mode 100644 index 0000000000..13cf9b4d8f --- /dev/null +++ b/src/test/java/io/reactivex/internal/operators/completable/CompletableDoFinallyTest.java @@ -0,0 +1,97 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.completable; + +import static org.junit.Assert.assertEquals; + +import java.util.List; + +import org.junit.Test; + +import io.reactivex.*; +import io.reactivex.exceptions.TestException; +import io.reactivex.functions.*; +import io.reactivex.plugins.RxJavaPlugins; +import io.reactivex.subjects.PublishSubject; + +public class CompletableDoFinallyTest implements Action { + + int calls; + + @Override + public void run() throws Exception { + calls++; + } + + @Test + public void normalEmpty() { + Completable.complete() + .doFinally(this) + .test() + .assertResult(); + + assertEquals(1, calls); + } + + @Test + public void normalError() { + Completable.error(new TestException()) + .doFinally(this) + .test() + .assertFailure(TestException.class); + + assertEquals(1, calls); + } + + @Test + public void doubleOnSubscribe() { + TestHelper.checkDoubleOnSubscribeCompletable(new Function() { + @Override + public Completable apply(Completable f) throws Exception { + return f.doFinally(CompletableDoFinallyTest.this); + } + }); + } + + @Test(expected = NullPointerException.class) + public void nullAction() { + Completable.complete().doFinally(null); + } + + @Test + public void actionThrows() { + List errors = TestHelper.trackPluginErrors(); + try { + Completable.complete() + .doFinally(new Action() { + @Override + public void run() throws Exception { + throw new TestException(); + } + }) + .test() + .assertResult() + .cancel(); + + TestHelper.assertError(errors, 0, TestException.class); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void disposed() { + TestHelper.checkDisposed(PublishSubject.create().ignoreElements().doFinally(this)); + } +} diff --git a/src/test/java/io/reactivex/internal/operators/maybe/MaybeDoFinallyTest.java b/src/test/java/io/reactivex/internal/operators/maybe/MaybeDoFinallyTest.java new file mode 100644 index 0000000000..cf0118a651 --- /dev/null +++ b/src/test/java/io/reactivex/internal/operators/maybe/MaybeDoFinallyTest.java @@ -0,0 +1,169 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.maybe; + +import static org.junit.Assert.assertEquals; + +import java.util.List; + +import org.junit.Test; + +import io.reactivex.*; +import io.reactivex.exceptions.TestException; +import io.reactivex.functions.*; +import io.reactivex.internal.functions.Functions; +import io.reactivex.plugins.RxJavaPlugins; +import io.reactivex.subjects.PublishSubject; + +public class MaybeDoFinallyTest implements Action { + + int calls; + + @Override + public void run() throws Exception { + calls++; + } + + @Test + public void normalJust() { + Maybe.just(1) + .doFinally(this) + .test() + .assertResult(1); + + assertEquals(1, calls); + } + + @Test + public void normalEmpty() { + Maybe.empty() + .doFinally(this) + .test() + .assertResult(); + + assertEquals(1, calls); + } + + @Test + public void normalError() { + Maybe.error(new TestException()) + .doFinally(this) + .test() + .assertFailure(TestException.class); + + assertEquals(1, calls); + } + + @Test + public void doubleOnSubscribe() { + TestHelper.checkDoubleOnSubscribeMaybe(new Function, Maybe>() { + @Override + public Maybe apply(Maybe f) throws Exception { + return f.doFinally(MaybeDoFinallyTest.this); + } + }); + TestHelper.checkDoubleOnSubscribeMaybe(new Function, Maybe>() { + @Override + public Maybe apply(Maybe f) throws Exception { + return f.doFinally(MaybeDoFinallyTest.this).filter(Functions.alwaysTrue()); + } + }); + } + + @Test + public void normalJustConditional() { + Maybe.just(1) + .doFinally(this) + .filter(Functions.alwaysTrue()) + .test() + .assertResult(1); + + assertEquals(1, calls); + } + + @Test + public void normalEmptyConditional() { + Maybe.empty() + .doFinally(this) + .filter(Functions.alwaysTrue()) + .test() + .assertResult(); + + assertEquals(1, calls); + } + + @Test + public void normalErrorConditional() { + Maybe.error(new TestException()) + .doFinally(this) + .filter(Functions.alwaysTrue()) + .test() + .assertFailure(TestException.class); + + assertEquals(1, calls); + } + + @Test(expected = NullPointerException.class) + public void nullAction() { + Maybe.just(1).doFinally(null); + } + + @Test + public void actionThrows() { + List errors = TestHelper.trackPluginErrors(); + try { + Maybe.just(1) + .doFinally(new Action() { + @Override + public void run() throws Exception { + throw new TestException(); + } + }) + .test() + .assertResult(1) + .cancel(); + + TestHelper.assertError(errors, 0, TestException.class); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void actionThrowsConditional() { + List errors = TestHelper.trackPluginErrors(); + try { + Maybe.just(1) + .doFinally(new Action() { + @Override + public void run() throws Exception { + throw new TestException(); + } + }) + .filter(Functions.alwaysTrue()) + .test() + .assertResult(1) + .cancel(); + + TestHelper.assertError(errors, 0, TestException.class); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void disposed() { + TestHelper.checkDisposed(PublishSubject.create().singleElement().doFinally(this)); + } +} diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableDoFinallyTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableDoFinallyTest.java new file mode 100644 index 0000000000..7e2aa53213 --- /dev/null +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableDoFinallyTest.java @@ -0,0 +1,445 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.observable; + +import static org.junit.Assert.*; + +import java.util.List; + +import org.junit.Test; + +import io.reactivex.*; +import io.reactivex.disposables.Disposable; +import io.reactivex.exceptions.TestException; +import io.reactivex.functions.*; +import io.reactivex.internal.functions.Functions; +import io.reactivex.internal.fuseable.QueueDisposable; +import io.reactivex.observers.*; +import io.reactivex.plugins.RxJavaPlugins; +import io.reactivex.subjects.UnicastSubject; + +public class ObservableDoFinallyTest implements Action { + + int calls; + + @Override + public void run() throws Exception { + calls++; + } + + @Test + public void normalJust() { + Observable.just(1) + .doFinally(this) + .test() + .assertResult(1); + + assertEquals(1, calls); + } + + @Test + public void normalEmpty() { + Observable.empty() + .doFinally(this) + .test() + .assertResult(); + + assertEquals(1, calls); + } + + @Test + public void normalError() { + Observable.error(new TestException()) + .doFinally(this) + .test() + .assertFailure(TestException.class); + + assertEquals(1, calls); + } + + @Test + public void normalTake() { + Observable.range(1, 10) + .doFinally(this) + .take(5) + .test() + .assertResult(1, 2, 3, 4, 5); + + assertEquals(1, calls); + } + + @Test + public void doubleOnSubscribe() { + TestHelper.checkDoubleOnSubscribeObservable(new Function, Observable>() { + @Override + public Observable apply(Observable f) throws Exception { + return f.doFinally(ObservableDoFinallyTest.this); + } + }); + TestHelper.checkDoubleOnSubscribeObservable(new Function, Observable>() { + @Override + public Observable apply(Observable f) throws Exception { + return f.doFinally(ObservableDoFinallyTest.this).filter(Functions.alwaysTrue()); + } + }); + } + + @Test + public void syncFused() { + TestObserver ts = ObserverFusion.newTest(QueueDisposable.SYNC); + + Observable.range(1, 5) + .doFinally(this) + .subscribe(ts); + + ObserverFusion.assertFusion(ts, QueueDisposable.SYNC) + .assertResult(1, 2, 3, 4, 5); + + assertEquals(1, calls); + } + + @Test + public void syncFusedBoundary() { + TestObserver ts = ObserverFusion.newTest(QueueDisposable.SYNC | QueueDisposable.BOUNDARY); + + Observable.range(1, 5) + .doFinally(this) + .subscribe(ts); + + ObserverFusion.assertFusion(ts, QueueDisposable.NONE) + .assertResult(1, 2, 3, 4, 5); + + assertEquals(1, calls); + } + + @Test + public void asyncFused() { + TestObserver ts = ObserverFusion.newTest(QueueDisposable.ASYNC); + + UnicastSubject up = UnicastSubject.create(); + TestHelper.emit(up, 1, 2, 3, 4, 5); + + up + .doFinally(this) + .subscribe(ts); + + ObserverFusion.assertFusion(ts, QueueDisposable.ASYNC) + .assertResult(1, 2, 3, 4, 5); + + assertEquals(1, calls); + } + + @Test + public void asyncFusedBoundary() { + TestObserver ts = ObserverFusion.newTest(QueueDisposable.ASYNC | QueueDisposable.BOUNDARY); + + UnicastSubject up = UnicastSubject.create(); + TestHelper.emit(up, 1, 2, 3, 4, 5); + + up + .doFinally(this) + .subscribe(ts); + + ObserverFusion.assertFusion(ts, QueueDisposable.NONE) + .assertResult(1, 2, 3, 4, 5); + + assertEquals(1, calls); + } + + + @Test + public void normalJustConditional() { + Observable.just(1) + .doFinally(this) + .filter(Functions.alwaysTrue()) + .test() + .assertResult(1); + + assertEquals(1, calls); + } + + @Test + public void normalEmptyConditional() { + Observable.empty() + .doFinally(this) + .filter(Functions.alwaysTrue()) + .test() + .assertResult(); + + assertEquals(1, calls); + } + + @Test + public void normalErrorConditional() { + Observable.error(new TestException()) + .doFinally(this) + .filter(Functions.alwaysTrue()) + .test() + .assertFailure(TestException.class); + + assertEquals(1, calls); + } + + @Test + public void normalTakeConditional() { + Observable.range(1, 10) + .doFinally(this) + .filter(Functions.alwaysTrue()) + .take(5) + .test() + .assertResult(1, 2, 3, 4, 5); + + assertEquals(1, calls); + } + + @Test + public void syncFusedConditional() { + TestObserver ts = ObserverFusion.newTest(QueueDisposable.SYNC); + + Observable.range(1, 5) + .doFinally(this) + .filter(Functions.alwaysTrue()) + .subscribe(ts); + + ObserverFusion.assertFusion(ts, QueueDisposable.SYNC) + .assertResult(1, 2, 3, 4, 5); + + assertEquals(1, calls); + } + + @Test + public void nonFused() { + TestObserver ts = ObserverFusion.newTest(QueueDisposable.SYNC); + + Observable.range(1, 5).hide() + .doFinally(this) + .subscribe(ts); + + ObserverFusion.assertFusion(ts, QueueDisposable.NONE) + .assertResult(1, 2, 3, 4, 5); + + assertEquals(1, calls); + } + + @Test + public void nonFusedConditional() { + TestObserver ts = ObserverFusion.newTest(QueueDisposable.SYNC); + + Observable.range(1, 5).hide() + .doFinally(this) + .filter(Functions.alwaysTrue()) + .subscribe(ts); + + ObserverFusion.assertFusion(ts, QueueDisposable.NONE) + .assertResult(1, 2, 3, 4, 5); + + assertEquals(1, calls); + } + + @Test + public void syncFusedBoundaryConditional() { + TestObserver ts = ObserverFusion.newTest(QueueDisposable.SYNC | QueueDisposable.BOUNDARY); + + Observable.range(1, 5) + .doFinally(this) + .filter(Functions.alwaysTrue()) + .subscribe(ts); + + ObserverFusion.assertFusion(ts, QueueDisposable.NONE) + .assertResult(1, 2, 3, 4, 5); + + assertEquals(1, calls); + } + + @Test + public void asyncFusedConditional() { + TestObserver ts = ObserverFusion.newTest(QueueDisposable.ASYNC); + + UnicastSubject up = UnicastSubject.create(); + TestHelper.emit(up, 1, 2, 3, 4, 5); + + up + .doFinally(this) + .filter(Functions.alwaysTrue()) + .subscribe(ts); + + ObserverFusion.assertFusion(ts, QueueDisposable.ASYNC) + .assertResult(1, 2, 3, 4, 5); + + assertEquals(1, calls); + } + + @Test + public void asyncFusedBoundaryConditional() { + TestObserver ts = ObserverFusion.newTest(QueueDisposable.ASYNC | QueueDisposable.BOUNDARY); + + UnicastSubject up = UnicastSubject.create(); + TestHelper.emit(up, 1, 2, 3, 4, 5); + + up + .doFinally(this) + .filter(Functions.alwaysTrue()) + .subscribe(ts); + + ObserverFusion.assertFusion(ts, QueueDisposable.NONE) + .assertResult(1, 2, 3, 4, 5); + + assertEquals(1, calls); + } + + @Test(expected = NullPointerException.class) + public void nullAction() { + Observable.just(1).doFinally(null); + } + + @Test + public void actionThrows() { + List errors = TestHelper.trackPluginErrors(); + try { + Observable.just(1) + .doFinally(new Action() { + @Override + public void run() throws Exception { + throw new TestException(); + } + }) + .test() + .assertResult(1) + .cancel(); + + TestHelper.assertError(errors, 0, TestException.class); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void actionThrowsConditional() { + List errors = TestHelper.trackPluginErrors(); + try { + Observable.just(1) + .doFinally(new Action() { + @Override + public void run() throws Exception { + throw new TestException(); + } + }) + .filter(Functions.alwaysTrue()) + .test() + .assertResult(1) + .cancel(); + + TestHelper.assertError(errors, 0, TestException.class); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void clearIsEmpty() { + Observable.range(1, 5) + .doFinally(this) + .subscribe(new Observer() { + + @Override + public void onSubscribe(Disposable s) { + @SuppressWarnings("unchecked") + QueueDisposable qs = (QueueDisposable)s; + + qs.requestFusion(QueueDisposable.ANY); + + assertFalse(qs.isEmpty()); + + try { + assertEquals(1, qs.poll().intValue()); + } catch (Throwable ex) { + throw new RuntimeException(ex); + } + + assertFalse(qs.isEmpty()); + + qs.clear(); + + assertTrue(qs.isEmpty()); + + qs.dispose(); + } + + @Override + public void onNext(Integer t) { + } + + @Override + public void onError(Throwable t) { + } + + @Override + public void onComplete() { + } + }); + + assertEquals(1, calls); + } + + @Test + public void clearIsEmptyConditional() { + Observable.range(1, 5) + .doFinally(this) + .filter(Functions.alwaysTrue()) + .subscribe(new Observer() { + + @Override + public void onSubscribe(Disposable s) { + @SuppressWarnings("unchecked") + QueueDisposable qs = (QueueDisposable)s; + + qs.requestFusion(QueueDisposable.ANY); + + assertFalse(qs.isEmpty()); + + assertFalse(qs.isDisposed()); + + try { + assertEquals(1, qs.poll().intValue()); + } catch (Throwable ex) { + throw new RuntimeException(ex); + } + + assertFalse(qs.isEmpty()); + + qs.clear(); + + assertTrue(qs.isEmpty()); + + qs.dispose(); + + assertTrue(qs.isDisposed()); + } + + @Override + public void onNext(Integer t) { + } + + @Override + public void onError(Throwable t) { + } + + @Override + public void onComplete() { + } + }); + + assertEquals(1, calls); + } +} diff --git a/src/test/java/io/reactivex/internal/operators/single/SingleDoFinallyTest.java b/src/test/java/io/reactivex/internal/operators/single/SingleDoFinallyTest.java new file mode 100644 index 0000000000..15dd349b0c --- /dev/null +++ b/src/test/java/io/reactivex/internal/operators/single/SingleDoFinallyTest.java @@ -0,0 +1,97 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.single; + +import static org.junit.Assert.assertEquals; + +import java.util.List; + +import org.junit.Test; + +import io.reactivex.*; +import io.reactivex.exceptions.TestException; +import io.reactivex.functions.*; +import io.reactivex.plugins.RxJavaPlugins; +import io.reactivex.subjects.PublishSubject; + +public class SingleDoFinallyTest implements Action { + + int calls; + + @Override + public void run() throws Exception { + calls++; + } + + @Test + public void normalJust() { + Single.just(1) + .doFinally(this) + .test() + .assertResult(1); + + assertEquals(1, calls); + } + + @Test + public void normalError() { + Single.error(new TestException()) + .doFinally(this) + .test() + .assertFailure(TestException.class); + + assertEquals(1, calls); + } + + @Test + public void doubleOnSubscribe() { + TestHelper.checkDoubleOnSubscribeSingle(new Function, Single>() { + @Override + public Single apply(Single f) throws Exception { + return f.doFinally(SingleDoFinallyTest.this); + } + }); + } + + @Test(expected = NullPointerException.class) + public void nullAction() { + Single.just(1).doFinally(null); + } + + @Test + public void actionThrows() { + List errors = TestHelper.trackPluginErrors(); + try { + Single.just(1) + .doFinally(new Action() { + @Override + public void run() throws Exception { + throw new TestException(); + } + }) + .test() + .assertResult(1) + .cancel(); + + TestHelper.assertError(errors, 0, TestException.class); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void disposed() { + TestHelper.checkDisposed(PublishSubject.create().singleOrError().doFinally(this)); + } +} diff --git a/src/test/java/io/reactivex/internal/schedulers/SchedulerWhenTest.java b/src/test/java/io/reactivex/internal/schedulers/SchedulerWhenTest.java index 129ee61502..f6f90a86dd 100644 --- a/src/test/java/io/reactivex/internal/schedulers/SchedulerWhenTest.java +++ b/src/test/java/io/reactivex/internal/schedulers/SchedulerWhenTest.java @@ -1,3 +1,16 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + package io.reactivex.internal.schedulers; import static io.reactivex.Flowable.just;