Skip to content

Commit 678c62f

Browse files
committed
2.x: fix PublishProcessor cancel/emission overflow bug
1 parent cd6bc08 commit 678c62f

File tree

3 files changed

+67
-3
lines changed

3 files changed

+67
-3
lines changed

src/main/java/io/reactivex/processors/PublishProcessor.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -313,9 +313,7 @@ public void onNext(T t) {
313313
}
314314
if (r != 0L) {
315315
actual.onNext(t);
316-
if (r != Long.MAX_VALUE) {
317-
decrementAndGet();
318-
}
316+
BackpressureHelper.producedCancel(this, 1);
319317
} else {
320318
cancel();
321319
actual.onError(new MissingBackpressureException("Could not emit value due to lack of requests"));

src/test/java/io/reactivex/processors/BehaviorProcessorTest.java

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -812,4 +812,37 @@ public void run() {
812812
ts.assertFailure(TestException.class);
813813
}
814814
}
815+
816+
@Test(timeout = 10000)
817+
public void subscriberCancelOfferRace() {
818+
for (int i = 0; i < 1000; i++) {
819+
final BehaviorProcessor<Integer> pp = BehaviorProcessor.create();
820+
821+
final TestSubscriber<Integer> ts = pp.test(1);
822+
823+
Runnable r1 = new Runnable() {
824+
@Override
825+
public void run() {
826+
for (int i = 0; i < 2; i++) {
827+
while (!pp.offer(i)) ;
828+
}
829+
}
830+
};
831+
832+
Runnable r2 = new Runnable() {
833+
@Override
834+
public void run() {
835+
ts.cancel();
836+
}
837+
};
838+
839+
TestHelper.race(r1, r2);
840+
841+
if (ts.valueCount() > 0) {
842+
ts.assertValuesOnly(0);
843+
} else {
844+
ts.assertEmpty();
845+
}
846+
}
847+
}
815848
}

src/test/java/io/reactivex/processors/PublishProcessorTest.java

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -677,4 +677,37 @@ public void run() {
677677
.awaitDone(5, TimeUnit.SECONDS)
678678
.assertResult(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
679679
}
680+
681+
@Test(timeout = 10000)
682+
public void subscriberCancelOfferRace() {
683+
for (int i = 0; i < 1000; i++) {
684+
final PublishProcessor<Integer> pp = PublishProcessor.create();
685+
686+
final TestSubscriber<Integer> ts = pp.test(1);
687+
688+
Runnable r1 = new Runnable() {
689+
@Override
690+
public void run() {
691+
for (int i = 0; i < 2; i++) {
692+
while (!pp.offer(i)) ;
693+
}
694+
}
695+
};
696+
697+
Runnable r2 = new Runnable() {
698+
@Override
699+
public void run() {
700+
ts.cancel();
701+
}
702+
};
703+
704+
TestHelper.race(r1, r2);
705+
706+
if (ts.valueCount() > 0) {
707+
ts.assertValuesOnly(0);
708+
} else {
709+
ts.assertEmpty();
710+
}
711+
}
712+
}
680713
}

0 commit comments

Comments
 (0)