Skip to content

Commit 9549ed5

Browse files
committed
2.x: make parallel() a fusion-async-boundary
1 parent 1ad6647 commit 9549ed5

File tree

2 files changed

+42
-1
lines changed

2 files changed

+42
-1
lines changed

src/main/java/io/reactivex/internal/operators/parallel/ParallelFromPublisher.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,7 @@ public void onSubscribe(Subscription s) {
116116
@SuppressWarnings("unchecked")
117117
QueueSubscription<T> qs = (QueueSubscription<T>) s;
118118

119-
int m = qs.requestFusion(QueueSubscription.ANY);
119+
int m = qs.requestFusion(QueueSubscription.ANY | QueueSubscription.BOUNDARY);
120120

121121
if (m == QueueSubscription.SYNC) {
122122
sourceMode = m;

src/test/java/io/reactivex/parallel/ParallelFlowableTest.java

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1356,4 +1356,45 @@ public void flatMapSubscriberCount() {
13561356
public void fromArraySubscriberCount() {
13571357
ParallelFlowableTest.checkSubscriberCount(ParallelFlowable.fromArray(new Publisher[] { Flowable.just(1) }));
13581358
}
1359+
1360+
@Test
1361+
public void boundaryConfinement() {
1362+
final Set<String> between = new HashSet<String>();
1363+
final ConcurrentHashMap<String, String> processing = new ConcurrentHashMap<String, String>();
1364+
1365+
Flowable.range(1, 10)
1366+
.observeOn(Schedulers.single(), false, 1)
1367+
.doOnNext(new Consumer<Integer>() {
1368+
@Override
1369+
public void accept(Integer v) throws Exception {
1370+
between.add(Thread.currentThread().getName());
1371+
}
1372+
})
1373+
.parallel(2, 1)
1374+
.runOn(Schedulers.computation(), 1)
1375+
.map(new Function<Integer, Object>() {
1376+
@Override
1377+
public Object apply(Integer v) throws Exception {
1378+
processing.putIfAbsent(Thread.currentThread().getName(), "");
1379+
return v;
1380+
}
1381+
})
1382+
.sequential()
1383+
.test()
1384+
.awaitDone(5, TimeUnit.SECONDS)
1385+
.assertSubscribed()
1386+
.assertValueSet(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
1387+
.assertComplete()
1388+
.assertNoErrors()
1389+
;
1390+
1391+
assertEquals(between.toString(), 1, between.size());
1392+
assertTrue(between.toString(), between.iterator().next().contains("RxSingleScheduler"));
1393+
1394+
Map<String, String> map = processing; // AnimalSniffer: CHM.keySet() in Java 8 returns KeySetView
1395+
1396+
for (String e : map.keySet()) {
1397+
assertTrue(map.toString(), e.contains("RxComputationThreadPool"));
1398+
}
1399+
}
13591400
}

0 commit comments

Comments
 (0)