Skip to content

Commit 86053d2

Browse files
Merge pull request ReactiveX#478 from zsxwing/min-max
Implemented the "Operator: Min and MinBy" and "Operator: Max and MaxBy"
2 parents 8eee5ae + 639dc98 commit 86053d2

File tree

3 files changed

+628
-0
lines changed

3 files changed

+628
-0
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import java.util.ArrayList;
2121
import java.util.Arrays;
22+
import java.util.Comparator;
2223
import java.util.List;
2324
import java.util.concurrent.ConcurrentHashMap;
2425
import java.util.concurrent.Future;
@@ -54,6 +55,7 @@
5455
import rx.operators.OperationMaterialize;
5556
import rx.operators.OperationMerge;
5657
import rx.operators.OperationMergeDelayError;
58+
import rx.operators.OperationMinMax;
5759
import rx.operators.OperationMulticast;
5860
import rx.operators.OperationObserveOn;
5961
import rx.operators.OperationOnErrorResumeNextViaFunction;
@@ -3631,6 +3633,126 @@ public static Observable<Double> averageDoubles(Observable<Double> source) {
36313633
return OperationAverage.averageDoubles(source);
36323634
}
36333635

3636+
/**
3637+
* Returns the minimum element in an observable sequence.
3638+
* If there are more than one minimum elements, returns the last one.
3639+
* For an empty source, it causes an {@link IllegalArgumentException}.
3640+
*
3641+
* @param source
3642+
* an observable sequence to determine the minimum element of.
3643+
* @return an observable emitting the minimum element.
3644+
* @throws IllegalArgumentException
3645+
* if the source is empty
3646+
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229715(v=vs.103).aspx">MSDN: Observable.Min</a>
3647+
*/
3648+
public static <T extends Comparable<T>> Observable<T> min(Observable<T> source) {
3649+
return OperationMinMax.min(source);
3650+
}
3651+
3652+
/**
3653+
* Returns the minimum element in an observable sequence according to the specified comparator.
3654+
* If there are more than one minimum elements, returns the last one.
3655+
* For an empty source, it causes an {@link IllegalArgumentException}.
3656+
*
3657+
* @param comparator
3658+
* the comparer used to compare elements.
3659+
* @return an observable emitting the minimum value according to the specified comparator.
3660+
* @throws IllegalArgumentException
3661+
* if the source is empty
3662+
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229095(v=vs.103).aspx">MSDN: Observable.Min</a>
3663+
*/
3664+
public Observable<T> min(Comparator<T> comparator) {
3665+
return OperationMinMax.min(this, comparator);
3666+
}
3667+
3668+
/**
3669+
* Returns the elements in an observable sequence with the minimum key value.
3670+
* For an empty source, it returns an observable emitting an empty List.
3671+
*
3672+
* @param selector
3673+
* the key selector function.
3674+
* @return an observable emitting a List of the elements with the minimum key value.
3675+
* @see <a href="http://msdn.microsoft.com/en-us/library/hh228970(v=vs.103).aspx">MSDN: Observable.MinBy</a>
3676+
*/
3677+
public <R extends Comparable<R>> Observable<List<T>> minBy(Func1<T, R> selector) {
3678+
return OperationMinMax.minBy(this, selector);
3679+
}
3680+
3681+
/**
3682+
* Returns the elements in an observable sequence with the minimum key value according to the specified comparator.
3683+
* For an empty source, it returns an observable emitting an empty List.
3684+
*
3685+
* @param selector
3686+
* the key selector function.
3687+
* @param comparator
3688+
* the comparator used to compare key values.
3689+
* @return an observable emitting a List of the elements with the minimum key value according to the specified comparator.
3690+
* @see <a href="http://msdn.microsoft.com/en-us/library/hh228970(v=vs.103).aspx">MSDN: Observable.MinBy</a>
3691+
*/
3692+
public <R> Observable<List<T>> minBy(Func1<T, R> selector, Comparator<R> comparator) {
3693+
return OperationMinMax.minBy(this, selector, comparator);
3694+
}
3695+
3696+
/**
3697+
* Returns the maximum element in an observable sequence.
3698+
* If there are more than one maximum elements, returns the last one.
3699+
* For an empty source, it causes an {@link IllegalArgumentException}.
3700+
*
3701+
* @param source
3702+
* an observable sequence to determine the maximum element of.
3703+
* @return an observable emitting the maximum element.
3704+
* @throws IllegalArgumentException
3705+
* if the source is empty.
3706+
* @see <a href="http://msdn.microsoft.com/en-us/library/hh211837(v=vs.103).aspx">MSDN: Observable.Max</a>
3707+
*/
3708+
public static <T extends Comparable<T>> Observable<T> max(Observable<T> source) {
3709+
return OperationMinMax.max(source);
3710+
}
3711+
3712+
/**
3713+
* Returns the maximum element in an observable sequence according to the specified comparator.
3714+
* If there are more than one maximum elements, returns the last one.
3715+
* For an empty source, it causes an {@link IllegalArgumentException}.
3716+
*
3717+
* @param comparator
3718+
* the comparer used to compare elements.
3719+
* @return an observable emitting the maximum value according to the specified comparator.
3720+
* @throws IllegalArgumentException
3721+
* if the source is empty.
3722+
* @see <a href="http://msdn.microsoft.com/en-us/library/hh211635(v=vs.103).aspx">MSDN: Observable.Max</a>
3723+
*/
3724+
public Observable<T> max(Comparator<T> comparator) {
3725+
return OperationMinMax.max(this, comparator);
3726+
}
3727+
3728+
/**
3729+
* Returns the elements in an observable sequence with the maximum key value.
3730+
* For an empty source, it returns an observable emitting an empty List.
3731+
*
3732+
* @param selector
3733+
* the key selector function.
3734+
* @return an observable emitting a List of the elements with the maximum key value.
3735+
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229058(v=vs.103).aspx">MSDN: Observable.MaxBy</a>
3736+
*/
3737+
public <R extends Comparable<R>> Observable<List<T>> maxBy(Func1<T, R> selector) {
3738+
return OperationMinMax.maxBy(this, selector);
3739+
}
3740+
3741+
/**
3742+
* Returns the elements in an observable sequence with the maximum key value according to the specified comparator.
3743+
* For an empty source, it returns an observable emitting an empty List.
3744+
*
3745+
* @param selector
3746+
* the key selector function.
3747+
* @param comparator
3748+
* the comparator used to compare key values.
3749+
* @return an observable emitting a List of the elements with the maximum key value according to the specified comparator.
3750+
* @see <a href="http://msdn.microsoft.com/en-us/library/hh244330(v=vs.103).aspx">MSDN: Observable.MaxBy</a>
3751+
*/
3752+
public <R> Observable<List<T>> maxBy(Func1<T, R> selector, Comparator<R> comparator) {
3753+
return OperationMinMax.maxBy(this, selector, comparator);
3754+
}
3755+
36343756
/**
36353757
* Returns a {@link ConnectableObservable} that shares a single subscription to the underlying
36363758
* Observable that will replay all of its items and notifications to any future {@link Observer}.
Lines changed: 147 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,147 @@
1+
/**
2+
* Copyright 2013 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.operators;
17+
18+
import java.util.ArrayList;
19+
import java.util.Comparator;
20+
import java.util.List;
21+
22+
import rx.Observable;
23+
import rx.util.functions.Func1;
24+
import rx.util.functions.Func2;
25+
26+
/**
27+
* Returns the minimum element in an observable sequence.
28+
*/
29+
public class OperationMinMax {
30+
31+
public static <T extends Comparable<T>> Observable<T> min(
32+
Observable<T> source) {
33+
return minMax(source, -1L);
34+
}
35+
36+
public static <T> Observable<T> min(Observable<T> source,
37+
final Comparator<T> comparator) {
38+
return minMax(source, comparator, -1L);
39+
}
40+
41+
public static <T, R extends Comparable<R>> Observable<List<T>> minBy(
42+
Observable<T> source, final Func1<T, R> selector) {
43+
return minMaxBy(source, selector, -1L);
44+
}
45+
46+
public static <T, R> Observable<List<T>> minBy(Observable<T> source,
47+
final Func1<T, R> selector, final Comparator<R> comparator) {
48+
return minMaxBy(source, selector, comparator, -1L);
49+
}
50+
51+
public static <T extends Comparable<T>> Observable<T> max(
52+
Observable<T> source) {
53+
return minMax(source, 1L);
54+
}
55+
56+
public static <T> Observable<T> max(Observable<T> source,
57+
final Comparator<T> comparator) {
58+
return minMax(source, comparator, 1L);
59+
}
60+
61+
public static <T, R extends Comparable<R>> Observable<List<T>> maxBy(
62+
Observable<T> source, final Func1<T, R> selector) {
63+
return minMaxBy(source, selector, 1L);
64+
}
65+
66+
public static <T, R> Observable<List<T>> maxBy(Observable<T> source,
67+
final Func1<T, R> selector, final Comparator<R> comparator) {
68+
return minMaxBy(source, selector, comparator, 1L);
69+
}
70+
71+
private static <T extends Comparable<T>> Observable<T> minMax(
72+
Observable<T> source, final long flag) {
73+
return source.reduce(new Func2<T, T, T>() {
74+
@Override
75+
public T call(T acc, T value) {
76+
if (flag * acc.compareTo(value) > 0) {
77+
return acc;
78+
}
79+
return value;
80+
}
81+
});
82+
}
83+
84+
private static <T> Observable<T> minMax(Observable<T> source,
85+
final Comparator<T> comparator, final long flag) {
86+
return source.reduce(new Func2<T, T, T>() {
87+
@Override
88+
public T call(T acc, T value) {
89+
if (flag * comparator.compare(acc, value) > 0) {
90+
return acc;
91+
}
92+
return value;
93+
}
94+
});
95+
}
96+
97+
private static <T, R extends Comparable<R>> Observable<List<T>> minMaxBy(
98+
Observable<T> source, final Func1<T, R> selector, final long flag) {
99+
return source.reduce(new ArrayList<T>(),
100+
new Func2<List<T>, T, List<T>>() {
101+
102+
@Override
103+
public List<T> call(List<T> acc, T value) {
104+
if (acc.isEmpty()) {
105+
acc.add(value);
106+
} else {
107+
int compareResult = selector.call(acc.get(0))
108+
.compareTo(selector.call(value));
109+
if (compareResult == 0) {
110+
acc.add(value);
111+
} else if (flag * compareResult < 0) {
112+
acc.clear();
113+
acc.add(value);
114+
}
115+
}
116+
return acc;
117+
}
118+
});
119+
}
120+
121+
private static <T, R> Observable<List<T>> minMaxBy(Observable<T> source,
122+
final Func1<T, R> selector, final Comparator<R> comparator,
123+
final long flag) {
124+
return source.reduce(new ArrayList<T>(),
125+
new Func2<List<T>, T, List<T>>() {
126+
127+
@Override
128+
public List<T> call(List<T> acc, T value) {
129+
if (acc.isEmpty()) {
130+
acc.add(value);
131+
} else {
132+
int compareResult = comparator.compare(
133+
selector.call(acc.get(0)),
134+
selector.call(value));
135+
if (compareResult == 0) {
136+
acc.add(value);
137+
} else if (flag * compareResult < 0) {
138+
acc.clear();
139+
acc.add(value);
140+
}
141+
}
142+
return acc;
143+
}
144+
});
145+
}
146+
147+
}

0 commit comments

Comments
 (0)