Skip to content

Avoid swallowing errors in Completable #3733

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Mar 2, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 14 additions & 3 deletions src/main/java/rx/Completable.java
Original file line number Diff line number Diff line change
Expand Up @@ -1835,6 +1835,7 @@ public void onCompleted() {
public void onError(Throwable e) {
ERROR_HANDLER.handleError(e);
mad.unsubscribe();
deliverUncaughtException(e);
}

@Override
Expand Down Expand Up @@ -1864,14 +1865,17 @@ public void onCompleted() {
onComplete.call();
} catch (Throwable e) {
ERROR_HANDLER.handleError(e);
deliverUncaughtException(e);
} finally {
mad.unsubscribe();
}
mad.unsubscribe();
}

@Override
public void onError(Throwable e) {
ERROR_HANDLER.handleError(e);
mad.unsubscribe();
deliverUncaughtException(e);
}

@Override
Expand Down Expand Up @@ -1915,8 +1919,10 @@ public void onError(Throwable e) {
} catch (Throwable ex) {
e = new CompositeException(Arrays.asList(e, ex));
ERROR_HANDLER.handleError(e);
deliverUncaughtException(e);
} finally {
mad.unsubscribe();
}
mad.unsubscribe();
}

@Override
Expand All @@ -1927,7 +1933,12 @@ public void onSubscribe(Subscription d) {

return mad;
}


private static void deliverUncaughtException(Throwable e) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now it's a good candidate to live in Exceptions 😸

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That would require it to be public, sadly, and this isn't an API that RxJava needs to provide for users.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, we can create ExceptionsInternal under internal package, but yeah… k, let's extract it if somewhere else same functionality will be needed.

Thread thread = Thread.currentThread();
thread.getUncaughtExceptionHandler().uncaughtException(thread, e);
}

/**
* Subscribes the given CompletableSubscriber to this Completable instance.
* @param s the CompletableSubscriber, not null
Expand Down
16 changes: 16 additions & 0 deletions src/test/java/rx/CapturingUncaughtExceptionHandler.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package rx;

import java.util.concurrent.CountDownLatch;

public final class CapturingUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler {
public int count = 0;
public Throwable caught;
public CountDownLatch completed = new CountDownLatch(1);

@Override
public void uncaughtException(Thread t, Throwable e) {
count++;
caught = e;
completed.countDown();
}
}
79 changes: 78 additions & 1 deletion src/test/java/rx/CompletableTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -2700,7 +2700,64 @@ public void call(CompletableSubscriber s) {

Assert.assertTrue(name.get().startsWith("RxComputation"));
}


@Test
public void subscribeEmptyOnError() {
expectUncaughtTestException(new Action0() {
@Override public void call() {
error.completable.subscribe();
}
});
}

@Test
public void subscribeOneActionOnError() {
expectUncaughtTestException(new Action0() {
@Override
public void call() {
error.completable.subscribe(new Action0() {
@Override
public void call() {
}
});
}
});
}

@Test
public void subscribeOneActionThrowFromOnCompleted() {
expectUncaughtTestException(new Action0() {
@Override
public void call() {
normal.completable.subscribe(new Action0() {
@Override
public void call() {
throw new TestException();
}
});
}
});
}

@Test
public void subscribeTwoActionsThrowFromOnError() {
expectUncaughtTestException(new Action0() {
@Override
public void call() {
error.completable.subscribe(new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
throw new TestException();
}
}, new Action0() {
@Override
public void call() {
}
});
}
});
}

@Test(timeout = 1000)
public void timeoutEmitError() {
Throwable e = Completable.never().timeout(100, TimeUnit.MILLISECONDS).get();
Expand Down Expand Up @@ -3742,4 +3799,24 @@ public void call(Throwable e) {
assertNotNull("Unsubscribed before the call to onError", subscriptionRef.get());
}

private static void expectUncaughtTestException(Action0 action) {
Thread.UncaughtExceptionHandler originalHandler = Thread.getDefaultUncaughtExceptionHandler();
CapturingUncaughtExceptionHandler handler = new CapturingUncaughtExceptionHandler();
Thread.setDefaultUncaughtExceptionHandler(handler);
try {
action.call();
assertEquals("Should have received exactly 1 exception", 1, handler.count);
Throwable caught = handler.caught;
while (caught != null) {
if (caught instanceof TestException) break;
if (caught == caught.getCause()) break;
caught = caught.getCause();
}
assertTrue("A TestException should have been delivered to the handler",
caught instanceof TestException);
} finally {
Thread.setDefaultUncaughtExceptionHandler(originalHandler);
}
}

}
14 changes: 1 addition & 13 deletions src/test/java/rx/schedulers/SchedulerTests.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package rx.schedulers;

import rx.CapturingUncaughtExceptionHandler;
import rx.Observable;
import rx.Observer;
import rx.Scheduler;
Expand Down Expand Up @@ -87,19 +88,6 @@ static void testHandledErrorIsNotDeliveredToThreadHandler(Scheduler scheduler) t
}
}

private static final class CapturingUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler {
int count = 0;
Throwable caught;
CountDownLatch completed = new CountDownLatch(1);

@Override
public void uncaughtException(Thread t, Throwable e) {
count++;
caught = e;
completed.countDown();
}
}

private static final class CapturingObserver<T> implements Observer<T> {
CountDownLatch completed = new CountDownLatch(1);
int errorCount = 0;
Expand Down