Skip to content

Commit 2d13986

Browse files
committed
1.x: fix GroupBy delaying group completion till all groups were emitted
1 parent 686231b commit 2d13986

File tree

2 files changed

+31
-7
lines changed

2 files changed

+31
-7
lines changed

src/main/java/rx/internal/operators/OperatorGroupBy.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -219,6 +219,13 @@ public void onCompleted() {
219219
if (done) {
220220
return;
221221
}
222+
List<GroupedUnicast<K, V>> list = new ArrayList<GroupedUnicast<K, V>>(groups.values());
223+
groups.clear();
224+
225+
for (GroupedUnicast<K, V> e : list) {
226+
e.onComplete();
227+
}
228+
222229
done = true;
223230
GROUP_COUNT.decrementAndGet(this);
224231
drain();
@@ -328,13 +335,6 @@ boolean checkTerminated(boolean d, boolean empty,
328335
return true;
329336
} else
330337
if (empty) {
331-
List<GroupedUnicast<K, V>> list = new ArrayList<GroupedUnicast<K, V>>(groups.values());
332-
groups.clear();
333-
334-
for (GroupedUnicast<K, V> e : list) {
335-
e.onComplete();
336-
}
337-
338338
actual.onCompleted();
339339
return true;
340340
}

src/test/java/rx/GroupByTests.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import rx.functions.Action1;
2222
import rx.functions.Func1;
2323
import rx.observables.GroupedObservable;
24+
import rx.observers.TestSubscriber;
2425

2526
public class GroupByTests {
2627

@@ -90,4 +91,27 @@ public void call(String v) {
9091

9192
System.out.println("**** finished");
9293
}
94+
95+
@Test
96+
public void groupsCompleteAsSoonAsMainCompletes() {
97+
TestSubscriber<Integer> ts = TestSubscriber.create();
98+
99+
Observable.range(0, 20)
100+
.groupBy(new Func1<Integer, Integer>() {
101+
@Override
102+
public Integer call(Integer i) {
103+
return i % 5;
104+
}
105+
})
106+
.concatMap(new Func1<GroupedObservable<Integer, Integer>, Observable<Integer>>() {
107+
@Override
108+
public Observable<Integer> call(GroupedObservable<Integer, Integer> v) {
109+
return v;
110+
}
111+
}).subscribe(ts);
112+
113+
ts.assertValues(0, 5, 10, 15, 1, 6, 11, 16, 2, 7, 12, 17, 3, 8, 13, 18, 4, 9, 14, 19);
114+
ts.assertCompleted();
115+
ts.assertNoErrors();
116+
}
93117
}

0 commit comments

Comments
 (0)