From d67794db730f9467bb67dd8e69b535340e7aa89d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?D=C3=A1vid=20Karnok?= Date: Thu, 16 Feb 2017 22:48:17 +0100 Subject: [PATCH] 2.x: fix doOnSubscribe signalling Undeliv.Exception instead of onError --- .../observers/DisposableLambdaObserver.java | 13 +- .../completable/CompletablePeek.java | 142 ++++++++++-------- .../flowable/FlowableDoOnLifecycle.java | 12 +- .../completable/CompletableDoOnTest.java | 33 ++++ .../flowable/FlowableDoOnLifecycleTest.java | 35 ++++- .../operators/maybe/MaybeDoOnEventTest.java | 39 +++++ .../ObservableDoOnSubscribeTest.java | 35 ++++- .../operators/single/SingleDoOnTest.java | 33 +++- 8 files changed, 272 insertions(+), 70 deletions(-) diff --git a/src/main/java/io/reactivex/internal/observers/DisposableLambdaObserver.java b/src/main/java/io/reactivex/internal/observers/DisposableLambdaObserver.java index 49e720717c..3ba9ebfa68 100644 --- a/src/main/java/io/reactivex/internal/observers/DisposableLambdaObserver.java +++ b/src/main/java/io/reactivex/internal/observers/DisposableLambdaObserver.java @@ -43,8 +43,7 @@ public void onSubscribe(Disposable s) { } catch (Throwable e) { Exceptions.throwIfFatal(e); s.dispose(); - RxJavaPlugins.onError(e); - + this.s = DisposableHelper.DISPOSED; EmptyDisposable.error(e, actual); return; } @@ -61,12 +60,18 @@ public void onNext(T t) { @Override public void onError(Throwable t) { - actual.onError(t); + if (s != DisposableHelper.DISPOSED) { + actual.onError(t); + } else { + RxJavaPlugins.onError(t); + } } @Override public void onComplete() { - actual.onComplete(); + if (s != DisposableHelper.DISPOSED) { + actual.onComplete(); + } } diff --git a/src/main/java/io/reactivex/internal/operators/completable/CompletablePeek.java b/src/main/java/io/reactivex/internal/operators/completable/CompletablePeek.java index a067288d09..6c62558865 100644 --- a/src/main/java/io/reactivex/internal/operators/completable/CompletablePeek.java +++ b/src/main/java/io/reactivex/internal/operators/completable/CompletablePeek.java @@ -14,10 +14,10 @@ package io.reactivex.internal.operators.completable; import io.reactivex.*; -import io.reactivex.disposables.*; +import io.reactivex.disposables.Disposable; import io.reactivex.exceptions.*; import io.reactivex.functions.*; -import io.reactivex.internal.disposables.EmptyDisposable; +import io.reactivex.internal.disposables.*; import io.reactivex.plugins.RxJavaPlugins; public final class CompletablePeek extends Completable { @@ -48,77 +48,99 @@ public CompletablePeek(CompletableSource source, Consumer on @Override protected void subscribeActual(final CompletableObserver s) { - source.subscribe(new CompletableObserver() { + source.subscribe(new CompletableObserverImplementation(s)); + } - @Override - public void onComplete() { - try { - onComplete.run(); - onTerminate.run(); - } catch (Throwable e) { - Exceptions.throwIfFatal(e); - s.onError(e); - return; - } + final class CompletableObserverImplementation implements CompletableObserver, Disposable { - s.onComplete(); + final CompletableObserver actual; - doAfter(); - } + Disposable d; - @Override - public void onError(Throwable e) { - try { - onError.accept(e); - onTerminate.run(); - } catch (Throwable ex) { - Exceptions.throwIfFatal(ex); - e = new CompositeException(e, ex); - } + private CompletableObserverImplementation(CompletableObserver actual) { + this.actual = actual; + } - s.onError(e); - doAfter(); + @Override + public void onSubscribe(final Disposable d) { + try { + onSubscribe.accept(d); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + d.dispose(); + this.d = DisposableHelper.DISPOSED; + EmptyDisposable.error(ex, actual); + return; + } + if (DisposableHelper.validate(this.d, d)) { + this.d = d; + actual.onSubscribe(this); } + } - @Override - public void onSubscribe(final Disposable d) { - - try { - onSubscribe.accept(d); - } catch (Throwable ex) { - Exceptions.throwIfFatal(ex); - d.dispose(); - EmptyDisposable.error(ex, s); - return; - } - - s.onSubscribe(Disposables.fromRunnable(new Runnable() { - @Override - public void run() { - try { - onDispose.run(); - } catch (Throwable e) { - Exceptions.throwIfFatal(e); - RxJavaPlugins.onError(e); - } - d.dispose(); - } - })); + @Override + public void onError(Throwable e) { + if (d == DisposableHelper.DISPOSED) { + RxJavaPlugins.onError(e); + return; + } + try { + onError.accept(e); + onTerminate.run(); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + e = new CompositeException(e, ex); } - void doAfter() { + actual.onError(e); - try { - onAfterTerminate.run(); - } catch (Throwable ex) { - Exceptions.throwIfFatal(ex); - RxJavaPlugins.onError(ex); - } + doAfter(); + } + @Override + public void onComplete() { + if (d == DisposableHelper.DISPOSED) { + return; } - }); - } + try { + onComplete.run(); + onTerminate.run(); + } catch (Throwable e) { + Exceptions.throwIfFatal(e); + actual.onError(e); + return; + } + + actual.onComplete(); + + doAfter(); + } + + void doAfter() { + try { + onAfterTerminate.run(); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + RxJavaPlugins.onError(ex); + } + } + + @Override + public void dispose() { + try { + onDispose.run(); + } catch (Throwable e) { + Exceptions.throwIfFatal(e); + RxJavaPlugins.onError(e); + } + d.dispose(); + } + @Override + public boolean isDisposed() { + return d.isDisposed(); + } + } } diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableDoOnLifecycle.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableDoOnLifecycle.java index 7a536ce3e3..5d43648839 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableDoOnLifecycle.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableDoOnLifecycle.java @@ -64,7 +64,7 @@ public void onSubscribe(Subscription s) { } catch (Throwable e) { Exceptions.throwIfFatal(e); s.cancel(); - RxJavaPlugins.onError(e); + this.s = SubscriptionHelper.CANCELLED; EmptySubscription.error(e, actual); return; } @@ -81,12 +81,18 @@ public void onNext(T t) { @Override public void onError(Throwable t) { - actual.onError(t); + if (s != SubscriptionHelper.CANCELLED) { + actual.onError(t); + } else { + RxJavaPlugins.onError(t); + } } @Override public void onComplete() { - actual.onComplete(); + if (s != SubscriptionHelper.CANCELLED) { + actual.onComplete(); + } } @Override diff --git a/src/test/java/io/reactivex/internal/operators/completable/CompletableDoOnTest.java b/src/test/java/io/reactivex/internal/operators/completable/CompletableDoOnTest.java index 6b1a2cfb56..b2082c96d5 100644 --- a/src/test/java/io/reactivex/internal/operators/completable/CompletableDoOnTest.java +++ b/src/test/java/io/reactivex/internal/operators/completable/CompletableDoOnTest.java @@ -20,9 +20,11 @@ import org.junit.*; import io.reactivex.*; +import io.reactivex.disposables.*; import io.reactivex.exceptions.*; import io.reactivex.functions.*; import io.reactivex.observers.TestObserver; +import io.reactivex.plugins.RxJavaPlugins; public class CompletableDoOnTest { @@ -74,4 +76,35 @@ public void run() throws Exception { assertTrue(atomicBoolean.get()); } + + @Test + public void onSubscribeCrash() { + List errors = TestHelper.trackPluginErrors(); + try { + final Disposable bs = Disposables.empty(); + + new Completable() { + @Override + protected void subscribeActual(CompletableObserver s) { + s.onSubscribe(bs); + s.onError(new TestException("Second")); + s.onComplete(); + } + } + .doOnSubscribe(new Consumer() { + @Override + public void accept(Disposable s) throws Exception { + throw new TestException("First"); + } + }) + .test() + .assertFailureAndMessage(TestException.class, "First"); + + assertTrue(bs.isDisposed()); + + TestHelper.assertUndeliverable(errors, 0, TestException.class, "Second"); + } finally { + RxJavaPlugins.reset(); + } + } } diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableDoOnLifecycleTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableDoOnLifecycleTest.java index e4ce9d729f..af309b0e9b 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableDoOnLifecycleTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableDoOnLifecycleTest.java @@ -13,9 +13,10 @@ package io.reactivex.internal.operators.flowable; +import static org.junit.Assert.*; + import java.util.List; -import static org.junit.Assert.*; import org.junit.Test; import org.reactivestreams.*; @@ -23,6 +24,7 @@ import io.reactivex.exceptions.TestException; import io.reactivex.functions.*; import io.reactivex.internal.functions.Functions; +import io.reactivex.internal.subscriptions.BooleanSubscription; import io.reactivex.plugins.RxJavaPlugins; public class FlowableDoOnLifecycleTest { @@ -132,4 +134,35 @@ public void run() throws Exception { RxJavaPlugins.reset(); } } + + @Test + public void onSubscribeCrash() { + List errors = TestHelper.trackPluginErrors(); + try { + final BooleanSubscription bs = new BooleanSubscription(); + + new Flowable() { + @Override + protected void subscribeActual(Subscriber s) { + s.onSubscribe(bs); + s.onError(new TestException("Second")); + s.onComplete(); + } + } + .doOnSubscribe(new Consumer() { + @Override + public void accept(Subscription s) throws Exception { + throw new TestException("First"); + } + }) + .test() + .assertFailureAndMessage(TestException.class, "First"); + + assertTrue(bs.isCancelled()); + + TestHelper.assertUndeliverable(errors, 0, TestException.class, "Second"); + } finally { + RxJavaPlugins.reset(); + } + } } diff --git a/src/test/java/io/reactivex/internal/operators/maybe/MaybeDoOnEventTest.java b/src/test/java/io/reactivex/internal/operators/maybe/MaybeDoOnEventTest.java index e09e5fb85b..6df94cbd82 100644 --- a/src/test/java/io/reactivex/internal/operators/maybe/MaybeDoOnEventTest.java +++ b/src/test/java/io/reactivex/internal/operators/maybe/MaybeDoOnEventTest.java @@ -13,10 +13,17 @@ package io.reactivex.internal.operators.maybe; +import static org.junit.Assert.assertTrue; + +import java.util.List; + import org.junit.Test; import io.reactivex.*; +import io.reactivex.disposables.*; +import io.reactivex.exceptions.TestException; import io.reactivex.functions.*; +import io.reactivex.plugins.RxJavaPlugins; import io.reactivex.subjects.PublishSubject; public class MaybeDoOnEventTest { @@ -45,4 +52,36 @@ public void accept(Integer v, Throwable e) throws Exception { } }); } + + @Test + public void onSubscribeCrash() { + List errors = TestHelper.trackPluginErrors(); + try { + final Disposable bs = Disposables.empty(); + + new Maybe() { + @Override + protected void subscribeActual(MaybeObserver s) { + s.onSubscribe(bs); + s.onError(new TestException("Second")); + s.onComplete(); + s.onSuccess(1); + } + } + .doOnSubscribe(new Consumer() { + @Override + public void accept(Disposable s) throws Exception { + throw new TestException("First"); + } + }) + .test() + .assertFailureAndMessage(TestException.class, "First"); + + assertTrue(bs.isDisposed()); + + TestHelper.assertUndeliverable(errors, 0, TestException.class, "Second"); + } finally { + RxJavaPlugins.reset(); + } + } } diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableDoOnSubscribeTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableDoOnSubscribeTest.java index 6acbe70fd6..dbd4ca88eb 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableDoOnSubscribeTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableDoOnSubscribeTest.java @@ -13,15 +13,18 @@ package io.reactivex.internal.operators.observable; -import static org.junit.Assert.assertEquals; +import static org.junit.Assert.*; +import java.util.List; import java.util.concurrent.atomic.*; import org.junit.Test; import io.reactivex.*; import io.reactivex.disposables.*; +import io.reactivex.exceptions.TestException; import io.reactivex.functions.Consumer; +import io.reactivex.plugins.RxJavaPlugins; public class ObservableDoOnSubscribeTest { @@ -103,4 +106,34 @@ public void accept(Disposable s) { assertEquals(6, countAfter.get()); } + @Test + public void onSubscribeCrash() { + List errors = TestHelper.trackPluginErrors(); + try { + final Disposable bs = Disposables.empty(); + + new Observable() { + @Override + protected void subscribeActual(Observer s) { + s.onSubscribe(bs); + s.onError(new TestException("Second")); + s.onComplete(); + } + } + .doOnSubscribe(new Consumer() { + @Override + public void accept(Disposable s) throws Exception { + throw new TestException("First"); + } + }) + .test() + .assertFailureAndMessage(TestException.class, "First"); + + assertTrue(bs.isDisposed()); + + TestHelper.assertUndeliverable(errors, 0, TestException.class, "Second"); + } finally { + RxJavaPlugins.reset(); + } + } } diff --git a/src/test/java/io/reactivex/internal/operators/single/SingleDoOnTest.java b/src/test/java/io/reactivex/internal/operators/single/SingleDoOnTest.java index 87aa548a25..853b21c320 100644 --- a/src/test/java/io/reactivex/internal/operators/single/SingleDoOnTest.java +++ b/src/test/java/io/reactivex/internal/operators/single/SingleDoOnTest.java @@ -20,7 +20,7 @@ import org.junit.Test; import io.reactivex.*; -import io.reactivex.disposables.Disposable; +import io.reactivex.disposables.*; import io.reactivex.exceptions.*; import io.reactivex.functions.*; import io.reactivex.internal.functions.Functions; @@ -327,4 +327,35 @@ public void accept(Integer v) throws Exception { .test() .assertFailure(TestException.class); } + + @Test + public void onSubscribeCrash() { + List errors = TestHelper.trackPluginErrors(); + try { + final Disposable bs = Disposables.empty(); + + new Single() { + @Override + protected void subscribeActual(SingleObserver s) { + s.onSubscribe(bs); + s.onError(new TestException("Second")); + s.onSuccess(1); + } + } + .doOnSubscribe(new Consumer() { + @Override + public void accept(Disposable s) throws Exception { + throw new TestException("First"); + } + }) + .test() + .assertFailureAndMessage(TestException.class, "First"); + + assertTrue(bs.isDisposed()); + + TestHelper.assertUndeliverable(errors, 0, TestException.class, "Second"); + } finally { + RxJavaPlugins.reset(); + } + } }