Skip to content

Commit e036606

Browse files
committed
Merge pull request #222 from joshgord/pull-issue-43-merge
Pull issue 43 merge
2 parents 1220cff + c029724 commit e036606

File tree

2 files changed

+148
-0
lines changed

2 files changed

+148
-0
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
import rx.operators.OperationDefer;
4343
import rx.operators.OperationDematerialize;
4444
import rx.operators.OperationFilter;
45+
import rx.operators.OperationFinally;
4546
import rx.operators.OperationMap;
4647
import rx.operators.OperationMaterialize;
4748
import rx.operators.OperationMerge;
@@ -1221,6 +1222,18 @@ public static <T> Observable<T> concat(Observable<T>... source) {
12211222
return create(OperationConcat.concat(source));
12221223
}
12231224

1225+
/**
1226+
* Emits the same objects as the given Observable, calling the given action
1227+
* when it calls <code>onComplete</code> or <code>onError</code>.
1228+
* @param source an observable
1229+
* @param action an action to be called when the source completes or errors.
1230+
* @return an Observable that emits the same objects, then calls the action.
1231+
* @see <a href="http://msdn.microsoft.com/en-us/library/hh212133(v=vs.103).aspx">MSDN: Observable.Finally Method</a>
1232+
*/
1233+
public static <T> Observable<T> finallyDo(Observable source, Action0 action) {
1234+
return create(OperationFinally.finallyDo(source, action));
1235+
}
1236+
12241237
/**
12251238
* Groups the elements of an observable and selects the resulting elements by using a specified function.
12261239
*
@@ -2491,6 +2504,17 @@ public Observable<T> filter(Func1<T, Boolean> predicate) {
24912504
return filter(this, predicate);
24922505
}
24932506

2507+
/**
2508+
* Registers an action to be called when this observable calls
2509+
* <code>onComplete</code> or <code>onError</code>.
2510+
* @param action an action to be called when this observable completes or errors.
2511+
* @return an Observable that emits the same objects as this observable, then calls the action.
2512+
* @see <a href="http://msdn.microsoft.com/en-us/library/hh212133(v=vs.103).aspx">MSDN: Observable.Finally Method</a>
2513+
*/
2514+
public Observable<T> finallyDo(Action0 action) {
2515+
return create(OperationFinally.finallyDo(this, action));
2516+
}
2517+
24942518
/**
24952519
* Filters an Observable by discarding any of its emissions that do not meet some test.
24962520
* <p>
Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
/**
2+
* Copyright 2013 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.operators;
17+
18+
import static org.junit.Assert.*;
19+
import static org.mockito.Mockito.*;
20+
21+
import org.junit.Before;
22+
import org.junit.Test;
23+
24+
import rx.Observable;
25+
import rx.Observer;
26+
import rx.Subscription;
27+
import rx.util.AtomicObservableSubscription;
28+
import rx.util.functions.Action0;
29+
import rx.util.functions.Func1;
30+
31+
public final class OperationFinally {
32+
33+
/**
34+
* Call a given action when a sequence completes (with or without an
35+
* exception). The returned observable is exactly as threadsafe as the
36+
* source observable.
37+
* <p/>
38+
* Note that "finally" is a Java reserved word and cannot be an identifier,
39+
* so we use "finallyDo".
40+
*
41+
* @param sequence An observable sequence of elements
42+
* @param action An action to be taken when the sequence is complete or throws an exception
43+
* @return An observable sequence with the same elements as the input.
44+
* After the last element is consumed (and {@link Observer#onCompleted} has been called),
45+
* or after an exception is thrown (and {@link Observer#onError} has been called),
46+
* the given action will be called.
47+
* @see <a href="http://msdn.microsoft.com/en-us/library/hh212133(v=vs.103).aspx">MSDN Observable.Finally method</a>
48+
*/
49+
public static <T> Func1<Observer<T>, Subscription> finallyDo(final Observable<T> sequence, final Action0 action) {
50+
return new Func1<Observer<T>, Subscription>() {
51+
@Override
52+
public Subscription call(Observer<T> observer) {
53+
return new Finally<T>(sequence, action).call(observer);
54+
}
55+
};
56+
}
57+
58+
private static class Finally<T> implements Func1<Observer<T>, Subscription> {
59+
private final Observable<T> sequence;
60+
private final Action0 finalAction;
61+
62+
Finally(final Observable<T> sequence, Action0 finalAction) {
63+
this.sequence = sequence;
64+
this.finalAction = finalAction;
65+
}
66+
67+
public Subscription call(Observer<T> observer) {
68+
return sequence.subscribe(new FinallyObserver(observer));
69+
}
70+
71+
private class FinallyObserver implements Observer<T> {
72+
private final Observer<T> observer;
73+
74+
FinallyObserver(Observer<T> observer) {
75+
this.observer = observer;
76+
}
77+
78+
@Override
79+
public void onCompleted() {
80+
try {
81+
observer.onCompleted();
82+
} finally {
83+
finalAction.call();
84+
}
85+
}
86+
87+
@Override
88+
public void onError(Exception e) {
89+
try {
90+
observer.onError(e);
91+
} finally {
92+
finalAction.call();
93+
}
94+
}
95+
96+
@Override
97+
public void onNext(T args) {
98+
observer.onNext(args);
99+
}
100+
}
101+
}
102+
103+
public static class UnitTest {
104+
private Action0 aAction0;
105+
private Observer<String> aObserver;
106+
@Before
107+
public void before() {
108+
aAction0 = mock(Action0.class);
109+
aObserver = mock(Observer.class);
110+
}
111+
private void checkActionCalled(Observable<String> input) {
112+
Observable.create(finallyDo(input, aAction0)).subscribe(aObserver);
113+
verify(aAction0, times(1)).call();
114+
}
115+
@Test
116+
public void testFinallyCalledOnComplete() {
117+
checkActionCalled(Observable.toObservable(new String[] {"1", "2", "3"}));
118+
}
119+
@Test
120+
public void testFinallyCalledOnError() {
121+
checkActionCalled(Observable.<String>error(new RuntimeException("expected")));
122+
}
123+
}
124+
}

0 commit comments

Comments
 (0)