Skip to content

Commit c6ada18

Browse files
committed
Change the workers to capture the stack trace for all subsequent scheduled actions to increase the readability of uncaught and fatal exceptions that bubble up to the schedulers.
1 parent 271c83b commit c6ada18

File tree

7 files changed

+119
-9
lines changed

7 files changed

+119
-9
lines changed

src/main/java/rx/internal/schedulers/CachedThreadScheduler.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@ ThreadWorker get() {
7878
while (!expiringWorkerQueue.isEmpty()) {
7979
ThreadWorker threadWorker = expiringWorkerQueue.poll();
8080
if (threadWorker != null) {
81+
threadWorker.resetContext();
8182
return threadWorker;
8283
}
8384
}

src/main/java/rx/internal/schedulers/EventLoopsScheduler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,7 @@ private static class EventLoopWorker extends Scheduler.Worker {
142142

143143
EventLoopWorker(PoolWorker poolWorker) {
144144
this.poolWorker = poolWorker;
145-
145+
poolWorker.resetContext();
146146
}
147147

148148
@Override

src/main/java/rx/internal/schedulers/ExecutorScheduler.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ public Worker createWorker() {
4242

4343
/** Worker that schedules tasks on the executor indirectly through a trampoline mechanism. */
4444
static final class ExecutorSchedulerWorker extends Scheduler.Worker implements Runnable {
45+
private final Throwable creationContext = SchedulerContextException.create();
4546
final Executor executor;
4647
// TODO: use a better performing structure for task tracking
4748
final CompositeSubscription tasks;
@@ -64,7 +65,7 @@ public Subscription schedule(Action0 action) {
6465
if (isUnsubscribed()) {
6566
return Subscriptions.unsubscribed();
6667
}
67-
ScheduledAction ea = new ScheduledAction(action, tasks);
68+
ScheduledAction ea = new ScheduledAction(action, tasks, creationContext);
6869
tasks.add(ea);
6970
queue.offer(ea);
7071
if (wip.getAndIncrement() == 0) {
@@ -146,7 +147,7 @@ public void call() {
146147
((ScheduledAction)s2).add(removeMas);
147148
}
148149
}
149-
});
150+
}, this.creationContext);
150151
// This will make sure if ea.call() gets executed before this line
151152
// we don't override the current task in mas.
152153
first.set(ea);

src/main/java/rx/internal/schedulers/NewThreadWorker.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
* @warn class description missing
3434
*/
3535
public class NewThreadWorker extends Scheduler.Worker implements Subscription {
36+
private Throwable creationContext = SchedulerContextException.create();
3637
private final ScheduledExecutorService executor;
3738
private final RxJavaSchedulersHook schedulersHook;
3839
volatile boolean isUnsubscribed;
@@ -234,7 +235,7 @@ public Subscription schedule(final Action0 action, long delayTime, TimeUnit unit
234235
*/
235236
public ScheduledAction scheduleActual(final Action0 action, long delayTime, TimeUnit unit) {
236237
Action0 decoratedAction = schedulersHook.onSchedule(action);
237-
ScheduledAction run = new ScheduledAction(decoratedAction);
238+
ScheduledAction run = new ScheduledAction(decoratedAction, creationContext);
238239
Future<?> f;
239240
if (delayTime <= 0) {
240241
f = executor.submit(run);
@@ -247,7 +248,7 @@ public ScheduledAction scheduleActual(final Action0 action, long delayTime, Time
247248
}
248249
public ScheduledAction scheduleActual(final Action0 action, long delayTime, TimeUnit unit, CompositeSubscription parent) {
249250
Action0 decoratedAction = schedulersHook.onSchedule(action);
250-
ScheduledAction run = new ScheduledAction(decoratedAction, parent);
251+
ScheduledAction run = new ScheduledAction(decoratedAction, parent, creationContext);
251252
parent.add(run);
252253

253254
Future<?> f;
@@ -263,7 +264,7 @@ public ScheduledAction scheduleActual(final Action0 action, long delayTime, Time
263264

264265
public ScheduledAction scheduleActual(final Action0 action, long delayTime, TimeUnit unit, SubscriptionList parent) {
265266
Action0 decoratedAction = schedulersHook.onSchedule(action);
266-
ScheduledAction run = new ScheduledAction(decoratedAction, parent);
267+
ScheduledAction run = new ScheduledAction(decoratedAction, parent, creationContext);
267268
parent.add(run);
268269

269270
Future<?> f;
@@ -288,4 +289,8 @@ public void unsubscribe() {
288289
public boolean isUnsubscribed() {
289290
return isUnsubscribed;
290291
}
292+
293+
public void resetContext() {
294+
creationContext = SchedulerContextException.create();
295+
}
291296
}

src/main/java/rx/internal/schedulers/ScheduledAction.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import java.util.concurrent.atomic.*;
2020

2121
import rx.Subscription;
22+
import rx.exceptions.Exceptions;
2223
import rx.exceptions.OnErrorNotImplementedException;
2324
import rx.functions.Action0;
2425
import rx.internal.util.SubscriptionList;
@@ -34,18 +35,22 @@ public final class ScheduledAction extends AtomicReference<Thread> implements Ru
3435
private static final long serialVersionUID = -3962399486978279857L;
3536
final SubscriptionList cancel;
3637
final Action0 action;
38+
final Throwable creationContext;
3739

38-
public ScheduledAction(Action0 action) {
40+
public ScheduledAction(Action0 action, Throwable creationContext) {
3941
this.action = action;
4042
this.cancel = new SubscriptionList();
43+
this.creationContext = creationContext;
4144
}
42-
public ScheduledAction(Action0 action, CompositeSubscription parent) {
45+
public ScheduledAction(Action0 action, CompositeSubscription parent, Throwable creationContext) {
4346
this.action = action;
4447
this.cancel = new SubscriptionList(new Remover(this, parent));
48+
this.creationContext = creationContext;
4549
}
46-
public ScheduledAction(Action0 action, SubscriptionList parent) {
50+
public ScheduledAction(Action0 action, SubscriptionList parent, Throwable creationContext) {
4751
this.action = action;
4852
this.cancel = new SubscriptionList(new Remover2(this, parent));
53+
this.creationContext = creationContext;
4954
}
5055

5156
@Override
@@ -61,6 +66,7 @@ public void run() {
6166
} else {
6267
ie = new IllegalStateException("Fatal Exception thrown on Scheduler.Worker thread.", e);
6368
}
69+
Exceptions.addCause(ie, creationContext);
6470
RxJavaPlugins.getInstance().getErrorHandler().handleError(ie);
6571
Thread thread = Thread.currentThread();
6672
thread.getUncaughtExceptionHandler().uncaughtException(thread, ie);
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
/**
2+
* Copyright 2016 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.internal.schedulers;
17+
18+
/**
19+
* Used only for providing context around where work was scheduled should an error occur in a different thread.
20+
*/
21+
public class SchedulerContextException extends Exception {
22+
/**
23+
* Constant to use when disabled
24+
*/
25+
private static final Throwable CONTEXT_MISSING = new SchedulerContextException("Missing context. Enable by setting the system property \"rxjava.captureSchedulerContext=true\"");
26+
27+
static {
28+
CONTEXT_MISSING.setStackTrace(new StackTraceElement[0]);
29+
}
30+
31+
/**
32+
* @return a {@link Throwable} that captures the stack trace or a {@link Throwable} that documents how to enable the feature if needed.
33+
*/
34+
public static Throwable create() {
35+
String def = "false";
36+
String setTo = System.getProperty("rxjava.captureSchedulerContext", def);
37+
return setTo != def && "true".equals(setTo) ? new SchedulerContextException("Asynchronous work scheduled at") : CONTEXT_MISSING;
38+
}
39+
40+
private SchedulerContextException(String message) {
41+
super(message);
42+
}
43+
44+
private static final long serialVersionUID = 1L;
45+
}

src/test/java/rx/schedulers/AbstractSchedulerTests.java

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,8 @@
4343
import rx.functions.Action0;
4444
import rx.functions.Action1;
4545
import rx.functions.Func1;
46+
import rx.plugins.RxJavaErrorHandler;
47+
import rx.plugins.RxJavaPlugins;
4648

4749
/**
4850
* Base tests for all schedulers including Immediate/Current.
@@ -502,4 +504,54 @@ public void onNext(T args) {
502504

503505
}
504506

507+
@Test
508+
public final void testStackTraceAcrossThreads() throws Throwable {
509+
final AtomicReference<Throwable> exceptionRef = new AtomicReference<Throwable>();
510+
final CountDownLatch done = new CountDownLatch(1);
511+
System.setProperty("rxjava.captureSchedulerContext", "true");
512+
513+
try {
514+
515+
RxJavaPlugins.getInstance().reset();
516+
RxJavaPlugins.getInstance().registerErrorHandler(new RxJavaErrorHandler() {
517+
@Override
518+
public void handleError(Throwable e) {
519+
exceptionRef.set(e);
520+
done.countDown();
521+
}
522+
});
523+
524+
try {
525+
getScheduler().createWorker().schedule(new Action0() {
526+
@Override
527+
public void call() {
528+
throw new RuntimeException();
529+
}
530+
});
531+
} catch (Exception e) {
532+
exceptionRef.set(e);
533+
done.countDown();
534+
}
535+
536+
done.await();
537+
538+
Throwable exception = exceptionRef.get();
539+
Throwable e = exception;
540+
while (e.getCause() != null) {
541+
e = e.getCause();
542+
}
543+
544+
StackTraceElement[] st = e.getStackTrace();
545+
for (StackTraceElement stackTraceElement : st) {
546+
if (stackTraceElement.getMethodName().equals("testStackTraceAcrossThreads")) {
547+
// pass we found this class in the stack trace.
548+
return;
549+
}
550+
}
551+
552+
throw exception;
553+
} finally {
554+
System.setProperty("rxjava.captureSchedulerContext", "false");
555+
}
556+
}
505557
}

0 commit comments

Comments
 (0)