Description
Hi there,
thanks for providing reactiveX for java!
I ran into an issue with RxJava 2.2.0 when I tried to run a Callable to get an Observable/Single and disposed that.
The issue was "kinda" already discussed here: #4880 / #4991 - the new tryOnError-Method is great and in my eyes really makes the whole thing more usable than having some Exception on some global handler "out of nowhere".
Before I come to the core issue, a short remark: I do understand that this is some kind of intended by design - but I think, maybe there could be tweaks to not get bitten by that design... ;-) It does make sense, that a disposed subscriber doesn't get an Exception, but on first glance, it's not intuitive, if that Exception happens right when I'm disposing something...
What I'm doing was the following:
I have a Task-Class implementing Callable, which represents a long(er) running computation. I was quite happy about Observable.fromCallable() (or Single.fromCallable respectively).
But it blew up, when I disposed the created Observable, as the Task uses an Exception if it is canceled.
(In my opinion, this is totally fine, as it's similar to Java's InterruptedException and Callable allows for checkedExceptions.)
Here's the code showing the relevant part. The task checks on each iteration for Thread.interrupted(), and if it is, it throws an Exception. The manual disposing is for testing only, a real usecase would be for example a switchMap, to restart some processing if new data arrived before the previous calculation was finished.
Disposable disposable = Single.fromCallable(task)
.subscribeOn(Schedulers.computation())
.subscribe(s -> System.out.println("Observed: " + s),
throwable -> System.out.println("Observed Exception: " + throwable));
TimeUnit.MILLISECONDS.sleep(50);
disposable.dispose();
But this leads to an UndeliverableException thrown, which is more than unintended. :-(
Is there any way around that? (Or at least any way to catch that?) I didn't find any, and the referenced issues above didn't show any, too - with one exception:
Using the new tryOnError:
Disposable disposable = Single.create(emitter -> {
// emitter.setCancellable(() -> task.requestCancel = true); // edit: not necessary, forgot to remove
try {
emitter.onSuccess(task.call());
} catch (Throwable t) {
emitter.tryOnError(t);
}
}).subscribeOn(Schedulers.computation())
.subscribe(s -> System.out.println("Observed: " + s),
throwable -> System.out.println("Observed Exception: " + throwable));
TimeUnit.MILLISECONDS.sleep(50);
disposable.dispose();
The task stops it's processing and the single returns. But comparing this clunky verbose call to the fromCallable above... uugh!
So maybe the current state isn't the prettiest one for that special case, but I'd like to suggest some ideas for discussion and (further) improvement:
- fromCallable(Callable) could be extended to an optioal second argument, indicating that tryOnError should be used or providing a custom ExceptionHandler?
- maybe it would be nice (not only for this usecase) to have some possibility to inject a custom ExceptionHandler, a little like subscribeOn, etc. - maybe handleExceptionsAfterDisposed(Consumer)?
- at least - as it may not be uncommon for callables to throw Exceptions - document the post-disposed-behaviour a little more?
Final Bonus-Question: If I'd create a new library-function for some long-running algorithm... what design would you suggest? In my opinion, the implementation of Callable is the most generic way - more than implementing SingleOnSubscribe maybe...? (And at least more generally usable).
Thanks for any feedback or maybe some hints for improvements!