Skip to content

Exception thrown from onError() is swallowed when using groupBy/flatMap #2998

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
gdela opened this issue Jun 1, 2015 · 18 comments
Closed

Exception thrown from onError() is swallowed when using groupBy/flatMap #2998

gdela opened this issue Jun 1, 2015 · 18 comments
Labels
Milestone

Comments

@gdela
Copy link

gdela commented Jun 1, 2015

This issue is related to #969, where it has been said that following code should throw exception:

Observable.error(new Exception())
  .subscribe(new Observer<Object>() {
    public void onCompleted() {
      // noop
    }
    public void onError(Throwable e) {
      throw new IllegalStateException("This should crash the app");
    }
    public void onNext(Object o) {
      // noop
    }
  });

And it does throw exception, but when you add groupBy/flatMap like that:

Observable.error(new Exception())
  .groupBy(o -> o)
  .flatMap(g -> g)
  .subscribe(new Observer<Object>() {
    public void onCompleted() {
      // noop
    }
    public void onError(Throwable e) {
      throw new IllegalStateException("This should crash the app");
    }
    public void onNext(Object o) {
      // noop
    }
  });

Then the exception is silently swallowed. I tried to debug it a little bit, and seems that it could be because OnErrorFailedException is wrapped with CompositeException, so Exceptions.throwIfFatal() doesn't recognize it as fatal.

@akarnokd
Copy link
Member

akarnokd commented Jun 1, 2015

You shouldn't throw from onError() because that exception, usually, has nowhere to go. The RxJava operators are a bit inconsistent about this; some catch it, some let it through and others wrap it around.

@gdela
Copy link
Author

gdela commented Jun 2, 2015

I see, so I shouldn't use exception thrown from onError() to control flow, because it only seldom bubbles up to the subscribe() caller. I think that this wiki doc is a little bit misleading about that, because it says something like: "if the onError() call itself fails, the Observable [...] will throw a RuntimeException, an OnErrorFailedException, or an OnErrorNotImplementedException".

@gdela gdela closed this as completed Jun 2, 2015
@benjchristensen
Copy link
Member

If we are swallowing an error that's a bug.

@benjchristensen benjchristensen added this to the 1.0.x milestone Aug 28, 2015
@benjchristensen
Copy link
Member

It would be helpful it someone can create a failing unit test for this issue that demonstrates where the exception is being swallowed.

@konmik
Copy link

konmik commented Oct 7, 2015

@Test(expected = Exception.class)
public void throws_exception_from_onNext_onError() throws Exception {
    Observable
        .just(1)
        .doOnNext(new Action1<Integer>() {
            @Override
            public void call(Integer integer) {
                throw new RuntimeException();
            }
        })
        .subscribe(
            new Action1<Integer>() {
                @Override
                public void call(Integer integer) {

                }
            }, new Action1<Throwable>() {
                @Override
                public void call(Throwable throwable) {
                    throw new RuntimeException();
                }
            });
}

@Test(expected = Exception.class)
public void throws_exception_from_switchMap_onError() throws Exception {
    Observable
        .just(1)
        .switchMap(new Func1<Integer, Observable<Integer>>() {
            @Override
            public Observable<Integer> call(Integer integer) {
                return Observable.error(new RuntimeException());
            }
        })
        .subscribe(
            new Action1<Integer>() {
                @Override
                public void call(Integer integer) {

                }
            }, new Action1<Throwable>() {
                @Override
                public void call(Throwable throwable) {
                    throw new RuntimeException();
                }
            });
}

@Test(expected = Exception.class)
public void throws_exception_from_flatMap_onError() throws Exception {
    Observable
        .just(1)
        .flatMap(new Func1<Integer, Observable<Integer>>() {
            @Override
            public Observable<Integer> call(Integer integer) {
                return Observable.error(new RuntimeException());
            }
        })
        .subscribe(
            new Action1<Integer>() {
                @Override
                public void call(Integer integer) {

                }
            }, new Action1<Throwable>() {
                @Override
                public void call(Throwable throwable) {
                    throw new RuntimeException();
                }
            });
}

@Test(expected = Exception.class)
public void throws_exception_from_groupBy_onError() throws Exception {
    Observable
        .just(1)
        .groupBy(new Func1<Integer, Integer>() {
            @Override
            public Integer call(Integer integer) {
                throw new RuntimeException();
            }
        })
        .subscribe(
            new Action1<GroupedObservable<Integer, Integer>>() {
                @Override
                public void call(GroupedObservable<Integer, Integer> integerIntegerGroupedObservable) {

                }
            }, new Action1<Throwable>() {
                @Override
                public void call(Throwable throwable) {
                    throw new RuntimeException();
                }
            });
}

@konmik
Copy link

konmik commented Oct 7, 2015

Please let me know if someone is going to fix this.

Sorry, I don't feel that I can fix this myself - concurrency is not my strongest skill.

@akarnokd
Copy link
Member

akarnokd commented Oct 7, 2015

This has nothing to do with concurrency but with fault tolerance. The problem is that if the call to onError throws, there is no clear path that could propagate the exception out of the operator chain. For example, if an onNext calls an onError which throws, the caller of onNext can't know if onNext itself failed or who else and may happily call a chain of onErrors which end up throwing again. Therefore, either we always propagate exceptions up the stack until the thread-root is reached or sink the exception of onError into some plugin handler.

@konmik
Copy link

konmik commented Oct 7, 2015

I think that a clean way to do this could be creation of RethrowAlwaysException which decorates the original exception and which should be rethrown on every check. It should be unwrapped at the top loop and rethrown again unwrapped out of reach of rx chain.

@akarnokd
Copy link
Member

akarnokd commented Oct 7, 2015

That works until the first async boundary where it will probably be swallowed by the ExecutorService.

@konmik
Copy link

konmik commented Oct 7, 2015

How does OnErrorNotImplementedException not get swallowed then? If I replace RuntimeException with OnErrorNotImplementedException it does not get swallowed.

@akarnokd
Copy link
Member

akarnokd commented Oct 7, 2015

Most exception blocks around onNext and callbacks use Exceptions.throwIfFatal which checks for OnErrorNotImplementedException. This could be extended to handle OnErrorThrewException wrapper indeed, but we'd have to check and wrap every call to onError into a try-catch to convert to this exception. The async boundary is still a problem.

@konmik
Copy link

konmik commented Oct 7, 2015

How about wrapping only subscriber's onError?

@akarnokd
Copy link
Member

akarnokd commented Oct 7, 2015

Could work. Could you post a PR with the changes and your unit tests to see if it really does?

@konmik
Copy link

konmik commented Oct 7, 2015

I can't build, I think there is something with the windows OS I'm running.

Caused by: org.eclipse.jgit.errors.RepositoryNotFoundException: repository not found: E:\RxJava-1.x

#2651

@akarnokd
Copy link
Member

akarnokd commented Oct 7, 2015

I'm developing RxJava under Windows with Eclipse Mars + Gradle plugin without any issues.

@JakeWharton
Copy link
Contributor

@konmik you need to clone the repo from git and not just download zip. The build plugins require that the project be build from inside of a git repo.

@konmik
Copy link

konmik commented Oct 7, 2015

Thanks, @JakeWharton.

It looks like we already have the required OnErrorFailedException #969

The first idea is to comment some lines out to allow RuntimeException to propagate.

    public static void throwIfFatal(Throwable t) {
        if (t instanceof OnErrorNotImplementedException) {
            throw (OnErrorNotImplementedException) t;
        } else if (t instanceof OnErrorFailedException) {
//            Throwable cause = t.getCause();
//            if (cause instanceof RuntimeException) {
//                throw (RuntimeException) cause;
//            } else {
                throw (OnErrorFailedException) t;
//            }
        }

While this does not solve the problem of the groupBy operator, this allows my other tests to pass.

I'll spend some time investigating the issue.

@akarnokd
Copy link
Member

akarnokd commented Nov 9, 2015

I guess this issue has been fixed in #3455 and delivered in 1.0.15.

@akarnokd akarnokd closed this as completed Nov 9, 2015
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

No branches or pull requests

5 participants