Skip to content

Commit e0929ee

Browse files
committed
Change the workers to capture the stack trace for all subsequent scheduled actions to increase the readability of uncaught and fatal exceptions that bubble up to the schedulers.
1 parent d43c05c commit e0929ee

9 files changed

+282
-8
lines changed

src/main/java/rx/internal/schedulers/ExecutorScheduler.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ public Worker createWorker() {
4646

4747
/** Worker that schedules tasks on the executor indirectly through a trampoline mechanism. */
4848
static final class ExecutorSchedulerWorker extends Scheduler.Worker implements Runnable {
49+
private final Throwable creationContext = new SchedulerContextException();
4950
final Executor executor;
5051
// TODO: use a better performing structure for task tracking
5152
final CompositeSubscription tasks;
@@ -68,7 +69,7 @@ public Subscription schedule(Action0 action) {
6869
if (isUnsubscribed()) {
6970
return Subscriptions.unsubscribed();
7071
}
71-
ScheduledAction ea = new ScheduledAction(action, tasks);
72+
ScheduledAction ea = new ScheduledAction(action, tasks, creationContext);
7273
tasks.add(ea);
7374
queue.offer(ea);
7475
if (wip.getAndIncrement() == 0) {
@@ -150,7 +151,7 @@ public void call() {
150151
((ScheduledAction)s2).add(removeMas);
151152
}
152153
}
153-
});
154+
}, this.creationContext);
154155
// This will make sure if ea.call() gets executed before this line
155156
// we don't override the current task in mas.
156157
first.set(ea);

src/main/java/rx/internal/schedulers/NewThreadWorker.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
* @warn class description missing
3434
*/
3535
public class NewThreadWorker extends Scheduler.Worker implements Subscription {
36+
private final Throwable creationContext = new SchedulerContextException();
3637
private final ScheduledExecutorService executor;
3738
private final RxJavaSchedulersHook schedulersHook;
3839
volatile boolean isUnsubscribed;
@@ -233,7 +234,7 @@ public Subscription schedule(final Action0 action, long delayTime, TimeUnit unit
233234
*/
234235
public ScheduledAction scheduleActual(final Action0 action, long delayTime, TimeUnit unit) {
235236
Action0 decoratedAction = schedulersHook.onSchedule(action);
236-
ScheduledAction run = new ScheduledAction(decoratedAction);
237+
ScheduledAction run = new ScheduledAction(decoratedAction, creationContext);
237238
Future<?> f;
238239
if (delayTime <= 0) {
239240
f = executor.submit(run);
@@ -246,7 +247,7 @@ public ScheduledAction scheduleActual(final Action0 action, long delayTime, Time
246247
}
247248
public ScheduledAction scheduleActual(final Action0 action, long delayTime, TimeUnit unit, CompositeSubscription parent) {
248249
Action0 decoratedAction = schedulersHook.onSchedule(action);
249-
ScheduledAction run = new ScheduledAction(decoratedAction, parent);
250+
ScheduledAction run = new ScheduledAction(decoratedAction, parent, creationContext);
250251
parent.add(run);
251252

252253
Future<?> f;
@@ -262,7 +263,7 @@ public ScheduledAction scheduleActual(final Action0 action, long delayTime, Time
262263

263264
public ScheduledAction scheduleActual(final Action0 action, long delayTime, TimeUnit unit, SubscriptionList parent) {
264265
Action0 decoratedAction = schedulersHook.onSchedule(action);
265-
ScheduledAction run = new ScheduledAction(decoratedAction, parent);
266+
ScheduledAction run = new ScheduledAction(decoratedAction, parent, creationContext);
266267
parent.add(run);
267268

268269
Future<?> f;

src/main/java/rx/internal/schedulers/ScheduledAction.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import java.util.concurrent.atomic.*;
2020

2121
import rx.Subscription;
22+
import rx.exceptions.Exceptions;
2223
import rx.exceptions.OnErrorNotImplementedException;
2324
import rx.functions.Action0;
2425
import rx.internal.util.SubscriptionList;
@@ -34,18 +35,22 @@ public final class ScheduledAction extends AtomicReference<Thread> implements Ru
3435
private static final long serialVersionUID = -3962399486978279857L;
3536
final SubscriptionList cancel;
3637
final Action0 action;
38+
final Throwable creationContext;
3739

38-
public ScheduledAction(Action0 action) {
40+
public ScheduledAction(Action0 action, Throwable creationContext) {
3941
this.action = action;
4042
this.cancel = new SubscriptionList();
43+
this.creationContext = creationContext;
4144
}
42-
public ScheduledAction(Action0 action, CompositeSubscription parent) {
45+
public ScheduledAction(Action0 action, CompositeSubscription parent, Throwable creationContext) {
4346
this.action = action;
4447
this.cancel = new SubscriptionList(new Remover(this, parent));
48+
this.creationContext = creationContext;
4549
}
46-
public ScheduledAction(Action0 action, SubscriptionList parent) {
50+
public ScheduledAction(Action0 action, SubscriptionList parent, Throwable creationContext) {
4751
this.action = action;
4852
this.cancel = new SubscriptionList(new Remover2(this, parent));
53+
this.creationContext = creationContext;
4954
}
5055

5156
@Override
@@ -61,6 +66,7 @@ public void run() {
6166
} else {
6267
ie = new IllegalStateException("Fatal Exception thrown on Scheduler.Worker thread.", e);
6368
}
69+
Exceptions.addCause(ie, creationContext);
6470
RxJavaPlugins.getInstance().getErrorHandler().handleError(ie);
6571
Thread thread = Thread.currentThread();
6672
thread.getUncaughtExceptionHandler().uncaughtException(thread, ie);
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
/**
2+
* Copyright 2016 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.internal.schedulers;
17+
18+
/**
19+
* Used only for providing context around where work was scheduled should an error occur in a different thread.
20+
*/
21+
public class SchedulerContextException extends Exception {
22+
public SchedulerContextException() {
23+
super("Asynchronous work scheduled at");
24+
}
25+
26+
private static final long serialVersionUID = 1L;
27+
}

src/test/java/rx/schedulers/ComputationSchedulerTests.java

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
import java.util.HashMap;
2323
import java.util.concurrent.CountDownLatch;
24+
import java.util.concurrent.atomic.AtomicReference;
2425

2526
import org.junit.Test;
2627

@@ -30,6 +31,8 @@
3031
import rx.functions.Action0;
3132
import rx.functions.Action1;
3233
import rx.functions.Func1;
34+
import rx.plugins.RxJavaErrorHandler;
35+
import rx.plugins.RxJavaPlugins;
3336

3437
public class ComputationSchedulerTests extends AbstractSchedulerConcurrencyTests {
3538

@@ -168,4 +171,48 @@ public void testCancelledTaskRetention() throws InterruptedException {
168171
w.unsubscribe();
169172
}
170173
}
174+
175+
@Test
176+
public void testStackTraceAcrossThreads() throws Throwable {
177+
final AtomicReference<Throwable> exceptionRef = new AtomicReference<Throwable>();
178+
final CountDownLatch done = new CountDownLatch(1);
179+
180+
RxJavaPlugins.getInstance().reset();
181+
RxJavaPlugins.getInstance().registerErrorHandler(new RxJavaErrorHandler() {
182+
@Override
183+
public void handleError(Throwable e) {
184+
exceptionRef.set(e);
185+
done.countDown();
186+
}
187+
});
188+
189+
try {
190+
getScheduler().createWorker().schedule(new Action0() {
191+
@Override
192+
public void call() {
193+
throw new RuntimeException();
194+
}
195+
});
196+
} catch (Exception e) {
197+
exceptionRef.set(e);
198+
done.countDown();
199+
}
200+
201+
done.await();
202+
203+
Throwable exception = exceptionRef.get();
204+
Throwable e = exception;
205+
while (e != null) {
206+
StackTraceElement[] st = e.getStackTrace();
207+
for (StackTraceElement stackTraceElement : st) {
208+
if (stackTraceElement.getClassName().contains(getClass().getName())) {
209+
// pass we found this class in the stack trace.
210+
return;
211+
}
212+
}
213+
e = e.getCause();
214+
}
215+
216+
throw exception;
217+
}
171218
}

src/test/java/rx/schedulers/ImmediateSchedulerTest.java

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,18 @@
1717

1818
import static org.junit.Assert.assertTrue;
1919

20+
import java.util.concurrent.CountDownLatch;
21+
import java.util.concurrent.atomic.AtomicReference;
22+
2023
import org.junit.Test;
2124

2225
import rx.Observable;
2326
import rx.Scheduler;
27+
import rx.functions.Action0;
2428
import rx.functions.Action1;
2529
import rx.functions.Func1;
30+
import rx.plugins.RxJavaErrorHandler;
31+
import rx.plugins.RxJavaPlugins;
2632

2733
public class ImmediateSchedulerTest extends AbstractSchedulerTests {
2834

@@ -101,4 +107,48 @@ public void call(String t) {
101107
}
102108
});
103109
}
110+
111+
@Test
112+
public void testStackTraceAcrossThreads() throws Throwable {
113+
final AtomicReference<Throwable> exceptionRef = new AtomicReference<Throwable>();
114+
final CountDownLatch done = new CountDownLatch(1);
115+
116+
RxJavaPlugins.getInstance().reset();
117+
RxJavaPlugins.getInstance().registerErrorHandler(new RxJavaErrorHandler() {
118+
@Override
119+
public void handleError(Throwable e) {
120+
exceptionRef.set(e);
121+
done.countDown();
122+
}
123+
});
124+
125+
try {
126+
getScheduler().createWorker().schedule(new Action0() {
127+
@Override
128+
public void call() {
129+
throw new RuntimeException();
130+
}
131+
});
132+
} catch (Exception e) {
133+
exceptionRef.set(e);
134+
done.countDown();
135+
}
136+
137+
done.await();
138+
139+
Throwable exception = exceptionRef.get();
140+
Throwable e = exception;
141+
while (e != null) {
142+
StackTraceElement[] st = e.getStackTrace();
143+
for (StackTraceElement stackTraceElement : st) {
144+
if (stackTraceElement.getClassName().contains(getClass().getName())) {
145+
// pass we found this class in the stack trace.
146+
return;
147+
}
148+
}
149+
e = e.getCause();
150+
}
151+
152+
throw exception;
153+
}
104154
}

src/test/java/rx/schedulers/IoSchedulerTest.java

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,16 @@
1818

1919
import static org.junit.Assert.assertTrue;
2020

21+
import java.util.concurrent.CountDownLatch;
22+
import java.util.concurrent.atomic.AtomicReference;
23+
2124
import org.junit.Test;
2225

2326
import rx.*;
2427
import rx.Scheduler.Worker;
2528
import rx.functions.*;
29+
import rx.plugins.RxJavaErrorHandler;
30+
import rx.plugins.RxJavaPlugins;
2631

2732
public class IoSchedulerTest extends AbstractSchedulerConcurrencyTests {
2833

@@ -83,4 +88,47 @@ public void testCancelledTaskRetention() throws InterruptedException {
8388
}
8489
}
8590

91+
@Test
92+
public void testStackTraceAcrossThreads() throws Throwable {
93+
final AtomicReference<Throwable> exceptionRef = new AtomicReference<Throwable>();
94+
final CountDownLatch done = new CountDownLatch(1);
95+
96+
RxJavaPlugins.getInstance().reset();
97+
RxJavaPlugins.getInstance().registerErrorHandler(new RxJavaErrorHandler() {
98+
@Override
99+
public void handleError(Throwable e) {
100+
exceptionRef.set(e);
101+
done.countDown();
102+
}
103+
});
104+
105+
try {
106+
getScheduler().createWorker().schedule(new Action0() {
107+
@Override
108+
public void call() {
109+
throw new RuntimeException();
110+
}
111+
});
112+
} catch (Exception e) {
113+
exceptionRef.set(e);
114+
done.countDown();
115+
}
116+
117+
done.await();
118+
119+
Throwable exception = exceptionRef.get();
120+
Throwable e = exception;
121+
while (e != null) {
122+
StackTraceElement[] st = e.getStackTrace();
123+
for (StackTraceElement stackTraceElement : st) {
124+
if (stackTraceElement.getClassName().contains(getClass().getName())) {
125+
// pass we found this class in the stack trace.
126+
return;
127+
}
128+
}
129+
e = e.getCause();
130+
}
131+
132+
throw exception;
133+
}
86134
}

src/test/java/rx/schedulers/NewThreadSchedulerTest.java

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@
2727
import rx.Scheduler;
2828
import rx.functions.Action0;
2929
import rx.internal.schedulers.ScheduledAction;
30+
import rx.plugins.RxJavaErrorHandler;
31+
import rx.plugins.RxJavaPlugins;
3032
import rx.subscriptions.Subscriptions;
3133

3234
public class NewThreadSchedulerTest extends AbstractSchedulerConcurrencyTests {
@@ -83,4 +85,48 @@ public void call() {
8385
worker.unsubscribe();
8486
}
8587
}
88+
89+
@Test
90+
public void testStackTraceAcrossThreads() throws Throwable {
91+
final AtomicReference<Throwable> exceptionRef = new AtomicReference<Throwable>();
92+
final CountDownLatch done = new CountDownLatch(1);
93+
94+
RxJavaPlugins.getInstance().reset();
95+
RxJavaPlugins.getInstance().registerErrorHandler(new RxJavaErrorHandler() {
96+
@Override
97+
public void handleError(Throwable e) {
98+
exceptionRef.set(e);
99+
done.countDown();
100+
}
101+
});
102+
103+
try {
104+
getScheduler().createWorker().schedule(new Action0() {
105+
@Override
106+
public void call() {
107+
throw new RuntimeException();
108+
}
109+
});
110+
} catch (Exception e) {
111+
exceptionRef.set(e);
112+
done.countDown();
113+
}
114+
115+
done.await();
116+
117+
Throwable exception = exceptionRef.get();
118+
Throwable e = exception;
119+
while (e != null) {
120+
StackTraceElement[] st = e.getStackTrace();
121+
for (StackTraceElement stackTraceElement : st) {
122+
if (stackTraceElement.getClassName().contains(getClass().getName())) {
123+
// pass we found this class in the stack trace.
124+
return;
125+
}
126+
}
127+
e = e.getCause();
128+
}
129+
130+
throw exception;
131+
}
86132
}

0 commit comments

Comments
 (0)