From 5fee6669807ea67acaab40bdb198544a9f246344 Mon Sep 17 00:00:00 2001 From: akarnokd Date: Tue, 19 Jul 2016 15:05:02 +0200 Subject: [PATCH 1/2] 1.x: fix assembly tracking replacing original exception --- src/main/java/rx/Observable.java | 1 + src/main/java/rx/Scheduler.java | 6 +- .../AssemblyStackTraceException.java | 57 +++++++++++++++---- .../operators/OnSubscribeOnAssembly.java | 2 +- .../OnSubscribeOnAssemblyCompletable.java | 2 +- .../OnSubscribeOnAssemblySingle.java | 2 +- src/main/java/rx/plugins/RxJavaHooks.java | 2 +- src/test/java/rx/plugins/RxJavaHooksTest.java | 54 ++++++++++++------ 8 files changed, 92 insertions(+), 34 deletions(-) diff --git a/src/main/java/rx/Observable.java b/src/main/java/rx/Observable.java index 7114a42e76..4cbf28d18e 100644 --- a/src/main/java/rx/Observable.java +++ b/src/main/java/rx/Observable.java @@ -1712,6 +1712,7 @@ public static Observable from(T[] array) { * You should call the AsyncEmitter's onNext, onError and onCompleted methods in a serialized fashion. The * rest of its methods are threadsafe. * + * @param the element type * @param asyncEmitter the emitter that is called when a Subscriber subscribes to the returned {@code Observable} * @param backpressure the backpressure mode to apply if the downstream Subscriber doesn't request (fast) enough * @return the new Observable instance diff --git a/src/main/java/rx/Scheduler.java b/src/main/java/rx/Scheduler.java index 756963e66f..1487373fcc 100644 --- a/src/main/java/rx/Scheduler.java +++ b/src/main/java/rx/Scheduler.java @@ -253,8 +253,10 @@ public long now() { * }); * * - * @param combine - * @return + * @param a Scheduler and a Subscription + * @param combine the function that takes a two-level nested Observable sequence of a Completable and returns + * the Completable that will be subscribed to and should trigger the execution of the scheduled Actions. + * @return the Scheduler with the customized execution behavior */ @SuppressWarnings("unchecked") @Experimental diff --git a/src/main/java/rx/exceptions/AssemblyStackTraceException.java b/src/main/java/rx/exceptions/AssemblyStackTraceException.java index 752e8aad87..4d73231649 100644 --- a/src/main/java/rx/exceptions/AssemblyStackTraceException.java +++ b/src/main/java/rx/exceptions/AssemblyStackTraceException.java @@ -15,7 +15,10 @@ */ package rx.exceptions; +import java.util.*; + import rx.annotations.Experimental; +import rx.plugins.RxJavaHooks; /** * A RuntimeException that is stackless but holds onto a textual @@ -27,16 +30,6 @@ public final class AssemblyStackTraceException extends RuntimeException { /** */ private static final long serialVersionUID = 2038859767182585852L; - /** - * Constructs an AssemblyStackTraceException with the given message and - * a cause. - * @param message the message - * @param cause the cause - */ - public AssemblyStackTraceException(String message, Throwable cause) { - super(message, cause); - } - /** * Constructs an AssemblyStackTraceException with the given message. * @param message the message @@ -49,4 +42,48 @@ public AssemblyStackTraceException(String message) { public synchronized Throwable fillInStackTrace() { // NOPMD return this; } + + /** + * Finds an empty cause slot and assigns itself to it. + * @param exception the exception to start from + */ + public void attach(Throwable exception) { + Set memory = new HashSet(); + + for (;;) { + if (exception.getCause() == null) { + exception.initCause(this); + return; + } + + exception = exception.getCause(); + if (!memory.add(exception)) { + // in case we run into a cycle, give up and report this to the hooks + RxJavaHooks.onError(this); + return; + } + } + } + + /** + * Locate the first AssemblyStackTraceException in the causal chain of the + * given Throwable (or it if it's one). + * @param e the input throwable + * @return the AssemblyStackTraceException located or null if not found + */ + public static AssemblyStackTraceException find(Throwable e) { + Set memory = new HashSet(); + for (;;) { + if (e instanceof AssemblyStackTraceException) { + return (AssemblyStackTraceException)e; + } + if (e == null || e.getCause() == null) { + return null; + } + e = e.getCause(); + if (!memory.add(e)) { + return null; + } + } + } } diff --git a/src/main/java/rx/internal/operators/OnSubscribeOnAssembly.java b/src/main/java/rx/internal/operators/OnSubscribeOnAssembly.java index 11c7bcff0e..a673eaec95 100644 --- a/src/main/java/rx/internal/operators/OnSubscribeOnAssembly.java +++ b/src/main/java/rx/internal/operators/OnSubscribeOnAssembly.java @@ -115,7 +115,7 @@ public void onCompleted() { @Override public void onError(Throwable e) { - e = new AssemblyStackTraceException(stacktrace, e); + new AssemblyStackTraceException(stacktrace).attach(e); actual.onError(e); } diff --git a/src/main/java/rx/internal/operators/OnSubscribeOnAssemblyCompletable.java b/src/main/java/rx/internal/operators/OnSubscribeOnAssemblyCompletable.java index da5a144045..e10e0270f4 100644 --- a/src/main/java/rx/internal/operators/OnSubscribeOnAssemblyCompletable.java +++ b/src/main/java/rx/internal/operators/OnSubscribeOnAssemblyCompletable.java @@ -71,7 +71,7 @@ public void onCompleted() { @Override public void onError(Throwable e) { - e = new AssemblyStackTraceException(stacktrace, e); + new AssemblyStackTraceException(stacktrace).attach(e); actual.onError(e); } } diff --git a/src/main/java/rx/internal/operators/OnSubscribeOnAssemblySingle.java b/src/main/java/rx/internal/operators/OnSubscribeOnAssemblySingle.java index aeed623a7b..14008514d4 100644 --- a/src/main/java/rx/internal/operators/OnSubscribeOnAssemblySingle.java +++ b/src/main/java/rx/internal/operators/OnSubscribeOnAssemblySingle.java @@ -61,7 +61,7 @@ public OnAssemblySingleSubscriber(SingleSubscriber actual, String sta @Override public void onError(Throwable e) { - e = new AssemblyStackTraceException(stacktrace, e); + new AssemblyStackTraceException(stacktrace).attach(e); actual.onError(e); } diff --git a/src/main/java/rx/plugins/RxJavaHooks.java b/src/main/java/rx/plugins/RxJavaHooks.java index cef9a8f6fb..75602e7c2b 100644 --- a/src/main/java/rx/plugins/RxJavaHooks.java +++ b/src/main/java/rx/plugins/RxJavaHooks.java @@ -974,7 +974,7 @@ public static Func1 getOnSingleLift() { *

* Calling with a {@code null} parameter restores the default behavior: * the hook returns the same object. - * @param onObservableLift the function that is called with original Operator and should + * @param onCompletableLift the function that is called with original Operator and should * return an Operator instance. */ public static void setOnCompletableLift(Func1 onCompletableLift) { diff --git a/src/test/java/rx/plugins/RxJavaHooksTest.java b/src/test/java/rx/plugins/RxJavaHooksTest.java index 9209228124..a1df503815 100644 --- a/src/test/java/rx/plugins/RxJavaHooksTest.java +++ b/src/test/java/rx/plugins/RxJavaHooksTest.java @@ -64,15 +64,15 @@ public void assemblyTrackingObservable() { createObservable().subscribe(ts); - ts.assertError(AssemblyStackTraceException.class); + ts.assertError(TestException.class); Throwable ex = ts.getOnErrorEvents().get(0); - assertTrue("" + ex.getCause(), ex.getCause() instanceof TestException); + AssemblyStackTraceException aste = AssemblyStackTraceException.find(ex); - assertTrue("" + ex, ex instanceof AssemblyStackTraceException); + assertNotNull(aste); - assertTrue(ex.getMessage(), ex.getMessage().contains("createObservable")); + assertTrue(aste.getMessage(), aste.getMessage().contains("createObservable")); RxJavaHooks.clearAssemblyTracking(); @@ -81,6 +81,12 @@ public void assemblyTrackingObservable() { createObservable().subscribe(ts); ts.assertError(TestException.class); + + ex = ts.getOnErrorEvents().get(0); + + aste = AssemblyStackTraceException.find(ex); + + assertNull(aste); } finally { RxJavaHooks.resetAssemblyTracking(); } @@ -103,15 +109,15 @@ public void assemblyTrackingSingle() { createSingle().subscribe(ts); - ts.assertError(AssemblyStackTraceException.class); + ts.assertError(TestException.class); Throwable ex = ts.getOnErrorEvents().get(0); - - assertTrue("" + ex, ex instanceof AssemblyStackTraceException); - - assertTrue("" + ex.getCause(), ex.getCause() instanceof TestException); - - assertTrue(ex.getMessage(), ex.getMessage().contains("createSingle")); + + AssemblyStackTraceException aste = AssemblyStackTraceException.find(ex); + + assertNotNull(aste); + + assertTrue(aste.getMessage(), aste.getMessage().contains("createSingle")); RxJavaHooks.clearAssemblyTracking(); @@ -120,6 +126,12 @@ public void assemblyTrackingSingle() { createSingle().subscribe(ts); ts.assertError(TestException.class); + + ex = ts.getOnErrorEvents().get(0); + + aste = AssemblyStackTraceException.find(ex); + + assertNull(aste); } finally { RxJavaHooks.resetAssemblyTracking(); } @@ -142,15 +154,15 @@ public void assemblyTrackingCompletable() { createCompletable().subscribe(ts); - ts.assertError(AssemblyStackTraceException.class); + ts.assertError(TestException.class); Throwable ex = ts.getOnErrorEvents().get(0); - - assertTrue("" + ex, ex instanceof AssemblyStackTraceException); - - assertTrue("" + ex.getCause(), ex.getCause() instanceof TestException); - - assertTrue(ex.getMessage(), ex.getMessage().contains("createCompletable")); + + AssemblyStackTraceException aste = AssemblyStackTraceException.find(ex); + + assertNotNull(aste); + + assertTrue(aste.getMessage(), aste.getMessage().contains("createCompletable")); RxJavaHooks.clearAssemblyTracking(); @@ -160,6 +172,12 @@ public void assemblyTrackingCompletable() { ts.assertError(TestException.class); + ex = ts.getOnErrorEvents().get(0); + + aste = AssemblyStackTraceException.find(ex); + + assertNull(aste); + } finally { RxJavaHooks.resetAssemblyTracking(); } From 9105b33424cd33f551166bc3b4272fb319ad2ed1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?D=C3=A1vid=20Karnok?= Date: Wed, 20 Jul 2016 23:02:13 +0200 Subject: [PATCH 2/2] Rename attach() to attachTo() --- src/main/java/rx/exceptions/AssemblyStackTraceException.java | 2 +- src/main/java/rx/internal/operators/OnSubscribeOnAssembly.java | 2 +- .../rx/internal/operators/OnSubscribeOnAssemblyCompletable.java | 2 +- .../java/rx/internal/operators/OnSubscribeOnAssemblySingle.java | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/main/java/rx/exceptions/AssemblyStackTraceException.java b/src/main/java/rx/exceptions/AssemblyStackTraceException.java index 4d73231649..110fc803c8 100644 --- a/src/main/java/rx/exceptions/AssemblyStackTraceException.java +++ b/src/main/java/rx/exceptions/AssemblyStackTraceException.java @@ -47,7 +47,7 @@ public synchronized Throwable fillInStackTrace() { // NOPMD * Finds an empty cause slot and assigns itself to it. * @param exception the exception to start from */ - public void attach(Throwable exception) { + public void attachTo(Throwable exception) { Set memory = new HashSet(); for (;;) { diff --git a/src/main/java/rx/internal/operators/OnSubscribeOnAssembly.java b/src/main/java/rx/internal/operators/OnSubscribeOnAssembly.java index a673eaec95..f26dda29dc 100644 --- a/src/main/java/rx/internal/operators/OnSubscribeOnAssembly.java +++ b/src/main/java/rx/internal/operators/OnSubscribeOnAssembly.java @@ -115,7 +115,7 @@ public void onCompleted() { @Override public void onError(Throwable e) { - new AssemblyStackTraceException(stacktrace).attach(e); + new AssemblyStackTraceException(stacktrace).attachTo(e); actual.onError(e); } diff --git a/src/main/java/rx/internal/operators/OnSubscribeOnAssemblyCompletable.java b/src/main/java/rx/internal/operators/OnSubscribeOnAssemblyCompletable.java index e10e0270f4..c8d7eaf52f 100644 --- a/src/main/java/rx/internal/operators/OnSubscribeOnAssemblyCompletable.java +++ b/src/main/java/rx/internal/operators/OnSubscribeOnAssemblyCompletable.java @@ -71,7 +71,7 @@ public void onCompleted() { @Override public void onError(Throwable e) { - new AssemblyStackTraceException(stacktrace).attach(e); + new AssemblyStackTraceException(stacktrace).attachTo(e); actual.onError(e); } } diff --git a/src/main/java/rx/internal/operators/OnSubscribeOnAssemblySingle.java b/src/main/java/rx/internal/operators/OnSubscribeOnAssemblySingle.java index 14008514d4..be5ce6cb5f 100644 --- a/src/main/java/rx/internal/operators/OnSubscribeOnAssemblySingle.java +++ b/src/main/java/rx/internal/operators/OnSubscribeOnAssemblySingle.java @@ -61,7 +61,7 @@ public OnAssemblySingleSubscriber(SingleSubscriber actual, String sta @Override public void onError(Throwable e) { - new AssemblyStackTraceException(stacktrace).attach(e); + new AssemblyStackTraceException(stacktrace).attachTo(e); actual.onError(e); }