From 6c383cf76b57d33c25bf251520a02090b0d11109 Mon Sep 17 00:00:00 2001 From: Adam Bliss Date: Sun, 17 Mar 2013 12:46:01 +0000 Subject: [PATCH 1/6] New operation Finally (issue #43) --- .../java/rx/operators/OperationFinally.java | 145 ++++++++++++++++++ 1 file changed, 145 insertions(+) create mode 100644 rxjava-core/src/main/java/rx/operators/OperationFinally.java diff --git a/rxjava-core/src/main/java/rx/operators/OperationFinally.java b/rxjava-core/src/main/java/rx/operators/OperationFinally.java new file mode 100644 index 0000000000..0eddc57d2f --- /dev/null +++ b/rxjava-core/src/main/java/rx/operators/OperationFinally.java @@ -0,0 +1,145 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.operators; + +import static org.junit.Assert.*; +import static org.mockito.Mockito.*; + +import java.lang.reflect.Array; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CountDownLatch; + +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import rx.Observable; +import rx.Observer; +import rx.Subscription; +import rx.util.AtomicObservableSubscription; +import rx.util.AtomicObserver; +import rx.util.functions.Action0; +import rx.util.functions.Func1; + +public final class OperationFinally { + + /** + * Call a given action when a sequence completes (with or without an + * exception). The returned observable is exactly as threadsafe as the + * source observable; in particular, any situation allowing the source to + * call onComplete or onError multiple times allows the returned observable + * to call the action multiple times. + *

+ * Note that "finally" is a Java reserved word and cannot be an identifier, + * so we use "finally0". + * + * @param sequence An observable sequence of elements + * @param action An action to be taken when the sequence is complete or throws an exception + * @return An observable sequence with the same elements as the input. + * After the last element is consumed (just before {@link Observer#onComplete} is called), + * or when an exception is thrown (just before {@link Observer#onError}), the action will be taken. + * @see http://msdn.microsoft.com/en-us/library/hh212133(v=vs.103).aspx + */ + public static Func1, Subscription> finally0(final Observable sequence, final Action0 action) { + return new Func1, Subscription>() { + @Override + public Subscription call(Observer observer) { + return new Finally(sequence, action).call(observer); + } + }; + } + + private static class Finally implements Func1, Subscription> { + private final Observable sequence; + private final Action0 finalAction; + private Subscription s; + + Finally(final Observable sequence, Action0 finalAction) { + this.sequence = sequence; + this.finalAction = finalAction; + } + + private final AtomicObservableSubscription Subscription = new AtomicObservableSubscription(); + + private final Subscription actualSubscription = new Subscription() { + @Override + public void unsubscribe() { + if (null != s) + s.unsubscribe(); + } + }; + + public Subscription call(Observer observer) { + s = sequence.subscribe(new FinallyObserver(observer)); + return Subscription.wrap(actualSubscription); + } + + private class FinallyObserver implements Observer { + private final Observer observer; + + FinallyObserver(Observer observer) { + this.observer = observer; + } + + @Override + public void onCompleted() { + finalAction.call(); + observer.onCompleted(); + } + + @Override + public void onError(Exception e) { + finalAction.call(); + observer.onError(e); + } + + @Override + public void onNext(T args) { + observer.onNext(args); + } + } + } + + public static class UnitTest { + private static class TestAction implements Action0 { + public int called = 0; + @Override public void call() { + called++; + } + } + + @Test + public void testFinally() { + final String[] n = {"1", "2", "3"}; + final Observable nums = Observable.toObservable(n); + TestAction action = new TestAction(); + action.called = 0; + @SuppressWarnings("unchecked") + Observable fin = Observable.create(finally0(nums, action)); + @SuppressWarnings("unchecked") + Observer aObserver = mock(Observer.class); + fin.subscribe(aObserver); + Assert.assertEquals(1, action.called); + + action.called = 0; + Observable error = Observable.error(new RuntimeException("expected")); + fin = Observable.create(finally0(error, action)); + fin.subscribe(aObserver); + Assert.assertEquals(1, action.called); + } + } +} \ No newline at end of file From e517f55833c7ec02676bd850e9cd8f302a8a713f Mon Sep 17 00:00:00 2001 From: Adam Bliss Date: Sun, 17 Mar 2013 14:28:38 +0000 Subject: [PATCH 2/6] Add finally0 to Observable.java . --- rxjava-core/src/main/java/rx/Observable.java | 12 ++++++++++ .../java/rx/operators/OperationFinally.java | 24 ++++++++----------- 2 files changed, 22 insertions(+), 14 deletions(-) diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index 5d11c7b9ed..72c400dbf7 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -1182,6 +1182,18 @@ public static Observable concat(Observable... source) { return _create(OperationConcat.concat(source)); } + /** + * Emits the same objects as the given Observable, calling the given action + * when it calls onComplete or onError. + * @param source an observable + * @param action an action to be called when the source completes or errors. + * @return an Observable that emits the same objects, then calls the action. + * @see MSDN: Observable.Finally Method + */ + public static Observable finally0(Observable source, Action0 action) { + return _create(OperationFinally.finally0(source, action)); + } + /** * Groups the elements of an observable and selects the resulting elements by using a specified function. * diff --git a/rxjava-core/src/main/java/rx/operators/OperationFinally.java b/rxjava-core/src/main/java/rx/operators/OperationFinally.java index 0eddc57d2f..4474e11936 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationFinally.java +++ b/rxjava-core/src/main/java/rx/operators/OperationFinally.java @@ -1,12 +1,12 @@ /** * Copyright 2013 Netflix, Inc. - * + * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -23,15 +23,12 @@ import java.util.List; import java.util.concurrent.CountDownLatch; -import org.junit.Assert; -import org.junit.Before; import org.junit.Test; import rx.Observable; import rx.Observer; import rx.Subscription; import rx.util.AtomicObservableSubscription; -import rx.util.AtomicObserver; import rx.util.functions.Action0; import rx.util.functions.Func1; @@ -42,16 +39,16 @@ public final class OperationFinally { * exception). The returned observable is exactly as threadsafe as the * source observable; in particular, any situation allowing the source to * call onComplete or onError multiple times allows the returned observable - * to call the action multiple times. + * to call the final action multiple times. *

* Note that "finally" is a Java reserved word and cannot be an identifier, * so we use "finally0". - * + * * @param sequence An observable sequence of elements * @param action An action to be taken when the sequence is complete or throws an exception * @return An observable sequence with the same elements as the input. * After the last element is consumed (just before {@link Observer#onComplete} is called), - * or when an exception is thrown (just before {@link Observer#onError}), the action will be taken. + * or when an exception is thrown (just before {@link Observer#onError}), the action will be called. * @see http://msdn.microsoft.com/en-us/library/hh212133(v=vs.103).aspx */ public static Func1, Subscription> finally0(final Observable sequence, final Action0 action) { @@ -121,25 +118,24 @@ private static class TestAction implements Action0 { called++; } } - + @Test public void testFinally() { final String[] n = {"1", "2", "3"}; final Observable nums = Observable.toObservable(n); TestAction action = new TestAction(); action.called = 0; - @SuppressWarnings("unchecked") Observable fin = Observable.create(finally0(nums, action)); @SuppressWarnings("unchecked") Observer aObserver = mock(Observer.class); fin.subscribe(aObserver); - Assert.assertEquals(1, action.called); + assertEquals(1, action.called); action.called = 0; Observable error = Observable.error(new RuntimeException("expected")); fin = Observable.create(finally0(error, action)); fin.subscribe(aObserver); - Assert.assertEquals(1, action.called); + assertEquals(1, action.called); } } -} \ No newline at end of file +} From 64a02a639b6383db2a2df192a9ae2717ea38542a Mon Sep 17 00:00:00 2001 From: Adam Bliss Date: Sun, 17 Mar 2013 14:32:29 +0000 Subject: [PATCH 3/6] Missing import --- rxjava-core/src/main/java/rx/Observable.java | 1 + 1 file changed, 1 insertion(+) diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index 72c400dbf7..5c3940c534 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -40,6 +40,7 @@ import rx.operators.OperationDefer; import rx.operators.OperationDematerialize; import rx.operators.OperationFilter; +import rx.operators.OperationFinally; import rx.operators.OperationWhere; import rx.operators.OperationMap; import rx.operators.OperationMaterialize; From e85831573cfac16f88c28944d98b8056b7579ad0 Mon Sep 17 00:00:00 2001 From: Adam Bliss Date: Sun, 17 Mar 2013 16:21:30 +0000 Subject: [PATCH 4/6] Added nonstatic Observable.finally0 --- rxjava-core/src/main/java/rx/Observable.java | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index 5c3940c534..d839afaf66 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -2426,6 +2426,17 @@ public Observable filter(Func1 predicate) { return filter(this, predicate); } + /** + * Registers an action to be called when this observable calls + * onComplete or onError. + * @param action an action to be called when this observable completes or errors. + * @return an Observable that emits the same objects as this observable, then calls the action. + * @see MSDN: Observable.Finally Method + */ + public Observable finally0(Action0 action) { + return _create(OperationFinally.finally0(this, action)); + } + /** * Filters an Observable by discarding any of its emissions that do not meet some test. *

From 232612c800c2583da61b95397141040d3106efda Mon Sep 17 00:00:00 2001 From: Adam Bliss Date: Sun, 17 Mar 2013 17:53:24 +0000 Subject: [PATCH 5/6] Incoporate review suggestions. Splits a compound unit test into to parts. Uses mockito instead of a bespoke test object. Removes unused import statements. Changes the order of the Finally action w.r.t. onComplete/onError. --- .../java/rx/operators/OperationFinally.java | 55 ++++++++----------- 1 file changed, 23 insertions(+), 32 deletions(-) diff --git a/rxjava-core/src/main/java/rx/operators/OperationFinally.java b/rxjava-core/src/main/java/rx/operators/OperationFinally.java index 4474e11936..d90b0572a6 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationFinally.java +++ b/rxjava-core/src/main/java/rx/operators/OperationFinally.java @@ -18,11 +18,7 @@ import static org.junit.Assert.*; import static org.mockito.Mockito.*; -import java.lang.reflect.Array; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.CountDownLatch; - +import org.junit.Before; import org.junit.Test; import rx.Observable; @@ -47,9 +43,10 @@ public final class OperationFinally { * @param sequence An observable sequence of elements * @param action An action to be taken when the sequence is complete or throws an exception * @return An observable sequence with the same elements as the input. - * After the last element is consumed (just before {@link Observer#onComplete} is called), - * or when an exception is thrown (just before {@link Observer#onError}), the action will be called. - * @see http://msdn.microsoft.com/en-us/library/hh212133(v=vs.103).aspx + * After the last element is consumed (and {@link Observer#onCompleted} has been called), + * or after an exception is thrown (and {@link Observer#onError} has been called), + * the given action will be called. + * @see MSDN Observable.Finally method */ public static Func1, Subscription> finally0(final Observable sequence, final Action0 action) { return new Func1, Subscription>() { @@ -94,14 +91,14 @@ private class FinallyObserver implements Observer { @Override public void onCompleted() { - finalAction.call(); observer.onCompleted(); + finalAction.call(); } @Override public void onError(Exception e) { - finalAction.call(); observer.onError(e); + finalAction.call(); } @Override @@ -112,30 +109,24 @@ public void onNext(T args) { } public static class UnitTest { - private static class TestAction implements Action0 { - public int called = 0; - @Override public void call() { - called++; - } + private Action0 aAction0; + private Observer aObserver; + @Before + public void before() { + aAction0 = mock(Action0.class); + aObserver = mock(Observer.class); + } + private void checkActionCalled(Observable input) { + Observable.create(finally0(input, aAction0)).subscribe(aObserver); + verify(aAction0, times(1)).call(); + } + @Test + public void testFinallyCalledOnComplete() { + checkActionCalled(Observable.toObservable(new String[] {"1", "2", "3"})); } - @Test - public void testFinally() { - final String[] n = {"1", "2", "3"}; - final Observable nums = Observable.toObservable(n); - TestAction action = new TestAction(); - action.called = 0; - Observable fin = Observable.create(finally0(nums, action)); - @SuppressWarnings("unchecked") - Observer aObserver = mock(Observer.class); - fin.subscribe(aObserver); - assertEquals(1, action.called); - - action.called = 0; - Observable error = Observable.error(new RuntimeException("expected")); - fin = Observable.create(finally0(error, action)); - fin.subscribe(aObserver); - assertEquals(1, action.called); + public void testFinallyCalledOnError() { + checkActionCalled(Observable.error(new RuntimeException("expected"))); } } } From be560b58da1c89e56091cd109f620d7eda3a24fb Mon Sep 17 00:00:00 2001 From: Adam Bliss Date: Fri, 29 Mar 2013 04:22:56 +0000 Subject: [PATCH 6/6] Incorporate review suggestions. - Changes finally0 to finallyDo. - Removes unnecessary subscription-wrapping. - Handle exceptions in onCompleted/onError --- rxjava-core/src/main/java/rx/Observable.java | 8 ++-- .../java/rx/operators/OperationFinally.java | 38 ++++++++----------- 2 files changed, 19 insertions(+), 27 deletions(-) diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index 631e175751..fd14a42ce3 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -1192,8 +1192,8 @@ public static Observable concat(Observable... source) { * @return an Observable that emits the same objects, then calls the action. * @see MSDN: Observable.Finally Method */ - public static Observable finally0(Observable source, Action0 action) { - return _create(OperationFinally.finally0(source, action)); + public static Observable finallyDo(Observable source, Action0 action) { + return _create(OperationFinally.finallyDo(source, action)); } /** @@ -2463,8 +2463,8 @@ public Observable filter(Func1 predicate) { * @return an Observable that emits the same objects as this observable, then calls the action. * @see MSDN: Observable.Finally Method */ - public Observable finally0(Action0 action) { - return _create(OperationFinally.finally0(this, action)); + public Observable finallyDo(Action0 action) { + return _create(OperationFinally.finallyDo(this, action)); } /** diff --git a/rxjava-core/src/main/java/rx/operators/OperationFinally.java b/rxjava-core/src/main/java/rx/operators/OperationFinally.java index d90b0572a6..636a8e61ae 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationFinally.java +++ b/rxjava-core/src/main/java/rx/operators/OperationFinally.java @@ -33,12 +33,10 @@ public final class OperationFinally { /** * Call a given action when a sequence completes (with or without an * exception). The returned observable is exactly as threadsafe as the - * source observable; in particular, any situation allowing the source to - * call onComplete or onError multiple times allows the returned observable - * to call the final action multiple times. + * source observable. *

* Note that "finally" is a Java reserved word and cannot be an identifier, - * so we use "finally0". + * so we use "finallyDo". * * @param sequence An observable sequence of elements * @param action An action to be taken when the sequence is complete or throws an exception @@ -48,7 +46,7 @@ public final class OperationFinally { * the given action will be called. * @see MSDN Observable.Finally method */ - public static Func1, Subscription> finally0(final Observable sequence, final Action0 action) { + public static Func1, Subscription> finallyDo(final Observable sequence, final Action0 action) { return new Func1, Subscription>() { @Override public Subscription call(Observer observer) { @@ -60,26 +58,14 @@ public Subscription call(Observer observer) { private static class Finally implements Func1, Subscription> { private final Observable sequence; private final Action0 finalAction; - private Subscription s; Finally(final Observable sequence, Action0 finalAction) { this.sequence = sequence; this.finalAction = finalAction; } - private final AtomicObservableSubscription Subscription = new AtomicObservableSubscription(); - - private final Subscription actualSubscription = new Subscription() { - @Override - public void unsubscribe() { - if (null != s) - s.unsubscribe(); - } - }; - public Subscription call(Observer observer) { - s = sequence.subscribe(new FinallyObserver(observer)); - return Subscription.wrap(actualSubscription); + return sequence.subscribe(new FinallyObserver(observer)); } private class FinallyObserver implements Observer { @@ -91,14 +77,20 @@ private class FinallyObserver implements Observer { @Override public void onCompleted() { - observer.onCompleted(); - finalAction.call(); + try { + observer.onCompleted(); + } finally { + finalAction.call(); + } } @Override public void onError(Exception e) { - observer.onError(e); - finalAction.call(); + try { + observer.onError(e); + } finally { + finalAction.call(); + } } @Override @@ -117,7 +109,7 @@ public void before() { aObserver = mock(Observer.class); } private void checkActionCalled(Observable input) { - Observable.create(finally0(input, aAction0)).subscribe(aObserver); + Observable.create(finallyDo(input, aAction0)).subscribe(aObserver); verify(aAction0, times(1)).call(); } @Test