diff --git a/rxjava-core/src/main/java/rx/exceptions/Exceptions.java b/rxjava-core/src/main/java/rx/exceptions/Exceptions.java index 5fccfb2a61..86679c61b2 100644 --- a/rxjava-core/src/main/java/rx/exceptions/Exceptions.java +++ b/rxjava-core/src/main/java/rx/exceptions/Exceptions.java @@ -44,6 +44,13 @@ public static RuntimeException propagate(Throwable t) { public static void throwIfFatal(Throwable t) { if (t instanceof OnErrorNotImplementedException) { throw (OnErrorNotImplementedException) t; + } else if (t instanceof OnErrorFailedException) { + Throwable cause = ((OnErrorFailedException) t).getCause(); + if (cause instanceof RuntimeException) { + throw (RuntimeException) cause; + } else { + throw (OnErrorFailedException) t; + } } // values here derived from https://github.com/Netflix/RxJava/issues/748#issuecomment-32471495 else if (t instanceof StackOverflowError) { diff --git a/rxjava-core/src/main/java/rx/exceptions/OnErrorFailedException.java b/rxjava-core/src/main/java/rx/exceptions/OnErrorFailedException.java new file mode 100644 index 0000000000..be75ac82af --- /dev/null +++ b/rxjava-core/src/main/java/rx/exceptions/OnErrorFailedException.java @@ -0,0 +1,35 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.exceptions; + +import rx.Subscriber; + +/** + * Used for re-throwing errors thrown from {@link Subscriber#onError(Throwable)}. + * + * https://github.com/Netflix/RxJava/issues/969 + */ +public class OnErrorFailedException extends RuntimeException { + private static final long serialVersionUID = -419289748403337611L; + + public OnErrorFailedException(String message, Throwable e) { + super(message, e); + } + + public OnErrorFailedException(Throwable e) { + super(e.getMessage(), e); + } +} \ No newline at end of file diff --git a/rxjava-core/src/main/java/rx/observers/SafeSubscriber.java b/rxjava-core/src/main/java/rx/observers/SafeSubscriber.java index ff121eeb77..1bdbcd8272 100644 --- a/rxjava-core/src/main/java/rx/observers/SafeSubscriber.java +++ b/rxjava-core/src/main/java/rx/observers/SafeSubscriber.java @@ -21,6 +21,7 @@ import rx.Subscriber; import rx.exceptions.CompositeException; import rx.exceptions.Exceptions; +import rx.exceptions.OnErrorFailedException; import rx.exceptions.OnErrorNotImplementedException; import rx.plugins.RxJavaPlugins; @@ -165,10 +166,10 @@ protected void _onError(Throwable e) { } catch (Throwable pluginException) { handlePluginException(pluginException); } - throw new RuntimeException("Error occurred when trying to propagate error to Observer.onError and during unsubscription.", new CompositeException(Arrays.asList(e, e2, unsubscribeException))); + throw new OnErrorFailedException("Error occurred when trying to propagate error to Observer.onError and during unsubscription.", new CompositeException(Arrays.asList(e, e2, unsubscribeException))); } - throw new RuntimeException("Error occurred when trying to propagate error to Observer.onError", new CompositeException(Arrays.asList(e, e2))); + throw new OnErrorFailedException("Error occurred when trying to propagate error to Observer.onError", new CompositeException(Arrays.asList(e, e2))); } } // if we did not throw above we will unsubscribe here, if onError failed then unsubscribe happens in the catch @@ -180,7 +181,7 @@ protected void _onError(Throwable e) { } catch (Throwable pluginException) { handlePluginException(pluginException); } - throw unsubscribeException; + throw new OnErrorFailedException(unsubscribeException); } } diff --git a/rxjava-core/src/test/java/rx/exceptions/ExceptionsTest.java b/rxjava-core/src/test/java/rx/exceptions/ExceptionsTest.java index 6880553736..75ce52c27e 100644 --- a/rxjava-core/src/test/java/rx/exceptions/ExceptionsTest.java +++ b/rxjava-core/src/test/java/rx/exceptions/ExceptionsTest.java @@ -15,6 +15,9 @@ */ package rx.exceptions; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + import org.junit.Test; import rx.Observable; @@ -134,4 +137,34 @@ public void onNext(Integer t) { }); } + /** + * https://github.com/Netflix/RxJava/issues/969 + */ + @Test + public void testOnErrorExceptionIsThrown() { + try { + Observable.error(new IllegalArgumentException("original exception")).subscribe(new Observer() { + @Override + public void onCompleted() { + + } + + @Override + public void onError(Throwable e) { + throw new IllegalStateException("This should be thrown"); + } + + @Override + public void onNext(Object o) { + + } + }); + fail("expecting an exception to be thrown"); + } catch (CompositeException t) { + CompositeException ce = (CompositeException) t; + assertTrue(ce.getExceptions().get(0) instanceof IllegalArgumentException); + assertTrue(ce.getExceptions().get(1) instanceof IllegalStateException); + } + } + } diff --git a/rxjava-core/src/test/java/rx/observers/SafeObserverTest.java b/rxjava-core/src/test/java/rx/observers/SafeObserverTest.java index 89b835ce00..f4316d4698 100644 --- a/rxjava-core/src/test/java/rx/observers/SafeObserverTest.java +++ b/rxjava-core/src/test/java/rx/observers/SafeObserverTest.java @@ -15,7 +15,11 @@ */ package rx.observers; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import java.util.concurrent.atomic.AtomicReference; @@ -23,6 +27,7 @@ import rx.Subscriber; import rx.exceptions.CompositeException; +import rx.exceptions.OnErrorFailedException; import rx.exceptions.OnErrorNotImplementedException; import rx.functions.Action0; import rx.subscriptions.Subscriptions; @@ -215,7 +220,8 @@ public void call() { assertEquals("failed", onError.get().getMessage()); // now assert the exception that was thrown - assertTrue(e instanceof SafeObserverTestException); + OnErrorFailedException onErrorFailedException = (OnErrorFailedException) e; + assertTrue(onErrorFailedException.getCause() instanceof SafeObserverTestException); assertEquals("failure from unsubscribe", e.getMessage()); } }