Skip to content

Commit 5e78f01

Browse files
committed
catch onCompleted unsubscribe error and report to RxJavaPlugin error handler
1 parent adfabec commit 5e78f01

File tree

2 files changed

+51
-5
lines changed

2 files changed

+51
-5
lines changed

src/main/java/rx/observers/SafeSubscriber.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -86,8 +86,16 @@ public void onCompleted() {
8686
// handle errors if the onCompleted implementation fails, not just if the Observable fails
8787
_onError(e);
8888
} finally {
89-
// auto-unsubscribe
90-
unsubscribe();
89+
try {
90+
// auto-unsubscribe
91+
unsubscribe();
92+
} catch (Throwable e2) {
93+
try {
94+
RxJavaPlugins.getInstance().getErrorHandler().handleError(e2);
95+
} catch (Throwable pluginException) {
96+
handlePluginException(pluginException);
97+
}
98+
}
9199
}
92100
}
93101
}

src/test/java/rx/observers/SafeSubscriberTest.java

Lines changed: 41 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,15 +15,23 @@
1515
*/
1616
package rx.observers;
1717

18+
import static org.junit.Assert.assertEquals;
1819
import static org.junit.Assert.assertTrue;
1920

2021
import java.lang.reflect.Method;
22+
import java.util.concurrent.atomic.AtomicInteger;
2123

22-
import org.junit.*;
24+
import org.junit.After;
25+
import org.junit.Assert;
26+
import org.junit.Before;
27+
import org.junit.Test;
2328

24-
import rx.exceptions.*;
29+
import rx.exceptions.OnErrorFailedException;
30+
import rx.exceptions.OnErrorNotImplementedException;
31+
import rx.exceptions.TestException;
2532
import rx.functions.Action0;
26-
import rx.plugins.*;
33+
import rx.plugins.RxJavaErrorHandler;
34+
import rx.plugins.RxJavaPlugins;
2735
import rx.subscriptions.Subscriptions;
2836

2937
public class SafeSubscriberTest {
@@ -227,4 +235,34 @@ public void call() {
227235

228236
safe.onError(new TestException());
229237
}
238+
239+
@Test
240+
public void testPluginErrorHandlerReceivesExceptionWhenUnsubscribeAfterCompletionThrows() {
241+
final AtomicInteger calls = new AtomicInteger();
242+
RxJavaPlugins.getInstance().registerErrorHandler(new RxJavaErrorHandler() {
243+
@Override
244+
public void handleError(Throwable e) {
245+
calls.incrementAndGet();
246+
}
247+
});
248+
249+
final AtomicInteger errors = new AtomicInteger();
250+
TestSubscriber<Integer> ts = new TestSubscriber<Integer>() {
251+
@Override
252+
public void onError(Throwable e) {
253+
errors.incrementAndGet();
254+
}
255+
};
256+
SafeSubscriber<Integer> safe = new SafeSubscriber<Integer>(ts);
257+
safe.add(Subscriptions.create(new Action0() {
258+
@Override
259+
public void call() {
260+
throw new RuntimeException();
261+
}
262+
}));
263+
264+
safe.onCompleted();
265+
assertEquals(1, (int) calls.get());
266+
assertEquals(0, (int) errors.get());
267+
}
230268
}

0 commit comments

Comments
 (0)