Skip to content

Commit c8a2cfa

Browse files
committed
1.x: fix GroupBy delaying group completion till all groups were emitted
1 parent 686231b commit c8a2cfa

File tree

2 files changed

+30
-7
lines changed

2 files changed

+30
-7
lines changed

src/main/java/rx/internal/operators/OperatorGroupBy.java

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -219,6 +219,12 @@ public void onCompleted() {
219219
if (done) {
220220
return;
221221
}
222+
223+
for (GroupedUnicast<K, V> e : groups.values()) {
224+
e.onComplete();
225+
}
226+
groups.clear();
227+
222228
done = true;
223229
GROUP_COUNT.decrementAndGet(this);
224230
drain();
@@ -328,13 +334,6 @@ boolean checkTerminated(boolean d, boolean empty,
328334
return true;
329335
} else
330336
if (empty) {
331-
List<GroupedUnicast<K, V>> list = new ArrayList<GroupedUnicast<K, V>>(groups.values());
332-
groups.clear();
333-
334-
for (GroupedUnicast<K, V> e : list) {
335-
e.onComplete();
336-
}
337-
338337
actual.onCompleted();
339338
return true;
340339
}

src/test/java/rx/GroupByTests.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import rx.functions.Action1;
2222
import rx.functions.Func1;
2323
import rx.observables.GroupedObservable;
24+
import rx.observers.TestSubscriber;
2425

2526
public class GroupByTests {
2627

@@ -90,4 +91,27 @@ public void call(String v) {
9091

9192
System.out.println("**** finished");
9293
}
94+
95+
@Test
96+
public void groupsCompleteAsSoonAsMainCompletes() {
97+
TestSubscriber<Integer> ts = TestSubscriber.create();
98+
99+
Observable.range(0, 20)
100+
.groupBy(new Func1<Integer, Integer>() {
101+
@Override
102+
public Integer call(Integer i) {
103+
return i % 5;
104+
}
105+
})
106+
.concatMap(new Func1<GroupedObservable<Integer, Integer>, Observable<Integer>>() {
107+
@Override
108+
public Observable<Integer> call(GroupedObservable<Integer, Integer> v) {
109+
return v;
110+
}
111+
}).subscribe(ts);
112+
113+
ts.assertValues(0, 5, 10, 15, 1, 6, 11, 16, 2, 7, 12, 17, 3, 8, 13, 18, 4, 9, 14, 19);
114+
ts.assertCompleted();
115+
ts.assertNoErrors();
116+
}
93117
}

0 commit comments

Comments
 (0)