diff --git a/src/main/java/rx/internal/operators/OperatorGroupBy.java b/src/main/java/rx/internal/operators/OperatorGroupBy.java index 38edc0a68f..4fe29b6c2d 100644 --- a/src/main/java/rx/internal/operators/OperatorGroupBy.java +++ b/src/main/java/rx/internal/operators/OperatorGroupBy.java @@ -219,6 +219,12 @@ public void onCompleted() { if (done) { return; } + + for (GroupedUnicast e : groups.values()) { + e.onComplete(); + } + groups.clear(); + done = true; GROUP_COUNT.decrementAndGet(this); drain(); @@ -328,13 +334,6 @@ boolean checkTerminated(boolean d, boolean empty, return true; } else if (empty) { - List> list = new ArrayList>(groups.values()); - groups.clear(); - - for (GroupedUnicast e : list) { - e.onComplete(); - } - actual.onCompleted(); return true; } diff --git a/src/test/java/rx/GroupByTests.java b/src/test/java/rx/GroupByTests.java index 3530c08799..a4527777ef 100644 --- a/src/test/java/rx/GroupByTests.java +++ b/src/test/java/rx/GroupByTests.java @@ -21,6 +21,7 @@ import rx.functions.Action1; import rx.functions.Func1; import rx.observables.GroupedObservable; +import rx.observers.TestSubscriber; public class GroupByTests { @@ -90,4 +91,27 @@ public void call(String v) { System.out.println("**** finished"); } + + @Test + public void groupsCompleteAsSoonAsMainCompletes() { + TestSubscriber ts = TestSubscriber.create(); + + Observable.range(0, 20) + .groupBy(new Func1() { + @Override + public Integer call(Integer i) { + return i % 5; + } + }) + .concatMap(new Func1, Observable>() { + @Override + public Observable call(GroupedObservable v) { + return v; + } + }).subscribe(ts); + + ts.assertValues(0, 5, 10, 15, 1, 6, 11, 16, 2, 7, 12, 17, 3, 8, 13, 18, 4, 9, 14, 19); + ts.assertCompleted(); + ts.assertNoErrors(); + } }