Skip to content

Commit 2a4b18e

Browse files
authored
2.x: fix flatMap not cancelling the upstream eagerly (#5133)
1 parent 7494a2c commit 2a4b18e

File tree

4 files changed

+87
-11
lines changed

4 files changed

+87
-11
lines changed

src/main/java/io/reactivex/internal/operators/flowable/FlowableFlatMap.java

Lines changed: 26 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -551,19 +551,24 @@ void drainLoop() {
551551

552552
boolean checkTerminate() {
553553
if (cancelled) {
554-
SimpleQueue<U> q = queue;
555-
if (q != null) {
556-
q.clear();
557-
}
554+
clearScalarQueue();
558555
return true;
559556
}
560557
if (!delayErrors && errs.get() != null) {
558+
clearScalarQueue();
561559
actual.onError(errs.terminate());
562560
return true;
563561
}
564562
return false;
565563
}
566564

565+
void clearScalarQueue() {
566+
SimpleQueue<U> q = queue;
567+
if (q != null) {
568+
q.clear();
569+
}
570+
}
571+
567572
void disposeAll() {
568573
InnerSubscriber<?, ?>[] a = subscribers.get();
569574
if (a != CANCELLED) {
@@ -579,6 +584,21 @@ void disposeAll() {
579584
}
580585
}
581586
}
587+
588+
void innerError(InnerSubscriber<T, U> inner, Throwable t) {
589+
if (errs.addThrowable(t)) {
590+
inner.done = true;
591+
if (!delayErrors) {
592+
s.cancel();
593+
for (InnerSubscriber<?, ?> a : subscribers.getAndSet(CANCELLED)) {
594+
a.dispose();
595+
}
596+
}
597+
drain();
598+
} else {
599+
RxJavaPlugins.onError(t);
600+
}
601+
}
582602
}
583603

584604
static final class InnerSubscriber<T, U> extends AtomicReference<Subscription>
@@ -636,12 +656,8 @@ public void onNext(U t) {
636656
}
637657
@Override
638658
public void onError(Throwable t) {
639-
if (parent.errs.addThrowable(t)) {
640-
done = true;
641-
parent.drain();
642-
} else {
643-
RxJavaPlugins.onError(t);
644-
}
659+
lazySet(SubscriptionHelper.CANCELLED);
660+
parent.innerError(this, t);
645661
}
646662
@Override
647663
public void onComplete() {

src/test/java/io/reactivex/internal/operators/flowable/FlowableMergeTest.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,10 +28,12 @@
2828

2929
import io.reactivex.*;
3030
import io.reactivex.Scheduler.Worker;
31+
import io.reactivex.exceptions.TestException;
3132
import io.reactivex.functions.*;
3233
import io.reactivex.internal.functions.Functions;
3334
import io.reactivex.internal.subscriptions.*;
3435
import io.reactivex.internal.util.*;
36+
import io.reactivex.plugins.RxJavaPlugins;
3537
import io.reactivex.processors.PublishProcessor;
3638
import io.reactivex.schedulers.*;
3739
import io.reactivex.subscribers.*;
@@ -1630,4 +1632,21 @@ public void mergeArray() {
16301632
.test()
16311633
.assertResult(1, 2);
16321634
}
1635+
1636+
@Test
1637+
public void mergeErrors() {
1638+
List<Throwable> errors = TestHelper.trackPluginErrors();
1639+
try {
1640+
Flowable<Integer> source1 = Flowable.error(new TestException("First"));
1641+
Flowable<Integer> source2 = Flowable.error(new TestException("Second"));
1642+
1643+
Flowable.merge(source1, source2)
1644+
.test()
1645+
.assertFailureAndMessage(TestException.class, "First");
1646+
1647+
assertTrue(errors.toString(), errors.isEmpty());
1648+
} finally {
1649+
RxJavaPlugins.reset();
1650+
}
1651+
}
16331652
}

src/test/java/io/reactivex/internal/operators/observable/ObservableMergeTest.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,10 @@
2727
import io.reactivex.Observer;
2828
import io.reactivex.Scheduler.Worker;
2929
import io.reactivex.disposables.*;
30+
import io.reactivex.exceptions.TestException;
3031
import io.reactivex.functions.*;
3132
import io.reactivex.observers.*;
33+
import io.reactivex.plugins.RxJavaPlugins;
3234
import io.reactivex.schedulers.*;
3335

3436
public class ObservableMergeTest {
@@ -1125,4 +1127,21 @@ public void mergeArray() {
11251127
.test()
11261128
.assertResult(1, 2);
11271129
}
1130+
1131+
@Test
1132+
public void mergeErrors() {
1133+
List<Throwable> errors = TestHelper.trackPluginErrors();
1134+
try {
1135+
Observable<Integer> source1 = Observable.error(new TestException("First"));
1136+
Observable<Integer> source2 = Observable.error(new TestException("Second"));
1137+
1138+
Observable.merge(source1, source2)
1139+
.test()
1140+
.assertFailureAndMessage(TestException.class, "First");
1141+
1142+
assertTrue(errors.toString(), errors.isEmpty());
1143+
} finally {
1144+
RxJavaPlugins.reset();
1145+
}
1146+
}
11281147
}

src/test/java/io/reactivex/internal/operators/single/SingleMergeTest.java

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,15 @@
1313

1414
package io.reactivex.internal.operators.single;
1515

16+
import static org.junit.Assert.assertTrue;
17+
18+
import java.util.List;
19+
1620
import org.junit.Test;
1721

18-
import io.reactivex.Single;
22+
import io.reactivex.*;
23+
import io.reactivex.exceptions.TestException;
24+
import io.reactivex.plugins.RxJavaPlugins;
1925

2026
public class SingleMergeTest {
2127

@@ -48,4 +54,20 @@ public void merge4() {
4854
.assertResult(1, 2, 3, 4);
4955
}
5056

57+
@Test
58+
public void mergeErrors() {
59+
List<Throwable> errors = TestHelper.trackPluginErrors();
60+
try {
61+
Single<Integer> source1 = Single.error(new TestException("First"));
62+
Single<Integer> source2 = Single.error(new TestException("Second"));
63+
64+
Single.merge(source1, source2)
65+
.test()
66+
.assertFailureAndMessage(TestException.class, "First");
67+
68+
assertTrue(errors.toString(), errors.isEmpty());
69+
} finally {
70+
RxJavaPlugins.reset();
71+
}
72+
}
5173
}

0 commit comments

Comments
 (0)