diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableGroupBy.java b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableGroupBy.java index 0711c74ff4..5bda5c7530 100644 --- a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableGroupBy.java +++ b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableGroupBy.java @@ -168,6 +168,10 @@ public void onNext(T t) { } catch (Throwable ex) { Exceptions.throwIfFatal(ex); upstream.cancel(); + if (newGroup) { + q.offer(group); + drain(); + } onError(ex); return; } diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableGroupBy.java b/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableGroupBy.java index 5ac733eb8d..821d2e81e1 100644 --- a/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableGroupBy.java +++ b/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableGroupBy.java @@ -97,6 +97,7 @@ public void onNext(T t) { Object mapKey = key != null ? key : NULL_KEY; GroupedUnicast group = groups.get(mapKey); + boolean newGroup = false; if (group == null) { // if the main has been cancelled, stop creating groups // and skip this value @@ -109,12 +110,7 @@ public void onNext(T t) { getAndIncrement(); - downstream.onNext(group); - - if (group.state.tryAbandon()) { - cancel(key); - group.onComplete(); - } + newGroup = true; } V v; @@ -123,11 +119,23 @@ public void onNext(T t) { } catch (Throwable e) { Exceptions.throwIfFatal(e); upstream.dispose(); + if (newGroup) { + downstream.onNext(group); + } onError(e); return; } group.onNext(v); + + if (newGroup) { + downstream.onNext(group); + + if (group.state.tryAbandon()) { + cancel(key); + group.onComplete(); + } + } } @Override diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableGroupByTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableGroupByTest.java index 7feaab715c..be36af652e 100644 --- a/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableGroupByTest.java +++ b/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableGroupByTest.java @@ -2275,4 +2275,61 @@ public void accept(GroupedFlowable v) throws Throwable { .assertNoErrors() .assertComplete(); } + + @Test + public void newGroupValueSelectorFails() { + TestSubscriber ts1 = new TestSubscriber(); + final TestSubscriber ts2 = new TestSubscriber(); + + Flowable.just(1) + .groupBy(Functions.identity(), new Function() { + @Override + public Object apply(Integer v) throws Throwable { + throw new TestException(); + } + }) + .doOnNext(new Consumer>() { + @Override + public void accept(GroupedFlowable g) throws Throwable { + g.subscribe(ts2); + } + }) + .subscribe(ts1); + + ts1.assertValueCount(1) + .assertError(TestException.class) + .assertNotComplete(); + + ts2.assertFailure(TestException.class); + } + + @Test + public void existingGroupValueSelectorFails() { + TestSubscriber ts1 = new TestSubscriber(); + final TestSubscriber ts2 = new TestSubscriber(); + + Flowable.just(1, 2) + .groupBy(Functions.justFunction(1), new Function() { + @Override + public Object apply(Integer v) throws Throwable { + if (v == 2) { + throw new TestException(); + } + return v; + } + }) + .doOnNext(new Consumer>() { + @Override + public void accept(GroupedFlowable g) throws Throwable { + g.subscribe(ts2); + } + }) + .subscribe(ts1); + + ts1.assertValueCount(1) + .assertError(TestException.class) + .assertNotComplete(); + + ts2.assertFailure(TestException.class, 1); + } } diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableGroupByTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableGroupByTest.java index 99698d18c2..14ff56a665 100644 --- a/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableGroupByTest.java +++ b/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableGroupByTest.java @@ -1615,4 +1615,61 @@ public void accept(GroupedObservable v) throws Throwable { .assertNoErrors() .assertComplete(); } + + @Test + public void newGroupValueSelectorFails() { + TestObserver to1 = new TestObserver(); + final TestObserver to2 = new TestObserver(); + + Observable.just(1) + .groupBy(Functions.identity(), new Function() { + @Override + public Object apply(Integer v) throws Throwable { + throw new TestException(); + } + }) + .doOnNext(new Consumer>() { + @Override + public void accept(GroupedObservable g) throws Throwable { + g.subscribe(to2); + } + }) + .subscribe(to1); + + to1.assertValueCount(1) + .assertError(TestException.class) + .assertNotComplete(); + + to2.assertFailure(TestException.class); + } + + @Test + public void existingGroupValueSelectorFails() { + TestObserver to1 = new TestObserver(); + final TestObserver to2 = new TestObserver(); + + Observable.just(1, 2) + .groupBy(Functions.justFunction(1), new Function() { + @Override + public Object apply(Integer v) throws Throwable { + if (v == 2) { + throw new TestException(); + } + return v; + } + }) + .doOnNext(new Consumer>() { + @Override + public void accept(GroupedObservable g) throws Throwable { + g.subscribe(to2); + } + }) + .subscribe(to1); + + to1.assertValueCount(1) + .assertError(TestException.class) + .assertNotComplete(); + + to2.assertFailure(TestException.class, 1); + } }