Skip to content

Commit 639dc98

Browse files
committed
Use the '+1/-1' way to implement the min and max operators
1 parent e12ede1 commit 639dc98

File tree

6 files changed

+431
-498
lines changed

6 files changed

+431
-498
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -53,10 +53,9 @@
5353
import rx.operators.OperationLast;
5454
import rx.operators.OperationMap;
5555
import rx.operators.OperationMaterialize;
56-
import rx.operators.OperationMax;
5756
import rx.operators.OperationMerge;
5857
import rx.operators.OperationMergeDelayError;
59-
import rx.operators.OperationMin;
58+
import rx.operators.OperationMinMax;
6059
import rx.operators.OperationMulticast;
6160
import rx.operators.OperationObserveOn;
6261
import rx.operators.OperationOnErrorResumeNextViaFunction;
@@ -3636,6 +3635,7 @@ public static Observable<Double> averageDoubles(Observable<Double> source) {
36363635

36373636
/**
36383637
* Returns the minimum element in an observable sequence.
3638+
* If there are more than one minimum elements, returns the last one.
36393639
* For an empty source, it causes an {@link IllegalArgumentException}.
36403640
*
36413641
* @param source
@@ -3646,11 +3646,12 @@ public static Observable<Double> averageDoubles(Observable<Double> source) {
36463646
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229715(v=vs.103).aspx">MSDN: Observable.Min</a>
36473647
*/
36483648
public static <T extends Comparable<T>> Observable<T> min(Observable<T> source) {
3649-
return OperationMin.min(source);
3649+
return OperationMinMax.min(source);
36503650
}
36513651

36523652
/**
36533653
* Returns the minimum element in an observable sequence according to the specified comparator.
3654+
* If there are more than one minimum elements, returns the last one.
36543655
* For an empty source, it causes an {@link IllegalArgumentException}.
36553656
*
36563657
* @param comparator
@@ -3661,7 +3662,7 @@ public static <T extends Comparable<T>> Observable<T> min(Observable<T> source)
36613662
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229095(v=vs.103).aspx">MSDN: Observable.Min</a>
36623663
*/
36633664
public Observable<T> min(Comparator<T> comparator) {
3664-
return OperationMin.min(this, comparator);
3665+
return OperationMinMax.min(this, comparator);
36653666
}
36663667

36673668
/**
@@ -3674,7 +3675,7 @@ public Observable<T> min(Comparator<T> comparator) {
36743675
* @see <a href="http://msdn.microsoft.com/en-us/library/hh228970(v=vs.103).aspx">MSDN: Observable.MinBy</a>
36753676
*/
36763677
public <R extends Comparable<R>> Observable<List<T>> minBy(Func1<T, R> selector) {
3677-
return OperationMin.minBy(this, selector);
3678+
return OperationMinMax.minBy(this, selector);
36783679
}
36793680

36803681
/**
@@ -3689,11 +3690,12 @@ public <R extends Comparable<R>> Observable<List<T>> minBy(Func1<T, R> selector)
36893690
* @see <a href="http://msdn.microsoft.com/en-us/library/hh228970(v=vs.103).aspx">MSDN: Observable.MinBy</a>
36903691
*/
36913692
public <R> Observable<List<T>> minBy(Func1<T, R> selector, Comparator<R> comparator) {
3692-
return OperationMin.minBy(this, selector, comparator);
3693+
return OperationMinMax.minBy(this, selector, comparator);
36933694
}
36943695

36953696
/**
36963697
* Returns the maximum element in an observable sequence.
3698+
* If there are more than one maximum elements, returns the last one.
36973699
* For an empty source, it causes an {@link IllegalArgumentException}.
36983700
*
36993701
* @param source
@@ -3704,11 +3706,12 @@ public <R> Observable<List<T>> minBy(Func1<T, R> selector, Comparator<R> compara
37043706
* @see <a href="http://msdn.microsoft.com/en-us/library/hh211837(v=vs.103).aspx">MSDN: Observable.Max</a>
37053707
*/
37063708
public static <T extends Comparable<T>> Observable<T> max(Observable<T> source) {
3707-
return OperationMax.max(source);
3709+
return OperationMinMax.max(source);
37083710
}
37093711

37103712
/**
37113713
* Returns the maximum element in an observable sequence according to the specified comparator.
3714+
* If there are more than one maximum elements, returns the last one.
37123715
* For an empty source, it causes an {@link IllegalArgumentException}.
37133716
*
37143717
* @param comparator
@@ -3719,7 +3722,7 @@ public static <T extends Comparable<T>> Observable<T> max(Observable<T> source)
37193722
* @see <a href="http://msdn.microsoft.com/en-us/library/hh211635(v=vs.103).aspx">MSDN: Observable.Max</a>
37203723
*/
37213724
public Observable<T> max(Comparator<T> comparator) {
3722-
return OperationMax.max(this, comparator);
3725+
return OperationMinMax.max(this, comparator);
37233726
}
37243727

37253728
/**
@@ -3732,7 +3735,7 @@ public Observable<T> max(Comparator<T> comparator) {
37323735
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229058(v=vs.103).aspx">MSDN: Observable.MaxBy</a>
37333736
*/
37343737
public <R extends Comparable<R>> Observable<List<T>> maxBy(Func1<T, R> selector) {
3735-
return OperationMax.maxBy(this, selector);
3738+
return OperationMinMax.maxBy(this, selector);
37363739
}
37373740

37383741
/**
@@ -3747,7 +3750,7 @@ public <R extends Comparable<R>> Observable<List<T>> maxBy(Func1<T, R> selector)
37473750
* @see <a href="http://msdn.microsoft.com/en-us/library/hh244330(v=vs.103).aspx">MSDN: Observable.MaxBy</a>
37483751
*/
37493752
public <R> Observable<List<T>> maxBy(Func1<T, R> selector, Comparator<R> comparator) {
3750-
return OperationMax.maxBy(this, selector, comparator);
3753+
return OperationMinMax.maxBy(this, selector, comparator);
37513754
}
37523755

37533756
/**

rxjava-core/src/main/java/rx/operators/OperationMax.java

Lines changed: 0 additions & 106 deletions
This file was deleted.

rxjava-core/src/main/java/rx/operators/OperationMin.java renamed to rxjava-core/src/main/java/rx/operators/OperationMinMax.java

Lines changed: 59 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -26,36 +26,76 @@
2626
/**
2727
* Returns the minimum element in an observable sequence.
2828
*/
29-
public class OperationMin {
29+
public class OperationMinMax {
3030

3131
public static <T extends Comparable<T>> Observable<T> min(
3232
Observable<T> source) {
33+
return minMax(source, -1L);
34+
}
35+
36+
public static <T> Observable<T> min(Observable<T> source,
37+
final Comparator<T> comparator) {
38+
return minMax(source, comparator, -1L);
39+
}
40+
41+
public static <T, R extends Comparable<R>> Observable<List<T>> minBy(
42+
Observable<T> source, final Func1<T, R> selector) {
43+
return minMaxBy(source, selector, -1L);
44+
}
45+
46+
public static <T, R> Observable<List<T>> minBy(Observable<T> source,
47+
final Func1<T, R> selector, final Comparator<R> comparator) {
48+
return minMaxBy(source, selector, comparator, -1L);
49+
}
50+
51+
public static <T extends Comparable<T>> Observable<T> max(
52+
Observable<T> source) {
53+
return minMax(source, 1L);
54+
}
55+
56+
public static <T> Observable<T> max(Observable<T> source,
57+
final Comparator<T> comparator) {
58+
return minMax(source, comparator, 1L);
59+
}
60+
61+
public static <T, R extends Comparable<R>> Observable<List<T>> maxBy(
62+
Observable<T> source, final Func1<T, R> selector) {
63+
return minMaxBy(source, selector, 1L);
64+
}
65+
66+
public static <T, R> Observable<List<T>> maxBy(Observable<T> source,
67+
final Func1<T, R> selector, final Comparator<R> comparator) {
68+
return minMaxBy(source, selector, comparator, 1L);
69+
}
70+
71+
private static <T extends Comparable<T>> Observable<T> minMax(
72+
Observable<T> source, final long flag) {
3373
return source.reduce(new Func2<T, T, T>() {
3474
@Override
3575
public T call(T acc, T value) {
36-
if (acc.compareTo(value) < 0) {
76+
if (flag * acc.compareTo(value) > 0) {
3777
return acc;
3878
}
3979
return value;
4080
}
4181
});
4282
}
4383

44-
public static <T> Observable<T> min(Observable<T> source,
45-
final Comparator<T> comparator) {
84+
private static <T> Observable<T> minMax(Observable<T> source,
85+
final Comparator<T> comparator, final long flag) {
4686
return source.reduce(new Func2<T, T, T>() {
4787
@Override
4888
public T call(T acc, T value) {
49-
if (comparator.compare(acc, value) < 0) {
89+
if (flag * comparator.compare(acc, value) > 0) {
5090
return acc;
5191
}
5292
return value;
5393
}
5494
});
5595
}
5696

57-
public static <T, R extends Comparable<R>> Observable<List<T>> minBy(
58-
Observable<T> source, final Func1<T, R> selector) {
97+
private static <T, R extends Comparable<R>> Observable<List<T>> minMaxBy(
98+
Observable<T> source, final Func1<T, R> selector, final long flag) {
5999
return source.reduce(new ArrayList<T>(),
60100
new Func2<List<T>, T, List<T>>() {
61101

@@ -64,12 +104,12 @@ public List<T> call(List<T> acc, T value) {
64104
if (acc.isEmpty()) {
65105
acc.add(value);
66106
} else {
67-
int flag = selector.call(acc.get(0)).compareTo(
68-
selector.call(value));
69-
if (flag > 0) {
70-
acc.clear();
107+
int compareResult = selector.call(acc.get(0))
108+
.compareTo(selector.call(value));
109+
if (compareResult == 0) {
71110
acc.add(value);
72-
} else if (flag == 0) {
111+
} else if (flag * compareResult < 0) {
112+
acc.clear();
73113
acc.add(value);
74114
}
75115
}
@@ -78,8 +118,9 @@ public List<T> call(List<T> acc, T value) {
78118
});
79119
}
80120

81-
public static <T, R> Observable<List<T>> minBy(Observable<T> source,
82-
final Func1<T, R> selector, final Comparator<R> comparator) {
121+
private static <T, R> Observable<List<T>> minMaxBy(Observable<T> source,
122+
final Func1<T, R> selector, final Comparator<R> comparator,
123+
final long flag) {
83124
return source.reduce(new ArrayList<T>(),
84125
new Func2<List<T>, T, List<T>>() {
85126

@@ -88,13 +129,13 @@ public List<T> call(List<T> acc, T value) {
88129
if (acc.isEmpty()) {
89130
acc.add(value);
90131
} else {
91-
int flag = comparator.compare(
132+
int compareResult = comparator.compare(
92133
selector.call(acc.get(0)),
93134
selector.call(value));
94-
if (flag > 0) {
95-
acc.clear();
135+
if (compareResult == 0) {
96136
acc.add(value);
97-
} else if (flag == 0) {
137+
} else if (flag * compareResult < 0) {
138+
acc.clear();
98139
acc.add(value);
99140
}
100141
}

0 commit comments

Comments
 (0)