Skip to content

Commit 06f801c

Browse files
authored
2.x: fix Obs.combineLatest to dispose eagerly (#5114)
1 parent 0518d58 commit 06f801c

File tree

3 files changed

+73
-2
lines changed

3 files changed

+73
-2
lines changed

src/main/java/io/reactivex/internal/operators/observable/ObservableCombineLatest.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -124,9 +124,9 @@ public void subscribe(ObservableSource<? extends T>[] sources) {
124124
public void dispose() {
125125
if (!cancelled) {
126126
cancelled = true;
127-
127+
cancelSources();
128128
if (getAndIncrement() == 0) {
129-
cancel(queue);
129+
clear(queue);
130130
}
131131
}
132132
}
@@ -138,6 +138,10 @@ public boolean isDisposed() {
138138

139139
void cancel(SpscLinkedArrayQueue<?> q) {
140140
clear(q);
141+
cancelSources();
142+
}
143+
144+
void cancelSources() {
141145
for (CombinerObserver<T, R> s : observers) {
142146
s.dispose();
143147
}

src/test/java/io/reactivex/internal/operators/flowable/FlowableCombineLatestTest.java

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1517,4 +1517,38 @@ public void run() throws Exception {
15171517
RxJavaPlugins.reset();
15181518
}
15191519
}
1520+
1521+
@Test
1522+
public void eagerDispose() {
1523+
final PublishProcessor<Integer> pp1 = PublishProcessor.create();
1524+
final PublishProcessor<Integer> pp2 = PublishProcessor.create();
1525+
1526+
TestSubscriber<Integer> ts = new TestSubscriber<Integer>() {
1527+
@Override
1528+
public void onNext(Integer t) {
1529+
super.onNext(t);
1530+
cancel();
1531+
if (pp1.hasSubscribers()) {
1532+
onError(new IllegalStateException("pp1 not disposed"));
1533+
} else
1534+
if (pp2.hasSubscribers()) {
1535+
onError(new IllegalStateException("pp2 not disposed"));
1536+
} else {
1537+
onComplete();
1538+
}
1539+
}
1540+
};
1541+
1542+
Flowable.combineLatest(pp1, pp2, new BiFunction<Integer, Integer, Integer>() {
1543+
@Override
1544+
public Integer apply(Integer t1, Integer t2) throws Exception {
1545+
return t1 + t2;
1546+
}
1547+
})
1548+
.subscribe(ts);
1549+
1550+
pp1.onNext(1);
1551+
pp2.onNext(2);
1552+
ts.assertResult(3);
1553+
}
15201554
}

src/test/java/io/reactivex/internal/operators/observable/ObservableCombineLatestTest.java

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1165,4 +1165,37 @@ public void run() throws Exception {
11651165
}
11661166
}
11671167

1168+
@Test
1169+
public void eagerDispose() {
1170+
final PublishSubject<Integer> ps1 = PublishSubject.create();
1171+
final PublishSubject<Integer> ps2 = PublishSubject.create();
1172+
1173+
TestObserver<Integer> ts = new TestObserver<Integer>() {
1174+
@Override
1175+
public void onNext(Integer t) {
1176+
super.onNext(t);
1177+
cancel();
1178+
if (ps1.hasObservers()) {
1179+
onError(new IllegalStateException("ps1 not disposed"));
1180+
} else
1181+
if (ps2.hasObservers()) {
1182+
onError(new IllegalStateException("ps2 not disposed"));
1183+
} else {
1184+
onComplete();
1185+
}
1186+
}
1187+
};
1188+
1189+
Observable.combineLatest(ps1, ps2, new BiFunction<Integer, Integer, Integer>() {
1190+
@Override
1191+
public Integer apply(Integer t1, Integer t2) throws Exception {
1192+
return t1 + t2;
1193+
}
1194+
})
1195+
.subscribe(ts);
1196+
1197+
ps1.onNext(1);
1198+
ps2.onNext(2);
1199+
ts.assertResult(3);
1200+
}
11681201
}

0 commit comments

Comments
 (0)