Skip to content

Commit 31dc74a

Browse files
committed
Merge pull request #3842 from davidmoten/scheduler-unsub-order
1.x: improve ExecutorScheduler worker unsubscription
2 parents bcd7fa1 + ebf0e3f commit 31dc74a

File tree

2 files changed

+41
-1
lines changed

2 files changed

+41
-1
lines changed

src/main/java/rx/schedulers/ExecutorScheduler.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,11 +96,20 @@ public Subscription schedule(Action0 action) {
9696
@Override
9797
public void run() {
9898
do {
99+
if (tasks.isUnsubscribed()) {
100+
queue.clear();
101+
return;
102+
}
103+
99104
ScheduledAction sa = queue.poll();
105+
if (sa == null) {
106+
return;
107+
}
108+
100109
if (!sa.isUnsubscribed()) {
101110
sa.run();
102111
}
103-
} while (wip.decrementAndGet() > 0);
112+
} while (wip.decrementAndGet() != 0);
104113
}
105114

106115
@Override
@@ -170,6 +179,7 @@ public boolean isUnsubscribed() {
170179
@Override
171180
public void unsubscribe() {
172181
tasks.unsubscribe();
182+
queue.clear();
173183
}
174184

175185
}

src/test/java/rx/schedulers/ExecutorSchedulerTest.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,11 @@
1818
import static org.junit.Assert.*;
1919

2020
import java.lang.management.*;
21+
import java.util.Queue;
2122
import java.util.concurrent.*;
2223
import java.util.concurrent.atomic.AtomicInteger;
2324

25+
import org.junit.Assert;
2426
import org.junit.Test;
2527

2628
import rx.*;
@@ -275,4 +277,32 @@ public void call() {
275277

276278
assertFalse(w.tasks.hasSubscriptions());
277279
}
280+
281+
@Test
282+
public void workerUnderConcurrentUnsubscribeShouldNotAllowLaterTasksToRunDueToUnsubscriptionRace() {
283+
Scheduler scheduler = Schedulers.from(Executors.newFixedThreadPool(1));
284+
for (int i = 0; i< 1000; i++) {
285+
Worker worker = scheduler.createWorker();
286+
final Queue<Integer> q = new ConcurrentLinkedQueue<Integer>();
287+
Action0 action1 = new Action0() {
288+
289+
@Override
290+
public void call() {
291+
q.add(1);
292+
}};
293+
Action0 action2 = new Action0() {
294+
295+
@Override
296+
public void call() {
297+
q.add(2);
298+
}};
299+
worker.schedule(action1);
300+
worker.schedule(action2);
301+
worker.unsubscribe();
302+
if (q.size()==1 && q.poll() == 2) {
303+
//expect a queue of 1,2 or 1. If queue is just 2 then we have a problem!
304+
Assert.fail("wrong order on loop " + i);
305+
}
306+
}
307+
}
278308
}

0 commit comments

Comments
 (0)