Skip to content

Commit f1399e6

Browse files
committed
Implemented all operation
1 parent f4968d6 commit f1399e6

File tree

1 file changed

+141
-0
lines changed

1 file changed

+141
-0
lines changed
Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,141 @@
1+
package rx.operators;
2+
3+
import org.junit.Test;
4+
import rx.Observable;
5+
import rx.Observer;
6+
import rx.Subscription;
7+
import rx.util.AtomicObservableSubscription;
8+
import rx.util.functions.Func1;
9+
10+
import java.util.concurrent.atomic.AtomicBoolean;
11+
12+
import static org.mockito.Mockito.mock;
13+
import static org.mockito.Mockito.verify;
14+
import static org.mockito.Mockito.verifyNoMoreInteractions;
15+
16+
public class OperationAll {
17+
18+
public static <T> Func1<Observer<Boolean>, Subscription> all(Observable<T> sequence, Func1<T, Boolean> predicate) {
19+
return new AllObservable<T>(sequence, predicate);
20+
}
21+
22+
private static class AllObservable<T> implements Func1<Observer<Boolean>, Subscription> {
23+
private final Observable<T> sequence;
24+
private final Func1<T, Boolean> predicate;
25+
26+
private final AtomicBoolean status = new AtomicBoolean(true);
27+
private final AtomicObservableSubscription subscription = new AtomicObservableSubscription();
28+
29+
30+
private AllObservable(Observable<T> sequence, Func1<T, Boolean> predicate) {
31+
this.sequence = sequence;
32+
this.predicate = predicate;
33+
}
34+
35+
36+
@Override
37+
public Subscription call(final Observer<Boolean> observer) {
38+
return subscription.wrap(sequence.subscribe(new Observer<T>() {
39+
@Override
40+
public void onCompleted() {
41+
if (status.get()) {
42+
observer.onNext(true);
43+
observer.onCompleted();
44+
}
45+
}
46+
47+
@Override
48+
public void onError(Exception e) {
49+
observer.onError(e);
50+
}
51+
52+
@Override
53+
public void onNext(T args) {
54+
boolean result = predicate.call(args);
55+
boolean changed = status.compareAndSet(true, result);
56+
57+
if (changed && !result) {
58+
observer.onNext(false);
59+
observer.onCompleted();
60+
subscription.unsubscribe();
61+
}
62+
}
63+
}));
64+
}
65+
}
66+
67+
public static class UnitTest {
68+
69+
@Test
70+
@SuppressWarnings("unchecked")
71+
public void testAll() {
72+
Observable<String> obs = Observable.from("one", "two", "six");
73+
74+
Observer<Boolean> observer = mock(Observer.class);
75+
Observable.create(all(obs, new Func1<String, Boolean>() {
76+
@Override
77+
public Boolean call(String s) {
78+
return s.length() == 3;
79+
}
80+
})).subscribe(observer);
81+
82+
verify(observer).onNext(true);
83+
verify(observer).onCompleted();
84+
verifyNoMoreInteractions(observer);
85+
}
86+
87+
@Test
88+
@SuppressWarnings("unchecked")
89+
public void testNotAll() {
90+
Observable<String> obs = Observable.from("one", "two", "three", "six");
91+
92+
Observer<Boolean> observer = mock(Observer.class);
93+
Observable.create(all(obs, new Func1<String, Boolean>() {
94+
@Override
95+
public Boolean call(String s) {
96+
return s.length() == 3;
97+
}
98+
})).subscribe(observer);
99+
100+
verify(observer).onNext(false);
101+
verify(observer).onCompleted();
102+
verifyNoMoreInteractions(observer);
103+
}
104+
105+
@Test
106+
@SuppressWarnings("unchecked")
107+
public void testEmpty() {
108+
Observable<String> obs = Observable.empty();
109+
110+
Observer<Boolean> observer = mock(Observer.class);
111+
Observable.create(all(obs, new Func1<String, Boolean>() {
112+
@Override
113+
public Boolean call(String s) {
114+
return s.length() == 3;
115+
}
116+
})).subscribe(observer);
117+
118+
verify(observer).onNext(true);
119+
verify(observer).onCompleted();
120+
verifyNoMoreInteractions(observer);
121+
}
122+
123+
@Test
124+
@SuppressWarnings("unchecked")
125+
public void testError() {
126+
Exception error = new Exception();
127+
Observable<String> obs = Observable.error(error);
128+
129+
Observer<Boolean> observer = mock(Observer.class);
130+
Observable.create(all(obs, new Func1<String, Boolean>() {
131+
@Override
132+
public Boolean call(String s) {
133+
return s.length() == 3;
134+
}
135+
})).subscribe(observer);
136+
137+
verify(observer).onError(error);
138+
verifyNoMoreInteractions(observer);
139+
}
140+
}
141+
}

0 commit comments

Comments
 (0)