Skip to content

Commit 2a3bb5a

Browse files
Merge pull request #354 from jmhofer/count-sum-average
Count, Sum, Average implementations
2 parents 176280e + cf7f9f7 commit 2a3bb5a

File tree

4 files changed

+491
-2
lines changed

4 files changed

+491
-2
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import rx.observables.ConnectableObservable;
2828
import rx.observables.GroupedObservable;
2929
import rx.operators.OperationAll;
30+
import rx.operators.OperationAverage;
3031
import rx.operators.OperationBuffer;
3132
import rx.operators.OperationCache;
3233
import rx.operators.OperationCombineLatest;
@@ -50,6 +51,7 @@
5051
import rx.operators.OperationScan;
5152
import rx.operators.OperationSkip;
5253
import rx.operators.OperationSubscribeOn;
54+
import rx.operators.OperationSum;
5355
import rx.operators.OperationSwitch;
5456
import rx.operators.OperationSynchronize;
5557
import rx.operators.OperationTake;
@@ -2043,6 +2045,94 @@ public Observable<T> reduce(Func2<? super T, ? super T, ? extends T> accumulator
20432045
return create(OperationScan.scan(this, accumulator)).takeLast(1);
20442046
}
20452047

2048+
/**
2049+
* Returns an Observable that counts the total number of elements in the source Observable.
2050+
* @return an Observable emitting the number of counted elements of the source Observable
2051+
* as its single item.
2052+
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229470%28v=vs.103%29.aspx">MSDN: Observable.Count</a>
2053+
*/
2054+
public Observable<Integer> count() {
2055+
return reduce(0, new Func2<Integer, T, Integer>() {
2056+
@Override
2057+
public Integer call(Integer t1, T t2) {
2058+
return t1 + 1;
2059+
}
2060+
});
2061+
}
2062+
2063+
/**
2064+
* Returns an Observable that sums up the elements in the source Observable.
2065+
* @param source
2066+
* Source observable to compute the sum of.
2067+
* @return an Observable emitting the sum of all the elements of the source Observable
2068+
* as its single item.
2069+
* @see <a href="http://msdn.microsoft.com/en-us/library/system.reactive.linq.observable.sum%28v=vs.103%29.aspx">MSDN: Observable.Sum</a>
2070+
*/
2071+
public static Observable<Integer> sum(Observable<Integer> source) {
2072+
return OperationSum.sum(source);
2073+
}
2074+
2075+
/**
2076+
* @see #sum(Observable)
2077+
* @see <a href="http://msdn.microsoft.com/en-us/library/system.reactive.linq.observable.sum%28v=vs.103%29.aspx">MSDN: Observable.Sum</a>
2078+
*/
2079+
public static Observable<Long> sumLongs(Observable<Long> source) {
2080+
return OperationSum.sumLongs(source);
2081+
}
2082+
2083+
/**
2084+
* @see #sum(Observable)
2085+
* @see <a href="http://msdn.microsoft.com/en-us/library/system.reactive.linq.observable.sum%28v=vs.103%29.aspx">MSDN: Observable.Sum</a>
2086+
*/
2087+
public static Observable<Float> sumFloats(Observable<Float> source) {
2088+
return OperationSum.sumFloats(source);
2089+
}
2090+
2091+
/**
2092+
* @see #sum(Observable)
2093+
* @see <a href="http://msdn.microsoft.com/en-us/library/system.reactive.linq.observable.sum%28v=vs.103%29.aspx">MSDN: Observable.Sum</a>
2094+
*/
2095+
public static Observable<Double> sumDoubles(Observable<Double> source) {
2096+
return OperationSum.sumDoubles(source);
2097+
}
2098+
2099+
/**
2100+
* Returns an Observable that computes the average of all elements in the source Observable.
2101+
* For an empty source, it causes an ArithmeticException.
2102+
* @param source
2103+
* Source observable to compute the average of.
2104+
* @return an Observable emitting the averageof all the elements of the source Observable
2105+
* as its single item.
2106+
* @see <a href="http://msdn.microsoft.com/en-us/library/system.reactive.linq.observable.average%28v=vs.103%29.aspx">MSDN: Observable.Average</a>
2107+
*/
2108+
public static Observable<Integer> average(Observable<Integer> source) {
2109+
return OperationAverage.average(source);
2110+
}
2111+
2112+
/**
2113+
* @see #average(Observable)
2114+
* @see <a href="http://msdn.microsoft.com/en-us/library/system.reactive.linq.observable.average%28v=vs.103%29.aspx">MSDN: Observable.Average</a>
2115+
*/
2116+
public static Observable<Long> averageLongs(Observable<Long> source) {
2117+
return OperationAverage.averageLongs(source);
2118+
}
2119+
2120+
/**
2121+
* @see #average(Observable)
2122+
* @see <a href="http://msdn.microsoft.com/en-us/library/system.reactive.linq.observable.average%28v=vs.103%29.aspx">MSDN: Observable.Average</a>
2123+
*/
2124+
public static Observable<Float> averageFloats(Observable<Float> source) {
2125+
return OperationAverage.averageFloats(source);
2126+
}
2127+
2128+
/**
2129+
* @see #average(Observable)
2130+
* @see <a href="http://msdn.microsoft.com/en-us/library/system.reactive.linq.observable.average%28v=vs.103%29.aspx">MSDN: Observable.Average</a>
2131+
*/
2132+
public static Observable<Double> averageDoubles(Observable<Double> source) {
2133+
return OperationAverage.averageDoubles(source);
2134+
}
2135+
20462136
/**
20472137
* Returns a {@link ConnectableObservable} that shares a single subscription to the underlying
20482138
* Observable that will replay all of its items and notifications to any future {@link Observer}.
Lines changed: 198 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,198 @@
1+
/**
2+
* Copyright 2013 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.operators;
17+
18+
import static org.mockito.Mockito.*;
19+
20+
import org.junit.Test;
21+
22+
import rx.Observable;
23+
import rx.Observer;
24+
import rx.util.functions.Func1;
25+
import rx.util.functions.Func2;
26+
27+
/**
28+
* A few operators for implementing the averaging operation.
29+
* @see <a href="http://msdn.microsoft.com/en-us/library/system.reactive.linq.observable.average%28v=vs.103%29.aspx">MSDN: Observable.Average</a>
30+
*/
31+
public final class OperationAverage {
32+
private static final class Tuple2<T> {
33+
private final T current;
34+
private final Integer count;
35+
36+
private Tuple2(T v1, Integer v2) {
37+
current = v1;
38+
count = v2;
39+
}
40+
}
41+
42+
public static Observable<Integer> average(Observable<Integer> source) {
43+
return source.reduce(new Tuple2<Integer>(0, 0), new Func2<Tuple2<Integer>, Integer, Tuple2<Integer>>() {
44+
@Override
45+
public Tuple2<Integer> call(Tuple2<Integer> accu, Integer next) {
46+
return new Tuple2<Integer>(accu.current + next, accu.count + 1);
47+
}
48+
}).map(new Func1<Tuple2<Integer>, Integer>() {
49+
@Override
50+
public Integer call(Tuple2<Integer> result) {
51+
return result.current / result.count; // may throw DivisionByZero, this should be correct...
52+
}
53+
});
54+
}
55+
56+
public static Observable<Long> averageLongs(Observable<Long> source) {
57+
return source.reduce(new Tuple2<Long>(0L, 0), new Func2<Tuple2<Long>, Long, Tuple2<Long>>() {
58+
@Override
59+
public Tuple2<Long> call(Tuple2<Long> accu, Long next) {
60+
return new Tuple2<Long>(accu.current + next, accu.count + 1);
61+
}
62+
}).map(new Func1<Tuple2<Long>, Long>() {
63+
@Override
64+
public Long call(Tuple2<Long> result) {
65+
return result.current / result.count; // may throw DivisionByZero, this should be correct...
66+
}
67+
});
68+
}
69+
70+
public static Observable<Float> averageFloats(Observable<Float> source) {
71+
return source.reduce(new Tuple2<Float>(0.0f, 0), new Func2<Tuple2<Float>, Float, Tuple2<Float>>() {
72+
@Override
73+
public Tuple2<Float> call(Tuple2<Float> accu, Float next) {
74+
return new Tuple2<Float>(accu.current + next, accu.count + 1);
75+
}
76+
}).map(new Func1<Tuple2<Float>, Float>() {
77+
@Override
78+
public Float call(Tuple2<Float> result) {
79+
if (result.count == 0) {
80+
throw new ArithmeticException("divide by zero");
81+
}
82+
return result.current / result.count;
83+
}
84+
});
85+
}
86+
87+
public static Observable<Double> averageDoubles(Observable<Double> source) {
88+
return source.reduce(new Tuple2<Double>(0.0d, 0), new Func2<Tuple2<Double>, Double, Tuple2<Double>>() {
89+
@Override
90+
public Tuple2<Double> call(Tuple2<Double> accu, Double next) {
91+
return new Tuple2<Double>(accu.current + next, accu.count + 1);
92+
}
93+
}).map(new Func1<Tuple2<Double>, Double>() {
94+
@Override
95+
public Double call(Tuple2<Double> result) {
96+
if (result.count == 0) {
97+
throw new ArithmeticException("divide by zero");
98+
}
99+
return result.current / result.count;
100+
}
101+
});
102+
}
103+
104+
public static class UnitTest {
105+
@SuppressWarnings("unchecked")
106+
Observer<Integer> w = mock(Observer.class);
107+
@SuppressWarnings("unchecked")
108+
Observer<Long> wl = mock(Observer.class);
109+
@SuppressWarnings("unchecked")
110+
Observer<Float> wf = mock(Observer.class);
111+
@SuppressWarnings("unchecked")
112+
Observer<Double> wd = mock(Observer.class);
113+
114+
@Test
115+
public void testAverageOfAFewInts() throws Throwable {
116+
Observable<Integer> src = Observable.from(1, 2, 3, 4, 6);
117+
average(src).subscribe(w);
118+
119+
verify(w, times(1)).onNext(anyInt());
120+
verify(w).onNext(3);
121+
verify(w, never()).onError(any(Throwable.class));
122+
verify(w, times(1)).onCompleted();
123+
}
124+
125+
@Test
126+
public void testEmptyAverage() throws Throwable {
127+
Observable<Integer> src = Observable.from();
128+
average(src).subscribe(w);
129+
130+
verify(w, never()).onNext(anyInt());
131+
verify(w, times(1)).onError(any(ArithmeticException.class));
132+
verify(w, never()).onCompleted();
133+
}
134+
135+
@Test
136+
public void testAverageOfAFewLongs() throws Throwable {
137+
Observable<Long> src = Observable.from(1L, 2L, 3L, 4L, 6L);
138+
averageLongs(src).subscribe(wl);
139+
140+
verify(wl, times(1)).onNext(anyLong());
141+
verify(wl).onNext(3L);
142+
verify(wl, never()).onError(any(Throwable.class));
143+
verify(wl, times(1)).onCompleted();
144+
}
145+
146+
@Test
147+
public void testEmptyAverageLongs() throws Throwable {
148+
Observable<Long> src = Observable.from();
149+
averageLongs(src).subscribe(wl);
150+
151+
verify(wl, never()).onNext(anyLong());
152+
verify(wl, times(1)).onError(any(ArithmeticException.class));
153+
verify(wl, never()).onCompleted();
154+
}
155+
156+
@Test
157+
public void testAverageOfAFewFloats() throws Throwable {
158+
Observable<Float> src = Observable.from(1.0f, 2.0f);
159+
averageFloats(src).subscribe(wf);
160+
161+
verify(wf, times(1)).onNext(anyFloat());
162+
verify(wf).onNext(1.5f);
163+
verify(wf, never()).onError(any(Throwable.class));
164+
verify(wf, times(1)).onCompleted();
165+
}
166+
167+
@Test
168+
public void testEmptyAverageFloats() throws Throwable {
169+
Observable<Float> src = Observable.from();
170+
averageFloats(src).subscribe(wf);
171+
172+
verify(wf, never()).onNext(anyFloat());
173+
verify(wf, times(1)).onError(any(ArithmeticException.class));
174+
verify(wf, never()).onCompleted();
175+
}
176+
177+
@Test
178+
public void testAverageOfAFewDoubles() throws Throwable {
179+
Observable<Double> src = Observable.from(1.0d, 2.0d);
180+
averageDoubles(src).subscribe(wd);
181+
182+
verify(wd, times(1)).onNext(anyDouble());
183+
verify(wd).onNext(1.5d);
184+
verify(wd, never()).onError(any(Throwable.class));
185+
verify(wd, times(1)).onCompleted();
186+
}
187+
188+
@Test
189+
public void testEmptyAverageDoubles() throws Throwable {
190+
Observable<Double> src = Observable.from();
191+
averageDoubles(src).subscribe(wd);
192+
193+
verify(wd, never()).onNext(anyDouble());
194+
verify(wd, times(1)).onError(any(ArithmeticException.class));
195+
verify(wd, never()).onCompleted();
196+
}
197+
}
198+
}

0 commit comments

Comments
 (0)