Skip to content

Commit fd63c49

Browse files
authored
2.x: Fix boundary fusion of concatMap and publish operator (#6145)
1 parent 1ad606b commit fd63c49

File tree

4 files changed

+164
-9
lines changed

4 files changed

+164
-9
lines changed

src/main/java/io/reactivex/internal/operators/flowable/FlowableConcatMap.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ public final void onSubscribe(Subscription s) {
113113

114114
if (s instanceof QueueSubscription) {
115115
@SuppressWarnings("unchecked") QueueSubscription<T> f = (QueueSubscription<T>)s;
116-
int m = f.requestFusion(QueueSubscription.ANY);
116+
int m = f.requestFusion(QueueSubscription.ANY | QueueSubscription.BOUNDARY);
117117
if (m == QueueSubscription.SYNC) {
118118
sourceMode = m;
119119
queue = f;

src/main/java/io/reactivex/internal/operators/flowable/FlowablePublish.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,7 @@ static final class PublishSubscriber<T>
155155
*/
156156
final AtomicBoolean shouldConnect;
157157

158-
final AtomicReference<Subscription> s = new AtomicReference<Subscription>();
158+
final AtomicReference<Subscription> upstream = new AtomicReference<Subscription>();
159159

160160
/** Contains either an onComplete or an onError token from upstream. */
161161
volatile Object terminalEvent;
@@ -180,7 +180,7 @@ public void dispose() {
180180
InnerSubscriber[] ps = subscribers.getAndSet(TERMINATED);
181181
if (ps != TERMINATED) {
182182
current.compareAndSet(PublishSubscriber.this, null);
183-
SubscriptionHelper.cancel(s);
183+
SubscriptionHelper.cancel(upstream);
184184
}
185185
}
186186
}
@@ -192,12 +192,12 @@ public boolean isDisposed() {
192192

193193
@Override
194194
public void onSubscribe(Subscription s) {
195-
if (SubscriptionHelper.setOnce(this.s, s)) {
195+
if (SubscriptionHelper.setOnce(this.upstream, s)) {
196196
if (s instanceof QueueSubscription) {
197197
@SuppressWarnings("unchecked")
198198
QueueSubscription<T> qs = (QueueSubscription<T>) s;
199199

200-
int m = qs.requestFusion(QueueSubscription.ANY);
200+
int m = qs.requestFusion(QueueSubscription.ANY | QueueSubscription.BOUNDARY);
201201
if (m == QueueSubscription.SYNC) {
202202
sourceMode = m;
203203
queue = qs;
@@ -482,7 +482,7 @@ void dispatch() {
482482
v = q.poll();
483483
} catch (Throwable ex) {
484484
Exceptions.throwIfFatal(ex);
485-
s.get().cancel();
485+
upstream.get().cancel();
486486
term = NotificationLite.error(ex);
487487
terminalEvent = term;
488488
v = null;
@@ -493,7 +493,7 @@ void dispatch() {
493493
}
494494
// otherwise, just ask for a new value
495495
if (sourceMode != QueueSubscription.SYNC) {
496-
s.get().request(1);
496+
upstream.get().request(1);
497497
}
498498
// and retry emitting to potential new child subscribers
499499
continue;
@@ -510,7 +510,7 @@ void dispatch() {
510510
v = q.poll();
511511
} catch (Throwable ex) {
512512
Exceptions.throwIfFatal(ex);
513-
s.get().cancel();
513+
upstream.get().cancel();
514514
term = NotificationLite.error(ex);
515515
terminalEvent = term;
516516
v = null;
@@ -562,7 +562,7 @@ void dispatch() {
562562
// if we did emit at least one element, request more to replenish the queue
563563
if (d > 0) {
564564
if (sourceMode != QueueSubscription.SYNC) {
565-
s.get().request(d);
565+
upstream.get().request(d);
566566
}
567567
}
568568
// if we have requests but not an empty queue after emission

src/test/java/io/reactivex/internal/operators/flowable/FlowableConcatMapTest.java

Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,16 @@
1313

1414
package io.reactivex.internal.operators.flowable;
1515

16+
import java.util.concurrent.TimeUnit;
17+
1618
import org.junit.Test;
19+
import org.reactivestreams.Publisher;
1720

21+
import io.reactivex.*;
22+
import io.reactivex.exceptions.TestException;
23+
import io.reactivex.functions.Function;
1824
import io.reactivex.internal.operators.flowable.FlowableConcatMap.WeakScalarSubscription;
25+
import io.reactivex.schedulers.Schedulers;
1926
import io.reactivex.subscribers.TestSubscriber;
2027

2128
public class FlowableConcatMapTest {
@@ -39,4 +46,101 @@ public void weakSubscriptionRequest() {
3946
ts.assertResult(1);
4047
}
4148

49+
@Test
50+
public void boundaryFusion() {
51+
Flowable.range(1, 10000)
52+
.observeOn(Schedulers.single())
53+
.map(new Function<Integer, String>() {
54+
@Override
55+
public String apply(Integer t) throws Exception {
56+
String name = Thread.currentThread().getName();
57+
if (name.contains("RxSingleScheduler")) {
58+
return "RxSingleScheduler";
59+
}
60+
return name;
61+
}
62+
})
63+
.concatMap(new Function<String, Publisher<? extends Object>>() {
64+
@Override
65+
public Publisher<? extends Object> apply(String v)
66+
throws Exception {
67+
return Flowable.just(v);
68+
}
69+
})
70+
.observeOn(Schedulers.computation())
71+
.distinct()
72+
.test()
73+
.awaitDone(5, TimeUnit.SECONDS)
74+
.assertResult("RxSingleScheduler");
75+
}
76+
77+
@Test
78+
public void boundaryFusionDelayError() {
79+
Flowable.range(1, 10000)
80+
.observeOn(Schedulers.single())
81+
.map(new Function<Integer, String>() {
82+
@Override
83+
public String apply(Integer t) throws Exception {
84+
String name = Thread.currentThread().getName();
85+
if (name.contains("RxSingleScheduler")) {
86+
return "RxSingleScheduler";
87+
}
88+
return name;
89+
}
90+
})
91+
.concatMapDelayError(new Function<String, Publisher<? extends Object>>() {
92+
@Override
93+
public Publisher<? extends Object> apply(String v)
94+
throws Exception {
95+
return Flowable.just(v);
96+
}
97+
})
98+
.observeOn(Schedulers.computation())
99+
.distinct()
100+
.test()
101+
.awaitDone(5, TimeUnit.SECONDS)
102+
.assertResult("RxSingleScheduler");
103+
}
104+
105+
@Test
106+
public void pollThrows() {
107+
Flowable.just(1)
108+
.map(new Function<Integer, Integer>() {
109+
@Override
110+
public Integer apply(Integer v) throws Exception {
111+
throw new TestException();
112+
}
113+
})
114+
.compose(TestHelper.<Integer>flowableStripBoundary())
115+
.concatMap(new Function<Integer, Publisher<Integer>>() {
116+
@Override
117+
public Publisher<Integer> apply(Integer v)
118+
throws Exception {
119+
return Flowable.just(v);
120+
}
121+
})
122+
.test()
123+
.assertFailure(TestException.class);
124+
}
125+
126+
@Test
127+
public void pollThrowsDelayError() {
128+
Flowable.just(1)
129+
.map(new Function<Integer, Integer>() {
130+
@Override
131+
public Integer apply(Integer v) throws Exception {
132+
throw new TestException();
133+
}
134+
})
135+
.compose(TestHelper.<Integer>flowableStripBoundary())
136+
.concatMapDelayError(new Function<Integer, Publisher<Integer>>() {
137+
@Override
138+
public Publisher<Integer> apply(Integer v)
139+
throws Exception {
140+
return Flowable.just(v);
141+
}
142+
})
143+
.test()
144+
.assertFailure(TestException.class);
145+
}
42146
}

src/test/java/io/reactivex/internal/operators/flowable/FlowablePublishTest.java

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -824,12 +824,36 @@ public Object apply(Integer v) throws Exception {
824824
throw new TestException();
825825
}
826826
})
827+
.compose(TestHelper.flowableStripBoundary())
827828
.publish()
828829
.autoConnect()
829830
.test()
830831
.assertFailure(TestException.class);
831832
}
832833

834+
@Test
835+
public void pollThrowsNoSubscribers() {
836+
ConnectableFlowable<Integer> cf = Flowable.just(1, 2)
837+
.map(new Function<Integer, Integer>() {
838+
@Override
839+
public Integer apply(Integer v) throws Exception {
840+
if (v == 2) {
841+
throw new TestException();
842+
}
843+
return v;
844+
}
845+
})
846+
.compose(TestHelper.<Integer>flowableStripBoundary())
847+
.publish();
848+
849+
TestSubscriber<Integer> ts = cf.take(1)
850+
.test();
851+
852+
cf.connect();
853+
854+
ts.assertResult(1);
855+
}
856+
833857
@Test
834858
public void dryRunCrash() {
835859
List<Throwable> errors = TestHelper.trackPluginErrors();
@@ -1316,4 +1340,31 @@ public void onComplete() {
13161340
ts1.assertEmpty();
13171341
ts2.assertValuesOnly(1);
13181342
}
1343+
1344+
@Test
1345+
public void boundaryFusion() {
1346+
Flowable.range(1, 10000)
1347+
.observeOn(Schedulers.single())
1348+
.map(new Function<Integer, String>() {
1349+
@Override
1350+
public String apply(Integer t) throws Exception {
1351+
String name = Thread.currentThread().getName();
1352+
if (name.contains("RxSingleScheduler")) {
1353+
return "RxSingleScheduler";
1354+
}
1355+
return name;
1356+
}
1357+
})
1358+
.share()
1359+
.observeOn(Schedulers.computation())
1360+
.distinct()
1361+
.test()
1362+
.awaitDone(5, TimeUnit.SECONDS)
1363+
.assertResult("RxSingleScheduler");
1364+
}
1365+
1366+
@Test
1367+
public void badRequest() {
1368+
TestHelper.assertBadRequestReported(Flowable.range(1, 5).publish());
1369+
}
13191370
}

0 commit comments

Comments
 (0)