Skip to content

Commit 342d90c

Browse files
Merge pull request #847 from benjchristensen/groupby-and-friends
Various Changes While Fixing GroupBy
2 parents 0ef2737 + 0ab38b4 commit 342d90c

17 files changed

+1061
-430
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import rx.observables.ConnectableObservable;
3434
import rx.observables.GroupedObservable;
3535
import rx.observers.SafeSubscriber;
36+
import rx.operators.OnSubscribeFromIterable;
3637
import rx.operators.OnSubscribeRange;
3738
import rx.operators.OperationAll;
3839
import rx.operators.OperationAmb;
@@ -51,7 +52,6 @@
5152
import rx.operators.OperationDistinct;
5253
import rx.operators.OperationDistinctUntilChanged;
5354
import rx.operators.OperationElementAt;
54-
import rx.operators.OperationFilter;
5555
import rx.operators.OperationFinally;
5656
import rx.operators.OperationFlatMap;
5757
import rx.operators.OperationGroupByUntil;
@@ -96,10 +96,11 @@
9696
import rx.operators.OperationWindow;
9797
import rx.operators.OperatorCast;
9898
import rx.operators.OperatorDoOnEach;
99-
import rx.operators.OnSubscribeFromIterable;
99+
import rx.operators.OperatorFilter;
100100
import rx.operators.OperatorGroupBy;
101101
import rx.operators.OperatorMap;
102102
import rx.operators.OperatorMerge;
103+
import rx.operators.OperationMergeMaxConcurrent;
103104
import rx.operators.OperatorObserveOn;
104105
import rx.operators.OperatorParallel;
105106
import rx.operators.OperatorRepeat;
@@ -1791,7 +1792,7 @@ public final static <T> Observable<T> merge(Observable<? extends Observable<? ex
17911792
* @see <a href="http://msdn.microsoft.com/en-us/library/hh211914.aspx">MSDN: Observable.Merge</a>
17921793
*/
17931794
public final static <T> Observable<T> merge(Observable<? extends Observable<? extends T>> source, int maxConcurrent) {
1794-
return source.lift(new OperatorMerge(maxConcurrent)); // any idea how to get these generics working?!
1795+
return Observable.create(OperationMergeMaxConcurrent.merge(source, maxConcurrent));
17951796
}
17961797

17971798
/**
@@ -2361,7 +2362,7 @@ public final static <T extends Comparable<? super T>> Observable<T> min(Observab
23612362
*
23622363
* @return
23632364
*/
2364-
private final Observable<Observable<T>> nest() {
2365+
public final Observable<Observable<T>> nest() {
23652366
return from(this);
23662367
}
23672368

@@ -2439,8 +2440,8 @@ public final static <T> Observable<Observable<T>> parallelMerge(Observable<Obser
24392440
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229460.aspx">MSDN: Observable.Range</a>
24402441
*/
24412442
public final static Observable<Integer> range(int start, int count) {
2442-
if (count < 1) {
2443-
throw new IllegalArgumentException("Count must be positive");
2443+
if (count < 0) {
2444+
throw new IllegalArgumentException("Count can not be negative");
24442445
}
24452446
if ((start + count) > Integer.MAX_VALUE) {
24462447
throw new IllegalArgumentException("start + count can not exceed Integer.MAX_VALUE");
@@ -4440,7 +4441,7 @@ public final Observable<Boolean> exists(Func1<? super T, Boolean> predicate) {
44404441
* @see <a href="https://github.com/Netflix/RxJava/wiki/Filtering-Observables#wiki-filter-or-where">RxJava Wiki: filter()</a>
44414442
*/
44424443
public final Observable<T> filter(Func1<? super T, Boolean> predicate) {
4443-
return create(OperationFilter.filter(this, predicate));
4444+
return lift(new OperatorFilter<T>(predicate));
44444445
}
44454446

44464447
/**

rxjava-core/src/main/java/rx/observers/TestSubscriber.java

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616
package rx.observers;
1717

1818
import java.util.List;
19+
import java.util.concurrent.CountDownLatch;
20+
import java.util.concurrent.TimeUnit;
1921

2022
import rx.Notification;
2123
import rx.Observer;
@@ -27,22 +29,25 @@
2729
public class TestSubscriber<T> extends Subscriber<T> {
2830

2931
private final TestObserver<T> testObserver;
32+
private final CountDownLatch latch = new CountDownLatch(1);
33+
private volatile Thread lastSeenThread;
3034

3135
public TestSubscriber(Subscriber<T> delegate) {
3236
this.testObserver = new TestObserver<T>(delegate);
3337
}
34-
38+
3539
public TestSubscriber(Observer<T> delegate) {
3640
this.testObserver = new TestObserver<T>(delegate);
3741
}
3842

3943
public TestSubscriber() {
40-
this.testObserver = new TestObserver<T>(Subscribers.<T>empty());
44+
this.testObserver = new TestObserver<T>(Subscribers.<T> empty());
4145
}
4246

4347
@Override
4448
public void onCompleted() {
4549
testObserver.onCompleted();
50+
latch.countDown();
4651
}
4752

4853
public List<Notification<T>> getOnCompletedEvents() {
@@ -52,6 +57,7 @@ public List<Notification<T>> getOnCompletedEvents() {
5257
@Override
5358
public void onError(Throwable e) {
5459
testObserver.onError(e);
60+
latch.countDown();
5561
}
5662

5763
public List<Throwable> getOnErrorEvents() {
@@ -60,6 +66,7 @@ public List<Throwable> getOnErrorEvents() {
6066

6167
@Override
6268
public void onNext(T t) {
69+
lastSeenThread = Thread.currentThread();
6370
testObserver.onNext(t);
6471
}
6572

@@ -78,4 +85,23 @@ public void assertTerminalEvent() {
7885
testObserver.assertTerminalEvent();
7986
}
8087

88+
public void awaitTerminalEvent() {
89+
try {
90+
latch.await();
91+
} catch (InterruptedException e) {
92+
throw new RuntimeException("Interrupted", e);
93+
}
94+
}
95+
96+
public void awaitTerminalEvent(long timeout, TimeUnit unit) {
97+
try {
98+
latch.await(timeout, unit);
99+
} catch (InterruptedException e) {
100+
throw new RuntimeException("Interrupted", e);
101+
}
102+
}
103+
104+
public Thread getLastSeenThread() {
105+
return lastSeenThread;
106+
}
81107
}

rxjava-core/src/main/java/rx/operators/OperationFilter.java

Lines changed: 0 additions & 71 deletions
This file was deleted.

0 commit comments

Comments
 (0)