Skip to content

Commit 8af2bc9

Browse files
committed
Fix an unsubscribe race in EventLoopWorker (#3868)
There is an unsubscribe race condition similar to #3842 in `CachedThreadScheduler.EventLoopWorker` and `EventLoopsScheduler.EventLoopWorker`. Image the following execution order: | Execution Order | thread 1 | thread 2 | | ------------- | ------------- | ------------- | | 1 | | submit task A | | 2 | | submit task B | | 3 | unsubscribe Worker | | | 4 | unsubscribe task A | | | 5 | | task A won't run as it's unsubscribed | | 6 | | run task B | | 7 | unsubscribe task B | | So task B will run but its previous task A will be skipped. This PR adds a check before running an action and moves `workerUnderConcurrentUnsubscribeShouldNotAllowLaterTasksToRunDueToUnsubscriptionRace` to `AbstractSchedulerConcurrencyTests` to test all concurrent schedulers.
1 parent 6efc2cf commit 8af2bc9

File tree

4 files changed

+63
-37
lines changed

4 files changed

+63
-37
lines changed

src/main/java/rx/internal/schedulers/CachedThreadScheduler.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -204,13 +204,21 @@ public Subscription schedule(Action0 action) {
204204
}
205205

206206
@Override
207-
public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) {
207+
public Subscription schedule(final Action0 action, long delayTime, TimeUnit unit) {
208208
if (innerSubscription.isUnsubscribed()) {
209209
// don't schedule, we are unsubscribed
210210
return Subscriptions.unsubscribed();
211211
}
212212

213-
ScheduledAction s = threadWorker.scheduleActual(action, delayTime, unit);
213+
ScheduledAction s = threadWorker.scheduleActual(new Action0() {
214+
@Override
215+
public void call() {
216+
if (isUnsubscribed()) {
217+
return;
218+
}
219+
action.call();
220+
}
221+
}, delayTime, unit);
214222
innerSubscription.add(s);
215223
s.addParent(innerSubscription);
216224
return s;

src/main/java/rx/internal/schedulers/EventLoopsScheduler.java

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -156,20 +156,37 @@ public boolean isUnsubscribed() {
156156
}
157157

158158
@Override
159-
public Subscription schedule(Action0 action) {
159+
public Subscription schedule(final Action0 action) {
160160
if (isUnsubscribed()) {
161161
return Subscriptions.unsubscribed();
162162
}
163163

164-
return poolWorker.scheduleActual(action, 0, null, serial);
164+
return poolWorker.scheduleActual(new Action0() {
165+
@Override
166+
public void call() {
167+
if (isUnsubscribed()) {
168+
return;
169+
}
170+
action.call();
171+
}
172+
}, 0, null, serial);
165173
}
174+
166175
@Override
167-
public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) {
176+
public Subscription schedule(final Action0 action, long delayTime, TimeUnit unit) {
168177
if (isUnsubscribed()) {
169178
return Subscriptions.unsubscribed();
170179
}
171180

172-
return poolWorker.scheduleActual(action, delayTime, unit, timed);
181+
return poolWorker.scheduleActual(new Action0() {
182+
@Override
183+
public void call() {
184+
if (isUnsubscribed()) {
185+
return;
186+
}
187+
action.call();
188+
}
189+
}, delayTime, unit, timed);
173190
}
174191
}
175192

@@ -178,4 +195,4 @@ static final class PoolWorker extends NewThreadWorker {
178195
super(threadFactory);
179196
}
180197
}
181-
}
198+
}

src/test/java/rx/schedulers/AbstractSchedulerConcurrencyTests.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020
import static org.junit.Assert.assertTrue;
2121
import static org.junit.Assert.fail;
2222

23+
import java.util.Queue;
24+
import java.util.concurrent.ConcurrentLinkedQueue;
2325
import java.util.concurrent.CountDownLatch;
2426
import java.util.concurrent.TimeUnit;
2527
import java.util.concurrent.atomic.AtomicBoolean;
@@ -423,4 +425,33 @@ public void call(Integer t) {
423425
assertEquals(5, count.get());
424426
}
425427

428+
@Test
429+
public void workerUnderConcurrentUnsubscribeShouldNotAllowLaterTasksToRunDueToUnsubscriptionRace() {
430+
Scheduler scheduler = getScheduler();
431+
for (int i = 0; i < 1000; i++) {
432+
Worker worker = scheduler.createWorker();
433+
final Queue<Integer> q = new ConcurrentLinkedQueue<Integer>();
434+
Action0 action1 = new Action0() {
435+
436+
@Override
437+
public void call() {
438+
q.add(1);
439+
}
440+
};
441+
Action0 action2 = new Action0() {
442+
443+
@Override
444+
public void call() {
445+
q.add(2);
446+
}
447+
};
448+
worker.schedule(action1);
449+
worker.schedule(action2);
450+
worker.unsubscribe();
451+
if (q.size() == 1 && q.poll() == 2) {
452+
//expect a queue of 1,2 or 1. If queue is just 2 then we have a problem!
453+
fail("wrong order on loop " + i);
454+
}
455+
}
456+
}
426457
}

src/test/java/rx/schedulers/ExecutorSchedulerTest.java

Lines changed: 0 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,9 @@
1818
import static org.junit.Assert.*;
1919

2020
import java.lang.management.*;
21-
import java.util.Queue;
2221
import java.util.concurrent.*;
2322
import java.util.concurrent.atomic.AtomicInteger;
2423

25-
import org.junit.Assert;
2624
import org.junit.Test;
2725

2826
import rx.*;
@@ -277,32 +275,4 @@ public void call() {
277275

278276
assertFalse(w.tasks.hasSubscriptions());
279277
}
280-
281-
@Test
282-
public void workerUnderConcurrentUnsubscribeShouldNotAllowLaterTasksToRunDueToUnsubscriptionRace() {
283-
Scheduler scheduler = Schedulers.from(Executors.newFixedThreadPool(1));
284-
for (int i = 0; i< 1000; i++) {
285-
Worker worker = scheduler.createWorker();
286-
final Queue<Integer> q = new ConcurrentLinkedQueue<Integer>();
287-
Action0 action1 = new Action0() {
288-
289-
@Override
290-
public void call() {
291-
q.add(1);
292-
}};
293-
Action0 action2 = new Action0() {
294-
295-
@Override
296-
public void call() {
297-
q.add(2);
298-
}};
299-
worker.schedule(action1);
300-
worker.schedule(action2);
301-
worker.unsubscribe();
302-
if (q.size()==1 && q.poll() == 2) {
303-
//expect a queue of 1,2 or 1. If queue is just 2 then we have a problem!
304-
Assert.fail("wrong order on loop " + i);
305-
}
306-
}
307-
}
308278
}

0 commit comments

Comments
 (0)