diff --git a/src/main/java/rx/Observable.java b/src/main/java/rx/Observable.java index 7114a42e76..4cbf28d18e 100644 --- a/src/main/java/rx/Observable.java +++ b/src/main/java/rx/Observable.java @@ -1712,6 +1712,7 @@ public static Observable from(T[] array) { * You should call the AsyncEmitter's onNext, onError and onCompleted methods in a serialized fashion. The * rest of its methods are threadsafe. * + * @param the element type * @param asyncEmitter the emitter that is called when a Subscriber subscribes to the returned {@code Observable} * @param backpressure the backpressure mode to apply if the downstream Subscriber doesn't request (fast) enough * @return the new Observable instance diff --git a/src/main/java/rx/Scheduler.java b/src/main/java/rx/Scheduler.java index 756963e66f..1487373fcc 100644 --- a/src/main/java/rx/Scheduler.java +++ b/src/main/java/rx/Scheduler.java @@ -253,8 +253,10 @@ public long now() { * }); * * - * @param combine - * @return + * @param a Scheduler and a Subscription + * @param combine the function that takes a two-level nested Observable sequence of a Completable and returns + * the Completable that will be subscribed to and should trigger the execution of the scheduled Actions. + * @return the Scheduler with the customized execution behavior */ @SuppressWarnings("unchecked") @Experimental diff --git a/src/main/java/rx/exceptions/AssemblyStackTraceException.java b/src/main/java/rx/exceptions/AssemblyStackTraceException.java index 752e8aad87..110fc803c8 100644 --- a/src/main/java/rx/exceptions/AssemblyStackTraceException.java +++ b/src/main/java/rx/exceptions/AssemblyStackTraceException.java @@ -15,7 +15,10 @@ */ package rx.exceptions; +import java.util.*; + import rx.annotations.Experimental; +import rx.plugins.RxJavaHooks; /** * A RuntimeException that is stackless but holds onto a textual @@ -27,16 +30,6 @@ public final class AssemblyStackTraceException extends RuntimeException { /** */ private static final long serialVersionUID = 2038859767182585852L; - /** - * Constructs an AssemblyStackTraceException with the given message and - * a cause. - * @param message the message - * @param cause the cause - */ - public AssemblyStackTraceException(String message, Throwable cause) { - super(message, cause); - } - /** * Constructs an AssemblyStackTraceException with the given message. * @param message the message @@ -49,4 +42,48 @@ public AssemblyStackTraceException(String message) { public synchronized Throwable fillInStackTrace() { // NOPMD return this; } + + /** + * Finds an empty cause slot and assigns itself to it. + * @param exception the exception to start from + */ + public void attachTo(Throwable exception) { + Set memory = new HashSet(); + + for (;;) { + if (exception.getCause() == null) { + exception.initCause(this); + return; + } + + exception = exception.getCause(); + if (!memory.add(exception)) { + // in case we run into a cycle, give up and report this to the hooks + RxJavaHooks.onError(this); + return; + } + } + } + + /** + * Locate the first AssemblyStackTraceException in the causal chain of the + * given Throwable (or it if it's one). + * @param e the input throwable + * @return the AssemblyStackTraceException located or null if not found + */ + public static AssemblyStackTraceException find(Throwable e) { + Set memory = new HashSet(); + for (;;) { + if (e instanceof AssemblyStackTraceException) { + return (AssemblyStackTraceException)e; + } + if (e == null || e.getCause() == null) { + return null; + } + e = e.getCause(); + if (!memory.add(e)) { + return null; + } + } + } } diff --git a/src/main/java/rx/internal/operators/OnSubscribeOnAssembly.java b/src/main/java/rx/internal/operators/OnSubscribeOnAssembly.java index 11c7bcff0e..f26dda29dc 100644 --- a/src/main/java/rx/internal/operators/OnSubscribeOnAssembly.java +++ b/src/main/java/rx/internal/operators/OnSubscribeOnAssembly.java @@ -115,7 +115,7 @@ public void onCompleted() { @Override public void onError(Throwable e) { - e = new AssemblyStackTraceException(stacktrace, e); + new AssemblyStackTraceException(stacktrace).attachTo(e); actual.onError(e); } diff --git a/src/main/java/rx/internal/operators/OnSubscribeOnAssemblyCompletable.java b/src/main/java/rx/internal/operators/OnSubscribeOnAssemblyCompletable.java index da5a144045..c8d7eaf52f 100644 --- a/src/main/java/rx/internal/operators/OnSubscribeOnAssemblyCompletable.java +++ b/src/main/java/rx/internal/operators/OnSubscribeOnAssemblyCompletable.java @@ -71,7 +71,7 @@ public void onCompleted() { @Override public void onError(Throwable e) { - e = new AssemblyStackTraceException(stacktrace, e); + new AssemblyStackTraceException(stacktrace).attachTo(e); actual.onError(e); } } diff --git a/src/main/java/rx/internal/operators/OnSubscribeOnAssemblySingle.java b/src/main/java/rx/internal/operators/OnSubscribeOnAssemblySingle.java index aeed623a7b..be5ce6cb5f 100644 --- a/src/main/java/rx/internal/operators/OnSubscribeOnAssemblySingle.java +++ b/src/main/java/rx/internal/operators/OnSubscribeOnAssemblySingle.java @@ -61,7 +61,7 @@ public OnAssemblySingleSubscriber(SingleSubscriber actual, String sta @Override public void onError(Throwable e) { - e = new AssemblyStackTraceException(stacktrace, e); + new AssemblyStackTraceException(stacktrace).attachTo(e); actual.onError(e); } diff --git a/src/main/java/rx/plugins/RxJavaHooks.java b/src/main/java/rx/plugins/RxJavaHooks.java index cef9a8f6fb..75602e7c2b 100644 --- a/src/main/java/rx/plugins/RxJavaHooks.java +++ b/src/main/java/rx/plugins/RxJavaHooks.java @@ -974,7 +974,7 @@ public static Func1 getOnSingleLift() { *

* Calling with a {@code null} parameter restores the default behavior: * the hook returns the same object. - * @param onObservableLift the function that is called with original Operator and should + * @param onCompletableLift the function that is called with original Operator and should * return an Operator instance. */ public static void setOnCompletableLift(Func1 onCompletableLift) { diff --git a/src/test/java/rx/plugins/RxJavaHooksTest.java b/src/test/java/rx/plugins/RxJavaHooksTest.java index 9209228124..a1df503815 100644 --- a/src/test/java/rx/plugins/RxJavaHooksTest.java +++ b/src/test/java/rx/plugins/RxJavaHooksTest.java @@ -64,15 +64,15 @@ public void assemblyTrackingObservable() { createObservable().subscribe(ts); - ts.assertError(AssemblyStackTraceException.class); + ts.assertError(TestException.class); Throwable ex = ts.getOnErrorEvents().get(0); - assertTrue("" + ex.getCause(), ex.getCause() instanceof TestException); + AssemblyStackTraceException aste = AssemblyStackTraceException.find(ex); - assertTrue("" + ex, ex instanceof AssemblyStackTraceException); + assertNotNull(aste); - assertTrue(ex.getMessage(), ex.getMessage().contains("createObservable")); + assertTrue(aste.getMessage(), aste.getMessage().contains("createObservable")); RxJavaHooks.clearAssemblyTracking(); @@ -81,6 +81,12 @@ public void assemblyTrackingObservable() { createObservable().subscribe(ts); ts.assertError(TestException.class); + + ex = ts.getOnErrorEvents().get(0); + + aste = AssemblyStackTraceException.find(ex); + + assertNull(aste); } finally { RxJavaHooks.resetAssemblyTracking(); } @@ -103,15 +109,15 @@ public void assemblyTrackingSingle() { createSingle().subscribe(ts); - ts.assertError(AssemblyStackTraceException.class); + ts.assertError(TestException.class); Throwable ex = ts.getOnErrorEvents().get(0); - - assertTrue("" + ex, ex instanceof AssemblyStackTraceException); - - assertTrue("" + ex.getCause(), ex.getCause() instanceof TestException); - - assertTrue(ex.getMessage(), ex.getMessage().contains("createSingle")); + + AssemblyStackTraceException aste = AssemblyStackTraceException.find(ex); + + assertNotNull(aste); + + assertTrue(aste.getMessage(), aste.getMessage().contains("createSingle")); RxJavaHooks.clearAssemblyTracking(); @@ -120,6 +126,12 @@ public void assemblyTrackingSingle() { createSingle().subscribe(ts); ts.assertError(TestException.class); + + ex = ts.getOnErrorEvents().get(0); + + aste = AssemblyStackTraceException.find(ex); + + assertNull(aste); } finally { RxJavaHooks.resetAssemblyTracking(); } @@ -142,15 +154,15 @@ public void assemblyTrackingCompletable() { createCompletable().subscribe(ts); - ts.assertError(AssemblyStackTraceException.class); + ts.assertError(TestException.class); Throwable ex = ts.getOnErrorEvents().get(0); - - assertTrue("" + ex, ex instanceof AssemblyStackTraceException); - - assertTrue("" + ex.getCause(), ex.getCause() instanceof TestException); - - assertTrue(ex.getMessage(), ex.getMessage().contains("createCompletable")); + + AssemblyStackTraceException aste = AssemblyStackTraceException.find(ex); + + assertNotNull(aste); + + assertTrue(aste.getMessage(), aste.getMessage().contains("createCompletable")); RxJavaHooks.clearAssemblyTracking(); @@ -160,6 +172,12 @@ public void assemblyTrackingCompletable() { ts.assertError(TestException.class); + ex = ts.getOnErrorEvents().get(0); + + aste = AssemblyStackTraceException.find(ex); + + assertNull(aste); + } finally { RxJavaHooks.resetAssemblyTracking(); }