Skip to content

OnErrorFailedException fix #3455

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Oct 20, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 2 additions & 6 deletions src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -164,15 +164,11 @@ public void call(Subscriber<? super R> o) {
// localized capture of errors rather than it skipping all operators
// and ending up in the try/catch of the subscribe method which then
// prevents onErrorResumeNext and other similar approaches to error handling
if (e instanceof OnErrorNotImplementedException) {
throw (OnErrorNotImplementedException) e;
}
Exceptions.throwIfFatal(e);
st.onError(e);
}
} catch (Throwable e) {
if (e instanceof OnErrorNotImplementedException) {
throw (OnErrorNotImplementedException) e;
}
Exceptions.throwIfFatal(e);
// if the lift function failed all we can do is pass the error to the final Subscriber
// as we don't have the operator available to us
o.onError(e);
Expand Down
7 changes: 1 addition & 6 deletions src/main/java/rx/exceptions/Exceptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -76,12 +76,7 @@ public static void throwIfFatal(Throwable t) {
if (t instanceof OnErrorNotImplementedException) {
throw (OnErrorNotImplementedException) t;
} else if (t instanceof OnErrorFailedException) {
Throwable cause = t.getCause();
if (cause instanceof RuntimeException) {
throw (RuntimeException) cause;
} else {
throw (OnErrorFailedException) t;
}
throw (OnErrorFailedException) t;
}
// values here derived from https://github.com/ReactiveX/RxJava/issues/748#issuecomment-32471495
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess by preventing the unwrapping, the error keeps propagating up, therefore, this change is essential. However, there could be code out there that depended on the original behavior (since Exceptions is part of the public API).

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think that there is such code. Such unwrapped exceptions were swallowed most of the time anyway.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds reasonable.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interesting bug. I agree that removing the unwrapping is needed.

else if (t instanceof StackOverflowError) {
Expand Down
72 changes: 68 additions & 4 deletions src/test/java/rx/exceptions/ExceptionsTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import rx.Observable;
import rx.Observer;
import rx.functions.Action1;
import rx.functions.Func1;
import rx.observables.GroupedObservable;
import rx.subjects.PublishSubject;

public class ExceptionsTest {
Expand All @@ -45,7 +47,7 @@ public void call(Integer t1) {
public void testStackOverflowWouldOccur() {
final PublishSubject<Integer> a = PublishSubject.create();
final PublishSubject<Integer> b = PublishSubject.create();
final int MAX_STACK_DEPTH = 1000;
final int MAX_STACK_DEPTH = 800;
final AtomicInteger depth = new AtomicInteger();

a.subscribe(new Observer<Integer>() {
Expand Down Expand Up @@ -156,10 +158,72 @@ public void onNext(Object o) {
}
});
fail("expecting an exception to be thrown");
} catch (CompositeException t) {
assertTrue(t.getExceptions().get(0) instanceof IllegalArgumentException);
assertTrue(t.getExceptions().get(1) instanceof IllegalStateException);
} catch (OnErrorFailedException t) {
CompositeException cause = (CompositeException) t.getCause();
assertTrue(cause.getExceptions().get(0) instanceof IllegalArgumentException);
assertTrue(cause.getExceptions().get(1) instanceof IllegalStateException);
}
}

/**
* https://github.com/ReactiveX/RxJava/issues/2998
*/
@Test(expected = OnErrorFailedException.class)
public void testOnErrorExceptionIsThrownFromGroupBy() throws Exception {
Observable
.just(1)
.groupBy(new Func1<Integer, Integer>() {
@Override
public Integer call(Integer integer) {
throw new RuntimeException();
}
})
.subscribe(new Observer<GroupedObservable<Integer, Integer>>() {
@Override
public void onCompleted() {

}

@Override
public void onError(Throwable e) {
throw new RuntimeException();
}

@Override
public void onNext(GroupedObservable<Integer, Integer> integerIntegerGroupedObservable) {

}
});
}

/**
* https://github.com/ReactiveX/RxJava/issues/2998
*/
@Test(expected = OnErrorFailedException.class)
public void testOnErrorExceptionIsThrownFromOnNext() throws Exception {
Observable
.just(1)
.doOnNext(new Action1<Integer>() {
@Override
public void call(Integer integer) {
throw new RuntimeException();
}
})
.subscribe(new Observer<Integer>() {
@Override
public void onCompleted() {

}

@Override
public void onError(Throwable e) {
throw new RuntimeException();
}

@Override
public void onNext(Integer integer) {

}
});
}
}