Skip to content

Commit 2f27ea4

Browse files
authored
3.x: Fix concurrent clear() calls when fused chains are canceled (#6676)
1 parent c84af61 commit 2f27ea4

File tree

8 files changed

+205
-14
lines changed

8 files changed

+205
-14
lines changed

src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableGroupBy.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -268,7 +268,7 @@ public void cancel(K key) {
268268
if (groupCount.decrementAndGet() == 0) {
269269
upstream.cancel();
270270

271-
if (getAndIncrement() == 0) {
271+
if (!outputFused && getAndIncrement() == 0) {
272272
queue.clear();
273273
}
274274
}
@@ -601,7 +601,6 @@ void drainFused() {
601601
for (;;) {
602602
if (a != null) {
603603
if (cancelled.get()) {
604-
q.clear();
605604
return;
606605
}
607606

src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableOnBackpressureBuffer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,7 @@ public void cancel() {
150150
cancelled = true;
151151
upstream.cancel();
152152

153-
if (getAndIncrement() == 0) {
153+
if (!outputFused && getAndIncrement() == 0) {
154154
queue.clear();
155155
}
156156
}

src/main/java/io/reactivex/rxjava3/processors/UnicastProcessor.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -345,7 +345,6 @@ void drainFused(Subscriber<? super T> a) {
345345
for (;;) {
346346

347347
if (cancelled) {
348-
q.clear();
349348
downstream.lazySet(null);
350349
return;
351350
}
@@ -548,10 +547,11 @@ public void cancel() {
548547

549548
doTerminate();
550549

551-
if (!enableOperatorFusion) {
552-
if (wip.getAndIncrement() == 0) {
550+
downstream.lazySet(null);
551+
if (wip.getAndIncrement() == 0) {
552+
downstream.lazySet(null);
553+
if (!enableOperatorFusion) {
553554
queue.clear();
554-
downstream.lazySet(null);
555555
}
556556
}
557557
}

src/main/java/io/reactivex/rxjava3/subjects/UnicastSubject.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -418,7 +418,6 @@ void drainFused(Observer<? super T> a) {
418418

419419
if (disposed) {
420420
downstream.lazySet(null);
421-
q.clear();
422421
return;
423422
}
424423
boolean d = done;
@@ -556,7 +555,9 @@ public void dispose() {
556555
downstream.lazySet(null);
557556
if (wip.getAndIncrement() == 0) {
558557
downstream.lazySet(null);
559-
queue.clear();
558+
if (!enableOperatorFusion) {
559+
queue.clear();
560+
}
560561
}
561562
}
562563
}

src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableGroupByTest.java

Lines changed: 82 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,12 +30,13 @@
3030
import com.google.common.cache.*;
3131

3232
import io.reactivex.rxjava3.core.*;
33-
import io.reactivex.rxjava3.exceptions.TestException;
33+
import io.reactivex.rxjava3.exceptions.*;
3434
import io.reactivex.rxjava3.flowables.GroupedFlowable;
3535
import io.reactivex.rxjava3.functions.*;
3636
import io.reactivex.rxjava3.internal.functions.Functions;
37-
import io.reactivex.rxjava3.internal.fuseable.QueueFuseable;
37+
import io.reactivex.rxjava3.internal.fuseable.*;
3838
import io.reactivex.rxjava3.internal.subscriptions.BooleanSubscription;
39+
import io.reactivex.rxjava3.plugins.RxJavaPlugins;
3940
import io.reactivex.rxjava3.processors.PublishProcessor;
4041
import io.reactivex.rxjava3.schedulers.Schedulers;
4142
import io.reactivex.rxjava3.subjects.PublishSubject;
@@ -2332,4 +2333,83 @@ public void accept(GroupedFlowable<Integer, Object> g) throws Throwable {
23322333

23332334
ts2.assertFailure(TestException.class, 1);
23342335
}
2336+
2337+
@Test
2338+
public void fusedNoConcurrentCleanDueToCancel() {
2339+
for (int j = 0; j < TestHelper.RACE_LONG_LOOPS; j++) {
2340+
List<Throwable> errors = TestHelper.trackPluginErrors();
2341+
try {
2342+
final PublishProcessor<Integer> pp = PublishProcessor.create();
2343+
2344+
final AtomicReference<QueueSubscription<GroupedFlowable<Object, Integer>>> qs = new AtomicReference<QueueSubscription<GroupedFlowable<Object, Integer>>>();
2345+
2346+
final TestSubscriber<Integer> ts2 = new TestSubscriber<Integer>();
2347+
2348+
pp.groupBy(Functions.identity(), Functions.<Integer>identity(), false, 4)
2349+
.subscribe(new FlowableSubscriber<GroupedFlowable<Object, Integer>>() {
2350+
2351+
boolean once;
2352+
2353+
@Override
2354+
public void onNext(GroupedFlowable<Object, Integer> g) {
2355+
if (!once) {
2356+
try {
2357+
GroupedFlowable<Object, Integer> t = qs.get().poll();
2358+
if (t != null) {
2359+
once = true;
2360+
t.subscribe(ts2);
2361+
}
2362+
} catch (Throwable ignored) {
2363+
// not relevant here
2364+
}
2365+
}
2366+
}
2367+
2368+
@Override
2369+
public void onError(Throwable t) {
2370+
}
2371+
2372+
@Override
2373+
public void onComplete() {
2374+
}
2375+
2376+
@Override
2377+
public void onSubscribe(Subscription s) {
2378+
@SuppressWarnings("unchecked")
2379+
QueueSubscription<GroupedFlowable<Object, Integer>> q = (QueueSubscription<GroupedFlowable<Object, Integer>>)s;
2380+
qs.set(q);
2381+
q.requestFusion(QueueFuseable.ANY);
2382+
q.request(1);
2383+
}
2384+
})
2385+
;
2386+
2387+
Runnable r1 = new Runnable() {
2388+
@Override
2389+
public void run() {
2390+
qs.get().cancel();
2391+
qs.get().clear();
2392+
}
2393+
};
2394+
Runnable r2 = new Runnable() {
2395+
@Override
2396+
public void run() {
2397+
ts2.cancel();
2398+
}
2399+
};
2400+
2401+
for (int i = 0; i < 100; i++) {
2402+
pp.onNext(i);
2403+
}
2404+
2405+
TestHelper.race(r1, r2);
2406+
2407+
if (!errors.isEmpty()) {
2408+
throw new CompositeException(errors);
2409+
}
2410+
} finally {
2411+
RxJavaPlugins.reset();
2412+
}
2413+
}
2414+
}
23352415
}

src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableOnBackpressureBufferTest.java

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515

1616
import static org.junit.Assert.*;
1717

18+
import java.util.List;
1819
import java.util.concurrent.*;
1920
import java.util.concurrent.atomic.AtomicBoolean;
2021

@@ -24,11 +25,15 @@
2425
import io.reactivex.rxjava3.core.*;
2526
import io.reactivex.rxjava3.exceptions.*;
2627
import io.reactivex.rxjava3.functions.*;
28+
import io.reactivex.rxjava3.internal.functions.Functions;
2729
import io.reactivex.rxjava3.internal.fuseable.QueueFuseable;
2830
import io.reactivex.rxjava3.internal.subscriptions.BooleanSubscription;
31+
import io.reactivex.rxjava3.observers.TestObserver;
32+
import io.reactivex.rxjava3.plugins.RxJavaPlugins;
33+
import io.reactivex.rxjava3.processors.PublishProcessor;
2934
import io.reactivex.rxjava3.schedulers.Schedulers;
3035
import io.reactivex.rxjava3.subscribers.*;
31-
import io.reactivex.rxjava3.testsupport.TestSubscriberEx;
36+
import io.reactivex.rxjava3.testsupport.*;
3237

3338
public class FlowableOnBackpressureBufferTest extends RxJavaTest {
3439

@@ -308,4 +313,37 @@ public void fusionRejected() {
308313
ts.assertFusionMode(QueueFuseable.NONE)
309314
.assertEmpty();
310315
}
316+
317+
@Test
318+
public void fusedNoConcurrentCleanDueToCancel() {
319+
for (int j = 0; j < TestHelper.RACE_LONG_LOOPS; j++) {
320+
List<Throwable> errors = TestHelper.trackPluginErrors();
321+
try {
322+
final PublishProcessor<Integer> pp = PublishProcessor.create();
323+
324+
TestObserver<Integer> to = pp.onBackpressureBuffer(4, false, true)
325+
.observeOn(Schedulers.io())
326+
.map(Functions.<Integer>identity())
327+
.observeOn(Schedulers.single())
328+
.firstOrError()
329+
.test();
330+
331+
for (int i = 0; pp.hasSubscribers(); i++) {
332+
pp.onNext(i);
333+
}
334+
335+
to
336+
.awaitDone(5, TimeUnit.SECONDS)
337+
;
338+
339+
if (!errors.isEmpty()) {
340+
throw new CompositeException(errors);
341+
}
342+
343+
to.assertResult(0);
344+
} finally {
345+
RxJavaPlugins.reset();
346+
}
347+
}
348+
}
311349
}

src/test/java/io/reactivex/rxjava3/processors/UnicastProcessorTest.java

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,16 +16,20 @@
1616
import static org.junit.Assert.*;
1717

1818
import java.util.List;
19+
import java.util.concurrent.TimeUnit;
1920
import java.util.concurrent.atomic.AtomicBoolean;
2021

2122
import org.junit.Test;
2223

2324
import io.reactivex.rxjava3.core.Observable;
2425
import io.reactivex.rxjava3.disposables.Disposable;
25-
import io.reactivex.rxjava3.exceptions.TestException;
26+
import io.reactivex.rxjava3.exceptions.*;
27+
import io.reactivex.rxjava3.internal.functions.Functions;
2628
import io.reactivex.rxjava3.internal.fuseable.QueueFuseable;
2729
import io.reactivex.rxjava3.internal.subscriptions.BooleanSubscription;
30+
import io.reactivex.rxjava3.observers.TestObserver;
2831
import io.reactivex.rxjava3.plugins.RxJavaPlugins;
32+
import io.reactivex.rxjava3.schedulers.Schedulers;
2933
import io.reactivex.rxjava3.subscribers.TestSubscriber;
3034
import io.reactivex.rxjava3.testsupport.*;
3135

@@ -439,4 +443,37 @@ public void unicastSubscriptionBadRequest() {
439443
RxJavaPlugins.reset();
440444
}
441445
}
446+
447+
@Test
448+
public void fusedNoConcurrentCleanDueToCancel() {
449+
for (int j = 0; j < TestHelper.RACE_LONG_LOOPS; j++) {
450+
List<Throwable> errors = TestHelper.trackPluginErrors();
451+
try {
452+
final UnicastProcessor<Integer> us = UnicastProcessor.create();
453+
454+
TestObserver<Integer> to = us
455+
.observeOn(Schedulers.io())
456+
.map(Functions.<Integer>identity())
457+
.observeOn(Schedulers.single())
458+
.firstOrError()
459+
.test();
460+
461+
for (int i = 0; us.hasSubscribers(); i++) {
462+
us.onNext(i);
463+
}
464+
465+
to
466+
.awaitDone(5, TimeUnit.SECONDS)
467+
;
468+
469+
if (!errors.isEmpty()) {
470+
throw new CompositeException(errors);
471+
}
472+
473+
to.assertResult(0);
474+
} finally {
475+
RxJavaPlugins.reset();
476+
}
477+
}
478+
}
442479
}

src/test/java/io/reactivex/rxjava3/subjects/UnicastSubjectTest.java

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,16 +17,19 @@
1717
import static org.mockito.Mockito.mock;
1818

1919
import java.util.List;
20+
import java.util.concurrent.TimeUnit;
2021
import java.util.concurrent.atomic.AtomicBoolean;
2122

2223
import org.junit.Test;
2324

2425
import io.reactivex.rxjava3.core.Observable;
2526
import io.reactivex.rxjava3.disposables.*;
26-
import io.reactivex.rxjava3.exceptions.TestException;
27+
import io.reactivex.rxjava3.exceptions.*;
28+
import io.reactivex.rxjava3.internal.functions.Functions;
2729
import io.reactivex.rxjava3.internal.fuseable.QueueFuseable;
2830
import io.reactivex.rxjava3.observers.TestObserver;
2931
import io.reactivex.rxjava3.plugins.RxJavaPlugins;
32+
import io.reactivex.rxjava3.schedulers.Schedulers;
3033
import io.reactivex.rxjava3.testsupport.*;
3134

3235
public class UnicastSubjectTest extends SubjectTest<Integer> {
@@ -457,4 +460,37 @@ public void drainFusedFailFastEmpty() {
457460

458461
to.assertEmpty();
459462
}
463+
464+
@Test
465+
public void fusedNoConcurrentCleanDueToCancel() {
466+
for (int j = 0; j < TestHelper.RACE_LONG_LOOPS; j++) {
467+
List<Throwable> errors = TestHelper.trackPluginErrors();
468+
try {
469+
final UnicastSubject<Integer> us = UnicastSubject.create();
470+
471+
TestObserver<Integer> to = us
472+
.observeOn(Schedulers.io())
473+
.map(Functions.<Integer>identity())
474+
.observeOn(Schedulers.single())
475+
.firstOrError()
476+
.test();
477+
478+
for (int i = 0; us.hasObservers(); i++) {
479+
us.onNext(i);
480+
}
481+
482+
to
483+
.awaitDone(5, TimeUnit.SECONDS)
484+
;
485+
486+
if (!errors.isEmpty()) {
487+
throw new CompositeException(errors);
488+
}
489+
490+
to.assertResult(0);
491+
} finally {
492+
RxJavaPlugins.reset();
493+
}
494+
}
495+
}
460496
}

0 commit comments

Comments
 (0)