Skip to content

Commit 9521512

Browse files
authored
2.x: distinguish between sync and async dispose in ScheduledRunnable (#5715)
* 2.x: distinguish between sync and async dispose in ScheduledRunnable * Coverage improvement and fix a missing state change * Add test case for the parent-done reordered check
1 parent b99efbf commit 9521512

File tree

2 files changed

+133
-12
lines changed

2 files changed

+133
-12
lines changed

src/main/java/io/reactivex/internal/schedulers/ScheduledRunnable.java

Lines changed: 22 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,12 @@ public final class ScheduledRunnable extends AtomicReferenceArray<Object>
2626
private static final long serialVersionUID = -6120223772001106981L;
2727
final Runnable actual;
2828

29-
static final Object DISPOSED = new Object();
29+
/** Indicates that the parent tracking this task has been notified about its completion. */
30+
static final Object PARENT_DISPOSED = new Object();
31+
/** Indicates the dispose() was called from within the run/call method. */
32+
static final Object SYNC_DISPOSED = new Object();
33+
/** Indicates the dispose() was called from another thread. */
34+
static final Object ASYNC_DISPOSED = new Object();
3035

3136
static final Object DONE = new Object();
3237

@@ -66,13 +71,13 @@ public void run() {
6671
} finally {
6772
lazySet(THREAD_INDEX, null);
6873
Object o = get(PARENT_INDEX);
69-
if (o != DISPOSED && o != null && compareAndSet(PARENT_INDEX, o, DONE)) {
74+
if (o != PARENT_DISPOSED && compareAndSet(PARENT_INDEX, o, DONE) && o != null) {
7075
((DisposableContainer)o).delete(this);
7176
}
7277

7378
for (;;) {
7479
o = get(FUTURE_INDEX);
75-
if (o == DISPOSED || compareAndSet(FUTURE_INDEX, o, DONE)) {
80+
if (o == SYNC_DISPOSED || o == ASYNC_DISPOSED || compareAndSet(FUTURE_INDEX, o, DONE)) {
7681
break;
7782
}
7883
}
@@ -85,8 +90,12 @@ public void setFuture(Future<?> f) {
8590
if (o == DONE) {
8691
return;
8792
}
88-
if (o == DISPOSED) {
89-
f.cancel(get(THREAD_INDEX) != Thread.currentThread());
93+
if (o == SYNC_DISPOSED) {
94+
f.cancel(false);
95+
return;
96+
}
97+
if (o == ASYNC_DISPOSED) {
98+
f.cancel(true);
9099
return;
91100
}
92101
if (compareAndSet(FUTURE_INDEX, o, f)) {
@@ -99,23 +108,24 @@ public void setFuture(Future<?> f) {
99108
public void dispose() {
100109
for (;;) {
101110
Object o = get(FUTURE_INDEX);
102-
if (o == DONE || o == DISPOSED) {
111+
if (o == DONE || o == SYNC_DISPOSED || o == ASYNC_DISPOSED) {
103112
break;
104113
}
105-
if (compareAndSet(FUTURE_INDEX, o, DISPOSED)) {
114+
boolean async = get(THREAD_INDEX) != Thread.currentThread();
115+
if (compareAndSet(FUTURE_INDEX, o, async ? ASYNC_DISPOSED : SYNC_DISPOSED)) {
106116
if (o != null) {
107-
((Future<?>)o).cancel(get(THREAD_INDEX) != Thread.currentThread());
117+
((Future<?>)o).cancel(async);
108118
}
109119
break;
110120
}
111121
}
112122

113123
for (;;) {
114124
Object o = get(PARENT_INDEX);
115-
if (o == DONE || o == DISPOSED || o == null) {
125+
if (o == DONE || o == PARENT_DISPOSED || o == null) {
116126
return;
117127
}
118-
if (compareAndSet(PARENT_INDEX, o, DISPOSED)) {
128+
if (compareAndSet(PARENT_INDEX, o, PARENT_DISPOSED)) {
119129
((DisposableContainer)o).delete(this);
120130
return;
121131
}
@@ -124,7 +134,7 @@ public void dispose() {
124134

125135
@Override
126136
public boolean isDisposed() {
127-
Object o = get(FUTURE_INDEX);
128-
return o == DISPOSED || o == DONE;
137+
Object o = get(PARENT_INDEX);
138+
return o == PARENT_DISPOSED || o == DONE;
129139
}
130140
}

src/test/java/io/reactivex/internal/schedulers/ScheduledRunnableTest.java

Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import java.lang.Thread.UncaughtExceptionHandler;
1919
import java.util.List;
2020
import java.util.concurrent.FutureTask;
21+
import java.util.concurrent.atomic.*;
2122

2223
import org.junit.Test;
2324

@@ -283,4 +284,114 @@ public void run() {
283284
TestHelper.race(r1, r2);
284285
}
285286
}
287+
288+
@Test
289+
public void syncWorkerCancelRace() {
290+
for (int i = 0; i < 10000; i++) {
291+
final CompositeDisposable set = new CompositeDisposable();
292+
final AtomicBoolean interrupted = new AtomicBoolean();
293+
final AtomicInteger sync = new AtomicInteger(2);
294+
final AtomicInteger syncb = new AtomicInteger(2);
295+
296+
Runnable r0 = new Runnable() {
297+
@Override
298+
public void run() {
299+
set.dispose();
300+
if (sync.decrementAndGet() != 0) {
301+
while (sync.get() != 0) { }
302+
}
303+
if (syncb.decrementAndGet() != 0) {
304+
while (syncb.get() != 0) { }
305+
}
306+
for (int j = 0; j < 1000; j++) {
307+
if (Thread.currentThread().isInterrupted()) {
308+
interrupted.set(true);
309+
break;
310+
}
311+
}
312+
}
313+
};
314+
315+
final ScheduledRunnable run = new ScheduledRunnable(r0, set);
316+
set.add(run);
317+
318+
final FutureTask<Void> ft = new FutureTask<Void>(run, null);
319+
320+
Runnable r2 = new Runnable() {
321+
@Override
322+
public void run() {
323+
if (sync.decrementAndGet() != 0) {
324+
while (sync.get() != 0) { }
325+
}
326+
run.setFuture(ft);
327+
if (syncb.decrementAndGet() != 0) {
328+
while (syncb.get() != 0) { }
329+
}
330+
}
331+
};
332+
333+
TestHelper.race(ft, r2);
334+
335+
assertFalse("The task was interrupted", interrupted.get());
336+
}
337+
}
338+
339+
@Test
340+
public void disposeAfterRun() {
341+
final ScheduledRunnable run = new ScheduledRunnable(Functions.EMPTY_RUNNABLE, null);
342+
343+
run.run();
344+
assertEquals(ScheduledRunnable.DONE, run.get(ScheduledRunnable.FUTURE_INDEX));
345+
346+
run.dispose();
347+
assertEquals(ScheduledRunnable.DONE, run.get(ScheduledRunnable.FUTURE_INDEX));
348+
}
349+
350+
@Test
351+
public void syncDisposeIdempotent() {
352+
final ScheduledRunnable run = new ScheduledRunnable(Functions.EMPTY_RUNNABLE, null);
353+
run.set(ScheduledRunnable.THREAD_INDEX, Thread.currentThread());
354+
355+
run.dispose();
356+
assertEquals(ScheduledRunnable.SYNC_DISPOSED, run.get(ScheduledRunnable.FUTURE_INDEX));
357+
run.dispose();
358+
assertEquals(ScheduledRunnable.SYNC_DISPOSED, run.get(ScheduledRunnable.FUTURE_INDEX));
359+
run.run();
360+
assertEquals(ScheduledRunnable.SYNC_DISPOSED, run.get(ScheduledRunnable.FUTURE_INDEX));
361+
}
362+
363+
@Test
364+
public void asyncDisposeIdempotent() {
365+
final ScheduledRunnable run = new ScheduledRunnable(Functions.EMPTY_RUNNABLE, null);
366+
367+
run.dispose();
368+
assertEquals(ScheduledRunnable.ASYNC_DISPOSED, run.get(ScheduledRunnable.FUTURE_INDEX));
369+
run.dispose();
370+
assertEquals(ScheduledRunnable.ASYNC_DISPOSED, run.get(ScheduledRunnable.FUTURE_INDEX));
371+
run.run();
372+
assertEquals(ScheduledRunnable.ASYNC_DISPOSED, run.get(ScheduledRunnable.FUTURE_INDEX));
373+
}
374+
375+
376+
@Test
377+
public void noParentIsDisposed() {
378+
ScheduledRunnable run = new ScheduledRunnable(Functions.EMPTY_RUNNABLE, null);
379+
assertFalse(run.isDisposed());
380+
run.run();
381+
assertTrue(run.isDisposed());
382+
}
383+
384+
@Test
385+
public void withParentIsDisposed() {
386+
CompositeDisposable set = new CompositeDisposable();
387+
ScheduledRunnable run = new ScheduledRunnable(Functions.EMPTY_RUNNABLE, set);
388+
set.add(run);
389+
390+
assertFalse(run.isDisposed());
391+
392+
run.run();
393+
assertTrue(run.isDisposed());
394+
395+
assertFalse(set.remove(run));
396+
}
286397
}

0 commit comments

Comments
 (0)