Skip to content

Commit cb7ff83

Browse files
authored
3.x: Fix groupBy group emission in some cases (#6664)
* 3.x: Fix Observable.groupBy group emission in some cases * Test non-fresh group with value selector failure
1 parent c2dc134 commit cb7ff83

File tree

4 files changed

+132
-6
lines changed

4 files changed

+132
-6
lines changed

src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableGroupBy.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -168,6 +168,10 @@ public void onNext(T t) {
168168
} catch (Throwable ex) {
169169
Exceptions.throwIfFatal(ex);
170170
upstream.cancel();
171+
if (newGroup) {
172+
q.offer(group);
173+
drain();
174+
}
171175
onError(ex);
172176
return;
173177
}

src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableGroupBy.java

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,7 @@ public void onNext(T t) {
9797

9898
Object mapKey = key != null ? key : NULL_KEY;
9999
GroupedUnicast<K, V> group = groups.get(mapKey);
100+
boolean newGroup = false;
100101
if (group == null) {
101102
// if the main has been cancelled, stop creating groups
102103
// and skip this value
@@ -109,12 +110,7 @@ public void onNext(T t) {
109110

110111
getAndIncrement();
111112

112-
downstream.onNext(group);
113-
114-
if (group.state.tryAbandon()) {
115-
cancel(key);
116-
group.onComplete();
117-
}
113+
newGroup = true;
118114
}
119115

120116
V v;
@@ -123,11 +119,23 @@ public void onNext(T t) {
123119
} catch (Throwable e) {
124120
Exceptions.throwIfFatal(e);
125121
upstream.dispose();
122+
if (newGroup) {
123+
downstream.onNext(group);
124+
}
126125
onError(e);
127126
return;
128127
}
129128

130129
group.onNext(v);
130+
131+
if (newGroup) {
132+
downstream.onNext(group);
133+
134+
if (group.state.tryAbandon()) {
135+
cancel(key);
136+
group.onComplete();
137+
}
138+
}
131139
}
132140

133141
@Override

src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableGroupByTest.java

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2275,4 +2275,61 @@ public void accept(GroupedFlowable<Integer, Integer> v) throws Throwable {
22752275
.assertNoErrors()
22762276
.assertComplete();
22772277
}
2278+
2279+
@Test
2280+
public void newGroupValueSelectorFails() {
2281+
TestSubscriber<Object> ts1 = new TestSubscriber<Object>();
2282+
final TestSubscriber<Object> ts2 = new TestSubscriber<Object>();
2283+
2284+
Flowable.just(1)
2285+
.groupBy(Functions.<Integer>identity(), new Function<Integer, Object>() {
2286+
@Override
2287+
public Object apply(Integer v) throws Throwable {
2288+
throw new TestException();
2289+
}
2290+
})
2291+
.doOnNext(new Consumer<GroupedFlowable<Integer, Object>>() {
2292+
@Override
2293+
public void accept(GroupedFlowable<Integer, Object> g) throws Throwable {
2294+
g.subscribe(ts2);
2295+
}
2296+
})
2297+
.subscribe(ts1);
2298+
2299+
ts1.assertValueCount(1)
2300+
.assertError(TestException.class)
2301+
.assertNotComplete();
2302+
2303+
ts2.assertFailure(TestException.class);
2304+
}
2305+
2306+
@Test
2307+
public void existingGroupValueSelectorFails() {
2308+
TestSubscriber<Object> ts1 = new TestSubscriber<Object>();
2309+
final TestSubscriber<Object> ts2 = new TestSubscriber<Object>();
2310+
2311+
Flowable.just(1, 2)
2312+
.groupBy(Functions.justFunction(1), new Function<Integer, Object>() {
2313+
@Override
2314+
public Object apply(Integer v) throws Throwable {
2315+
if (v == 2) {
2316+
throw new TestException();
2317+
}
2318+
return v;
2319+
}
2320+
})
2321+
.doOnNext(new Consumer<GroupedFlowable<Integer, Object>>() {
2322+
@Override
2323+
public void accept(GroupedFlowable<Integer, Object> g) throws Throwable {
2324+
g.subscribe(ts2);
2325+
}
2326+
})
2327+
.subscribe(ts1);
2328+
2329+
ts1.assertValueCount(1)
2330+
.assertError(TestException.class)
2331+
.assertNotComplete();
2332+
2333+
ts2.assertFailure(TestException.class, 1);
2334+
}
22782335
}

src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableGroupByTest.java

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1615,4 +1615,61 @@ public void accept(GroupedObservable<Integer, Integer> v) throws Throwable {
16151615
.assertNoErrors()
16161616
.assertComplete();
16171617
}
1618+
1619+
@Test
1620+
public void newGroupValueSelectorFails() {
1621+
TestObserver<Object> to1 = new TestObserver<Object>();
1622+
final TestObserver<Object> to2 = new TestObserver<Object>();
1623+
1624+
Observable.just(1)
1625+
.groupBy(Functions.<Integer>identity(), new Function<Integer, Object>() {
1626+
@Override
1627+
public Object apply(Integer v) throws Throwable {
1628+
throw new TestException();
1629+
}
1630+
})
1631+
.doOnNext(new Consumer<GroupedObservable<Integer, Object>>() {
1632+
@Override
1633+
public void accept(GroupedObservable<Integer, Object> g) throws Throwable {
1634+
g.subscribe(to2);
1635+
}
1636+
})
1637+
.subscribe(to1);
1638+
1639+
to1.assertValueCount(1)
1640+
.assertError(TestException.class)
1641+
.assertNotComplete();
1642+
1643+
to2.assertFailure(TestException.class);
1644+
}
1645+
1646+
@Test
1647+
public void existingGroupValueSelectorFails() {
1648+
TestObserver<Object> to1 = new TestObserver<Object>();
1649+
final TestObserver<Object> to2 = new TestObserver<Object>();
1650+
1651+
Observable.just(1, 2)
1652+
.groupBy(Functions.justFunction(1), new Function<Integer, Object>() {
1653+
@Override
1654+
public Object apply(Integer v) throws Throwable {
1655+
if (v == 2) {
1656+
throw new TestException();
1657+
}
1658+
return v;
1659+
}
1660+
})
1661+
.doOnNext(new Consumer<GroupedObservable<Integer, Object>>() {
1662+
@Override
1663+
public void accept(GroupedObservable<Integer, Object> g) throws Throwable {
1664+
g.subscribe(to2);
1665+
}
1666+
})
1667+
.subscribe(to1);
1668+
1669+
to1.assertValueCount(1)
1670+
.assertError(TestException.class)
1671+
.assertNotComplete();
1672+
1673+
to2.assertFailure(TestException.class, 1);
1674+
}
16181675
}

0 commit comments

Comments
 (0)