From 05d8c63b9771c20279b9ee36357e0e300fe12b90 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?D=C3=A1vid=20Karnok?= Date: Mon, 25 Jan 2016 22:17:43 +0100 Subject: [PATCH] 1.x: fix SyncOnSubscribe not signalling onError if the generator crashes --- .../java/rx/observables/SyncOnSubscribe.java | 19 +++++++++++++-- .../rx/observables/SyncOnSubscribeTest.java | 23 +++++++++++++++++++ 2 files changed, 40 insertions(+), 2 deletions(-) diff --git a/src/main/java/rx/observables/SyncOnSubscribe.java b/src/main/java/rx/observables/SyncOnSubscribe.java index 707e047b2a..f8cda8dde0 100644 --- a/src/main/java/rx/observables/SyncOnSubscribe.java +++ b/src/main/java/rx/observables/SyncOnSubscribe.java @@ -24,6 +24,7 @@ import rx.Subscriber; import rx.Subscription; import rx.annotations.Experimental; +import rx.exceptions.Exceptions; import rx.functions.Action0; import rx.functions.Action1; import rx.functions.Action2; @@ -53,7 +54,16 @@ public abstract class SyncOnSubscribe implements OnSubscribe { */ @Override public final void call(final Subscriber subscriber) { - S state = generateState(); + S state; + + try { + state = generateState(); + } catch (Throwable e) { + Exceptions.throwIfFatal(e); + subscriber.onError(e); + return; + } + SubscriptionProducer p = new SubscriptionProducer(subscriber, this, state); subscriber.add(p); subscriber.setProducer(p); @@ -363,7 +373,12 @@ private boolean tryUnsubscribe() { } private void doUnsubscribe() { - parent.onUnsubscribe(state); + try { + parent.onUnsubscribe(state); + } catch (Throwable e) { + Exceptions.throwIfFatal(e); + RxJavaPlugins.getInstance().getErrorHandler().handleError(e); + } } @Override diff --git a/src/test/java/rx/observables/SyncOnSubscribeTest.java b/src/test/java/rx/observables/SyncOnSubscribeTest.java index 82cfc0b033..71fc0ac8e9 100644 --- a/src/test/java/rx/observables/SyncOnSubscribeTest.java +++ b/src/test/java/rx/observables/SyncOnSubscribeTest.java @@ -989,4 +989,27 @@ public Object call() throws Exception { if (exec != null) exec.shutdownNow(); } } + + @Test + public void testStateThrows() { + TestSubscriber ts = new TestSubscriber(); + + SyncOnSubscribe.createSingleState( + new Func0() { + @Override + public Object call() { + throw new TestException(); + } + } + , new Action2>() { + @Override + public void call(Object s, Observer o) { + + } + }).call(ts); + + ts.assertNoValues(); + ts.assertError(TestException.class); + ts.assertNotCompleted(); + } }