Skip to content

Commit edb1f71

Browse files
authored
3.x: Fix concatMapDelayError not continuing on fused inner source crash (#6523)
1 parent 92efd60 commit edb1f71

File tree

2 files changed

+46
-5
lines changed

2 files changed

+46
-5
lines changed

src/main/java/io/reactivex/internal/operators/flowable/FlowableConcatMap.java

+6-3
Original file line numberDiff line numberDiff line change
@@ -519,10 +519,13 @@ void drain() {
519519
vr = supplier.get();
520520
} catch (Throwable e) {
521521
Exceptions.throwIfFatal(e);
522-
upstream.cancel();
523522
errors.addThrowable(e);
524-
downstream.onError(errors.terminate());
525-
return;
523+
if (!veryEnd) {
524+
upstream.cancel();
525+
downstream.onError(errors.terminate());
526+
return;
527+
}
528+
vr = null;
526529
}
527530

528531
if (vr == null) {

src/test/java/io/reactivex/internal/operators/flowable/FlowableConcatMapTest.java

+40-2
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,14 @@
1515

1616
import static org.junit.Assert.assertEquals;
1717

18-
import java.util.concurrent.TimeUnit;
18+
import java.util.concurrent.*;
1919
import java.util.concurrent.atomic.AtomicInteger;
2020

2121
import org.junit.Test;
2222
import org.reactivestreams.Publisher;
2323

2424
import io.reactivex.*;
25-
import io.reactivex.exceptions.TestException;
25+
import io.reactivex.exceptions.*;
2626
import io.reactivex.functions.*;
2727
import io.reactivex.internal.operators.flowable.FlowableConcatMap.WeakScalarSubscription;
2828
import io.reactivex.schedulers.Schedulers;
@@ -169,4 +169,42 @@ public void run() throws Exception {
169169

170170
assertEquals(0, counter.get());
171171
}
172+
173+
@Test
174+
public void delayErrorCallableTillTheEnd() {
175+
Flowable.just(1, 2, 3, 101, 102, 23, 890, 120, 32)
176+
.concatMapDelayError(new Function<Integer, Flowable<Integer>>() {
177+
@Override public Flowable<Integer> apply(final Integer integer) throws Exception {
178+
return Flowable.fromCallable(new Callable<Integer>() {
179+
@Override public Integer call() throws Exception {
180+
if (integer >= 100) {
181+
throw new NullPointerException("test null exp");
182+
}
183+
return integer;
184+
}
185+
});
186+
}
187+
})
188+
.test()
189+
.assertFailure(CompositeException.class, 1, 2, 3, 23, 32);
190+
}
191+
192+
@Test
193+
public void delayErrorCallableEager() {
194+
Flowable.just(1, 2, 3, 101, 102, 23, 890, 120, 32)
195+
.concatMapDelayError(new Function<Integer, Flowable<Integer>>() {
196+
@Override public Flowable<Integer> apply(final Integer integer) throws Exception {
197+
return Flowable.fromCallable(new Callable<Integer>() {
198+
@Override public Integer call() throws Exception {
199+
if (integer >= 100) {
200+
throw new NullPointerException("test null exp");
201+
}
202+
return integer;
203+
}
204+
});
205+
}
206+
}, 2, false)
207+
.test()
208+
.assertFailure(NullPointerException.class, 1, 2, 3);
209+
}
172210
}

0 commit comments

Comments
 (0)