From fcb201a2842019451bc165e6562ca818c93129d2 Mon Sep 17 00:00:00 2001 From: akarnokd Date: Sat, 14 May 2016 10:08:22 +0200 Subject: [PATCH 1/8] 1.x: add Completable.safeSubscribe option --- src/main/java/rx/Completable.java | 64 ++++++++ .../observers/SafeCompletableSubscriber.java | 90 ++++++++++++ src/test/java/rx/CompletableTest.java | 137 ++++++++++++++++++ 3 files changed, 291 insertions(+) create mode 100644 src/main/java/rx/observers/SafeCompletableSubscriber.java diff --git a/src/main/java/rx/Completable.java b/src/main/java/rx/Completable.java index 8a2a4121d2..e2a7f64298 100644 --- a/src/main/java/rx/Completable.java +++ b/src/main/java/rx/Completable.java @@ -26,6 +26,7 @@ import rx.functions.*; import rx.internal.operators.*; import rx.internal.util.*; +import rx.observers.SafeCompletableSubscriber; import rx.plugins.*; import rx.schedulers.Schedulers; import rx.subscriptions.*; @@ -1971,8 +1972,29 @@ public final void subscribe(CompletableSubscriber s) { } catch (NullPointerException ex) { throw ex; } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); ERROR_HANDLER.handleError(ex); + throw toNpe(ex); + } + } + + /** + * Subscribes the given CompletableSubscriber to this Completable instance + * and handles exceptions throw by its onXXX methods. + * @param s the CompletableSubscriber, not null + * @throws NullPointerException if s is null + */ + public final void safeSubscribe(CompletableSubscriber s) { + requireNonNull(s); + try { + // TODO plugin wrapping the subscriber + + onSubscribe.call(new SafeCompletableSubscriber(s)); + } catch (NullPointerException ex) { + throw ex; + } catch (Throwable ex) { Exceptions.throwIfFatal(ex); + ERROR_HANDLER.handleError(ex); throw toNpe(ex); } } @@ -2012,6 +2034,48 @@ public void onSubscribe(Subscription d) { } catch (NullPointerException ex) { throw ex; } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + ERROR_HANDLER.handleError(ex); + throw toNpe(ex); + } + } + + /** + * Subscribes a reactive-streams Subscriber to this Completable instance which + * will receive only an onError or onComplete event. + * @param s the reactive-streams Subscriber, not null + * @throws NullPointerException if s is null + */ + public final void safeSubscribe(Subscriber s) { + requireNonNull(s); + try { + final Subscriber sw = s; // FIXME hooking in 1.x is kind of strange to me + + if (sw == null) { + throw new NullPointerException("The RxJavaPlugins.onSubscribe returned a null Subscriber"); + } + + subscribe(new SafeCompletableSubscriber(new CompletableSubscriber() { + @Override + public void onCompleted() { + sw.onCompleted(); + } + + @Override + public void onError(Throwable e) { + sw.onError(e); + } + + @Override + public void onSubscribe(Subscription d) { + sw.add(d); + } + })); + + } catch (NullPointerException ex) { + throw ex; + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); ERROR_HANDLER.handleError(ex); throw toNpe(ex); } diff --git a/src/main/java/rx/observers/SafeCompletableSubscriber.java b/src/main/java/rx/observers/SafeCompletableSubscriber.java new file mode 100644 index 0000000000..3e85300ef4 --- /dev/null +++ b/src/main/java/rx/observers/SafeCompletableSubscriber.java @@ -0,0 +1,90 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.observers; + +import rx.Completable.CompletableSubscriber; +import rx.exceptions.*; +import rx.Subscription; +import rx.internal.util.RxJavaPluginUtils; + +/** + * Wraps another CompletableSubscriber and handles exceptions thrown + * from onError and onCompleted. + */ +public final class SafeCompletableSubscriber implements CompletableSubscriber, Subscription { + final CompletableSubscriber actual; + + Subscription s; + + boolean done; + + public SafeCompletableSubscriber(CompletableSubscriber actual) { + this.actual = actual; + } + + @Override + public void onCompleted() { + if (done) { + return; + } + done = true; + try { + actual.onCompleted(); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + + throw new OnCompletedFailedException(ex); + } + } + + @Override + public void onError(Throwable e) { + if (done) { + RxJavaPluginUtils.handleException(e); + return; + } + done = true; + try { + actual.onError(e); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + + throw new OnErrorFailedException(new CompositeException(e, ex)); + } + } + + @Override + public void onSubscribe(Subscription d) { + this.s = d; + try { + actual.onSubscribe(this); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + d.unsubscribe(); + onError(ex); + } + } + + @Override + public void unsubscribe() { + s.unsubscribe(); + } + + @Override + public boolean isUnsubscribed() { + return done || s.isUnsubscribed(); + } +} diff --git a/src/test/java/rx/CompletableTest.java b/src/test/java/rx/CompletableTest.java index 8ee2f747d3..d68bea3015 100644 --- a/src/test/java/rx/CompletableTest.java +++ b/src/test/java/rx/CompletableTest.java @@ -3898,4 +3898,141 @@ private static void expectUncaughtTestException(Action0 action) { } } + @Test + public void safeOnCompleteThrows() { + try { + normal.completable.safeSubscribe(new CompletableSubscriber() { + + @Override + public void onCompleted() { + throw new TestException("Forced failure"); + } + + @Override + public void onError(Throwable e) { + + } + + @Override + public void onSubscribe(Subscription d) { + + } + + }); + Assert.fail("Did not propagate exception!"); + } catch (OnCompletedFailedException ex) { + Throwable c = ex.getCause(); + Assert.assertNotNull(c); + + Assert.assertEquals("Forced failure", c.getMessage()); + } + } + + @Test + public void safeOnCompleteThrowsRegularSubscriber() { + try { + normal.completable.safeSubscribe(new Subscriber() { + + @Override + public void onCompleted() { + throw new TestException("Forced failure"); + } + + @Override + public void onError(Throwable e) { + + } + + @Override + public void onNext(Object t) { + + } + }); + Assert.fail("Did not propagate exception!"); + } catch (OnCompletedFailedException ex) { + Throwable c = ex.getCause(); + Assert.assertNotNull(c); + + Assert.assertEquals("Forced failure", c.getMessage()); + } + } + + @Test + public void safeOnErrorThrows() { + try { + error.completable.safeSubscribe(new CompletableSubscriber() { + + @Override + public void onCompleted() { + } + + @Override + public void onError(Throwable e) { + throw new TestException("Forced failure"); + } + + @Override + public void onSubscribe(Subscription d) { + + } + + }); + Assert.fail("Did not propagate exception!"); + } catch (OnErrorFailedException ex) { + Throwable c = ex.getCause(); + Assert.assertTrue("" + c, c instanceof CompositeException); + + CompositeException ce = (CompositeException)c; + + List list = ce.getExceptions(); + + Assert.assertEquals(2, list.size()); + + Assert.assertTrue("" + list.get(0), list.get(0) instanceof TestException); + Assert.assertNull(list.get(0).getMessage()); + + Assert.assertTrue("" + list.get(1), list.get(1) instanceof TestException); + Assert.assertEquals("Forced failure", list.get(1).getMessage()); + } + } + + @Test + public void safeOnErrorThrowsRegularSubscriber() { + try { + error.completable.safeSubscribe(new Subscriber() { + + @Override + public void onCompleted() { + + } + + @Override + public void onError(Throwable e) { + throw new TestException("Forced failure"); + } + + @Override + public void onNext(Object t) { + + } + }); + Assert.fail("Did not propagate exception!"); + } catch (OnErrorFailedException ex) { + Throwable c = ex.getCause(); + Assert.assertTrue("" + c, c instanceof CompositeException); + + CompositeException ce = (CompositeException)c; + + List list = ce.getExceptions(); + + Assert.assertEquals(2, list.size()); + + Assert.assertTrue("" + list.get(0), list.get(0) instanceof TestException); + Assert.assertNull(list.get(0).getMessage()); + + Assert.assertTrue("" + list.get(1), list.get(1) instanceof TestException); + Assert.assertEquals("Forced failure", list.get(1).getMessage()); + } + } + } \ No newline at end of file From 9469e9a02db0fc0ee0c19abbae43a14cad8e20f7 Mon Sep 17 00:00:00 2001 From: akarnokd Date: Sat, 14 May 2016 19:29:55 +0200 Subject: [PATCH 2/8] Fix typo, rename subscribe methods --- src/main/java/rx/Completable.java | 65 ++++++++++--------- src/main/java/rx/Single.java | 2 +- .../CompletableOnSubscribeConcat.java | 2 +- .../CompletableOnSubscribeConcatArray.java | 2 +- .../CompletableOnSubscribeConcatIterable.java | 2 +- .../CompletableOnSubscribeMerge.java | 2 +- .../CompletableOnSubscribeMergeArray.java | 2 +- ...etableOnSubscribeMergeDelayErrorArray.java | 2 +- ...bleOnSubscribeMergeDelayErrorIterable.java | 2 +- .../CompletableOnSubscribeMergeIterable.java | 2 +- .../CompletableOnSubscribeTimeout.java | 4 +- src/test/java/rx/CompletableTest.java | 58 ++++++++--------- src/test/java/rx/SingleTest.java | 4 +- .../operators/OnSubscribeCompletableTest.java | 12 ++-- 14 files changed, 81 insertions(+), 80 deletions(-) diff --git a/src/main/java/rx/Completable.java b/src/main/java/rx/Completable.java index e2a7f64298..d889356e1c 100644 --- a/src/main/java/rx/Completable.java +++ b/src/main/java/rx/Completable.java @@ -173,7 +173,7 @@ public void onSubscribe(Subscription d) { } // no need to have separate subscribers because inner is stateless - c.subscribe(inner); + c.unsafeSubscribe(inner); } } }); @@ -301,7 +301,7 @@ public void onSubscribe(Subscription d) { } // no need to have separate subscribers because inner is stateless - c.subscribe(inner); + c.unsafeSubscribe(inner); } } }); @@ -416,7 +416,7 @@ public void call(CompletableSubscriber s) { return; } - c.subscribe(s); + c.unsafeSubscribe(s); } }); } @@ -899,7 +899,7 @@ public void call(final CompletableSubscriber s) { final AtomicBoolean once = new AtomicBoolean(); - cs.subscribe(new CompletableSubscriber() { + cs.unsafeSubscribe(new CompletableSubscriber() { Subscription d; void dispose() { d.unsubscribe(); @@ -999,7 +999,7 @@ public final void await() { final CountDownLatch cdl = new CountDownLatch(1); final Throwable[] err = new Throwable[1]; - subscribe(new CompletableSubscriber() { + unsafeSubscribe(new CompletableSubscriber() { @Override public void onCompleted() { @@ -1050,7 +1050,7 @@ public final boolean await(long timeout, TimeUnit unit) { final CountDownLatch cdl = new CountDownLatch(1); final Throwable[] err = new Throwable[1]; - subscribe(new CompletableSubscriber() { + unsafeSubscribe(new CompletableSubscriber() { @Override public void onCompleted() { @@ -1190,7 +1190,7 @@ public void call(final CompletableSubscriber s) { final Scheduler.Worker w = scheduler.createWorker(); set.add(w); - subscribe(new CompletableSubscriber() { + unsafeSubscribe(new CompletableSubscriber() { @Override @@ -1302,7 +1302,7 @@ protected final Completable doOnLifecycle( return create(new CompletableOnSubscribe() { @Override public void call(final CompletableSubscriber s) { - subscribe(new CompletableSubscriber() { + unsafeSubscribe(new CompletableSubscriber() { @Override public void onCompleted() { @@ -1434,7 +1434,7 @@ public final Throwable get() { final CountDownLatch cdl = new CountDownLatch(1); final Throwable[] err = new Throwable[1]; - subscribe(new CompletableSubscriber() { + unsafeSubscribe(new CompletableSubscriber() { @Override public void onCompleted() { @@ -1478,7 +1478,7 @@ public final Throwable get(long timeout, TimeUnit unit) { final CountDownLatch cdl = new CountDownLatch(1); final Throwable[] err = new Throwable[1]; - subscribe(new CompletableSubscriber() { + unsafeSubscribe(new CompletableSubscriber() { @Override public void onCompleted() { @@ -1530,7 +1530,7 @@ public void call(CompletableSubscriber s) { CompletableSubscriber sw = onLift.call(s); - subscribe(sw); + unsafeSubscribe(sw); } catch (NullPointerException ex) { throw ex; } catch (Throwable ex) { @@ -1571,7 +1571,7 @@ public void call(final CompletableSubscriber s) { s.onSubscribe(ad); - subscribe(new CompletableSubscriber() { + unsafeSubscribe(new CompletableSubscriber() { @Override public void onCompleted() { @@ -1633,7 +1633,7 @@ public final Completable onErrorComplete(final Func1 return create(new CompletableOnSubscribe() { @Override public void call(final CompletableSubscriber s) { - subscribe(new CompletableSubscriber() { + unsafeSubscribe(new CompletableSubscriber() { @Override public void onCompleted() { @@ -1682,7 +1682,7 @@ public final Completable onErrorResumeNext(final Func1 Observable startWith(Observable other) { */ public final Subscription subscribe() { final MultipleAssignmentSubscription mad = new MultipleAssignmentSubscription(); - subscribe(new CompletableSubscriber() { + unsafeSubscribe(new CompletableSubscriber() { @Override public void onCompleted() { mad.unsubscribe(); @@ -1877,7 +1877,7 @@ public final Subscription subscribe(final Action0 onComplete) { requireNonNull(onComplete); final MultipleAssignmentSubscription mad = new MultipleAssignmentSubscription(); - subscribe(new CompletableSubscriber() { + unsafeSubscribe(new CompletableSubscriber() { @Override public void onCompleted() { try { @@ -1919,7 +1919,7 @@ public final Subscription subscribe(final Action1 onError, fi requireNonNull(onComplete); final MultipleAssignmentSubscription mad = new MultipleAssignmentSubscription(); - subscribe(new CompletableSubscriber() { + unsafeSubscribe(new CompletableSubscriber() { @Override public void onCompleted() { try { @@ -1963,7 +1963,7 @@ private static void deliverUncaughtException(Throwable e) { * @param s the CompletableSubscriber, not null * @throws NullPointerException if s is null */ - public final void subscribe(CompletableSubscriber s) { + public final void unsafeSubscribe(CompletableSubscriber s) { requireNonNull(s); try { // TODO plugin wrapping the subscriber @@ -1980,11 +1980,11 @@ public final void subscribe(CompletableSubscriber s) { /** * Subscribes the given CompletableSubscriber to this Completable instance - * and handles exceptions throw by its onXXX methods. + * and handles exceptions thrown by its onXXX methods. * @param s the CompletableSubscriber, not null * @throws NullPointerException if s is null */ - public final void safeSubscribe(CompletableSubscriber s) { + public final void subscribe(CompletableSubscriber s) { requireNonNull(s); try { // TODO plugin wrapping the subscriber @@ -2000,12 +2000,12 @@ public final void safeSubscribe(CompletableSubscriber s) { } /** - * Subscribes a reactive-streams Subscriber to this Completable instance which + * Subscribes a regular Subscriber to this Completable instance which * will receive only an onError or onComplete event. * @param s the reactive-streams Subscriber, not null * @throws NullPointerException if s is null */ - public final void subscribe(Subscriber s) { + public final void unsafeSubscribe(Subscriber s) { requireNonNull(s); try { final Subscriber sw = s; // FIXME hooking in 1.x is kind of strange to me @@ -2014,7 +2014,7 @@ public final void subscribe(Subscriber s) { throw new NullPointerException("The RxJavaPlugins.onSubscribe returned a null Subscriber"); } - subscribe(new CompletableSubscriber() { + unsafeSubscribe(new CompletableSubscriber() { @Override public void onCompleted() { sw.onCompleted(); @@ -2041,12 +2041,13 @@ public void onSubscribe(Subscription d) { } /** - * Subscribes a reactive-streams Subscriber to this Completable instance which - * will receive only an onError or onComplete event. + * Subscribes a regular Subscriber to this Completable instance which + * will receive only an onError or onComplete event + * and handles exceptions thrown by its onXXX methods. * @param s the reactive-streams Subscriber, not null * @throws NullPointerException if s is null */ - public final void safeSubscribe(Subscriber s) { + public final void subscribe(Subscriber s) { requireNonNull(s); try { final Subscriber sw = s; // FIXME hooking in 1.x is kind of strange to me @@ -2055,7 +2056,7 @@ public final void safeSubscribe(Subscriber s) { throw new NullPointerException("The RxJavaPlugins.onSubscribe returned a null Subscriber"); } - subscribe(new SafeCompletableSubscriber(new CompletableSubscriber() { + unsafeSubscribe(new SafeCompletableSubscriber(new CompletableSubscriber() { @Override public void onCompleted() { sw.onCompleted(); @@ -2102,7 +2103,7 @@ public void call(final CompletableSubscriber s) { @Override public void call() { try { - subscribe(s); + unsafeSubscribe(s); } finally { w.unsubscribe(); } @@ -2205,7 +2206,7 @@ public final Observable toObservable() { return Observable.create(new OnSubscribe() { @Override public void call(Subscriber s) { - subscribe(s); + unsafeSubscribe(s); } }); } @@ -2222,7 +2223,7 @@ public final Single toSingle(final Func0 completionValueFunc return Single.create(new rx.Single.OnSubscribe() { @Override public void call(final SingleSubscriber s) { - subscribe(new CompletableSubscriber() { + unsafeSubscribe(new CompletableSubscriber() { @Override public void onCompleted() { @@ -2286,7 +2287,7 @@ public final Completable unsubscribeOn(final Scheduler scheduler) { return create(new CompletableOnSubscribe() { @Override public void call(final CompletableSubscriber s) { - subscribe(new CompletableSubscriber() { + unsafeSubscribe(new CompletableSubscriber() { @Override public void onCompleted() { diff --git a/src/main/java/rx/Single.java b/src/main/java/rx/Single.java index 7ba3325434..eb2216d771 100644 --- a/src/main/java/rx/Single.java +++ b/src/main/java/rx/Single.java @@ -1927,7 +1927,7 @@ public void onSubscribe(Subscription d) { serial.add(main); child.add(serial); - other.subscribe(so); + other.unsafeSubscribe(so); return main; } diff --git a/src/main/java/rx/internal/operators/CompletableOnSubscribeConcat.java b/src/main/java/rx/internal/operators/CompletableOnSubscribeConcat.java index 8257c158a7..d08ea63508 100644 --- a/src/main/java/rx/internal/operators/CompletableOnSubscribeConcat.java +++ b/src/main/java/rx/internal/operators/CompletableOnSubscribeConcat.java @@ -129,7 +129,7 @@ void next() { return; } - c.subscribe(inner); + c.unsafeSubscribe(inner); } final class ConcatInnerSubscriber implements CompletableSubscriber { diff --git a/src/main/java/rx/internal/operators/CompletableOnSubscribeConcatArray.java b/src/main/java/rx/internal/operators/CompletableOnSubscribeConcatArray.java index c1f48f61b7..0b132acd43 100644 --- a/src/main/java/rx/internal/operators/CompletableOnSubscribeConcatArray.java +++ b/src/main/java/rx/internal/operators/CompletableOnSubscribeConcatArray.java @@ -89,7 +89,7 @@ void next() { return; } - a[idx].subscribe(this); + a[idx].unsafeSubscribe(this); } while (decrementAndGet() != 0); } } diff --git a/src/main/java/rx/internal/operators/CompletableOnSubscribeConcatIterable.java b/src/main/java/rx/internal/operators/CompletableOnSubscribeConcatIterable.java index fe6211153e..0ce11fde78 100644 --- a/src/main/java/rx/internal/operators/CompletableOnSubscribeConcatIterable.java +++ b/src/main/java/rx/internal/operators/CompletableOnSubscribeConcatIterable.java @@ -128,7 +128,7 @@ void next() { return; } - c.subscribe(this); + c.unsafeSubscribe(this); } while (decrementAndGet() != 0); } } diff --git a/src/main/java/rx/internal/operators/CompletableOnSubscribeMerge.java b/src/main/java/rx/internal/operators/CompletableOnSubscribeMerge.java index 7c3fff300c..5a3d79f8e6 100644 --- a/src/main/java/rx/internal/operators/CompletableOnSubscribeMerge.java +++ b/src/main/java/rx/internal/operators/CompletableOnSubscribeMerge.java @@ -100,7 +100,7 @@ public void onNext(Completable t) { wip.getAndIncrement(); - t.subscribe(new CompletableSubscriber() { + t.unsafeSubscribe(new CompletableSubscriber() { Subscription d; boolean innerDone; @Override diff --git a/src/main/java/rx/internal/operators/CompletableOnSubscribeMergeArray.java b/src/main/java/rx/internal/operators/CompletableOnSubscribeMergeArray.java index 85d3d59b3a..f78c960430 100644 --- a/src/main/java/rx/internal/operators/CompletableOnSubscribeMergeArray.java +++ b/src/main/java/rx/internal/operators/CompletableOnSubscribeMergeArray.java @@ -54,7 +54,7 @@ public void call(final CompletableSubscriber s) { } } - c.subscribe(new CompletableSubscriber() { + c.unsafeSubscribe(new CompletableSubscriber() { @Override public void onSubscribe(Subscription d) { set.add(d); diff --git a/src/main/java/rx/internal/operators/CompletableOnSubscribeMergeDelayErrorArray.java b/src/main/java/rx/internal/operators/CompletableOnSubscribeMergeDelayErrorArray.java index 2a89afbaa2..d98a6a039e 100644 --- a/src/main/java/rx/internal/operators/CompletableOnSubscribeMergeDelayErrorArray.java +++ b/src/main/java/rx/internal/operators/CompletableOnSubscribeMergeDelayErrorArray.java @@ -51,7 +51,7 @@ public void call(final CompletableSubscriber s) { continue; } - c.subscribe(new CompletableSubscriber() { + c.unsafeSubscribe(new CompletableSubscriber() { @Override public void onSubscribe(Subscription d) { set.add(d); diff --git a/src/main/java/rx/internal/operators/CompletableOnSubscribeMergeDelayErrorIterable.java b/src/main/java/rx/internal/operators/CompletableOnSubscribeMergeDelayErrorIterable.java index be783e6d6d..9d2567135e 100644 --- a/src/main/java/rx/internal/operators/CompletableOnSubscribeMergeDelayErrorIterable.java +++ b/src/main/java/rx/internal/operators/CompletableOnSubscribeMergeDelayErrorIterable.java @@ -117,7 +117,7 @@ public void call(final CompletableSubscriber s) { wip.getAndIncrement(); - c.subscribe(new CompletableSubscriber() { + c.unsafeSubscribe(new CompletableSubscriber() { @Override public void onSubscribe(Subscription d) { set.add(d); diff --git a/src/main/java/rx/internal/operators/CompletableOnSubscribeMergeIterable.java b/src/main/java/rx/internal/operators/CompletableOnSubscribeMergeIterable.java index 7ad953e4de..e13670cd20 100644 --- a/src/main/java/rx/internal/operators/CompletableOnSubscribeMergeIterable.java +++ b/src/main/java/rx/internal/operators/CompletableOnSubscribeMergeIterable.java @@ -110,7 +110,7 @@ public void call(final CompletableSubscriber s) { wip.getAndIncrement(); - c.subscribe(new CompletableSubscriber() { + c.unsafeSubscribe(new CompletableSubscriber() { @Override public void onSubscribe(Subscription d) { set.add(d); diff --git a/src/main/java/rx/internal/operators/CompletableOnSubscribeTimeout.java b/src/main/java/rx/internal/operators/CompletableOnSubscribeTimeout.java index 2a9c8e31e2..b62c62274d 100644 --- a/src/main/java/rx/internal/operators/CompletableOnSubscribeTimeout.java +++ b/src/main/java/rx/internal/operators/CompletableOnSubscribeTimeout.java @@ -60,7 +60,7 @@ public void call() { if (other == null) { s.onError(new TimeoutException()); } else { - other.subscribe(new CompletableSubscriber() { + other.unsafeSubscribe(new CompletableSubscriber() { @Override public void onSubscribe(Subscription d) { @@ -85,7 +85,7 @@ public void onCompleted() { } }, timeout, unit); - source.subscribe(new CompletableSubscriber() { + source.unsafeSubscribe(new CompletableSubscriber() { @Override public void onSubscribe(Subscription d) { diff --git a/src/test/java/rx/CompletableTest.java b/src/test/java/rx/CompletableTest.java index d68bea3015..f0dbab4835 100644 --- a/src/test/java/rx/CompletableTest.java +++ b/src/test/java/rx/CompletableTest.java @@ -1159,7 +1159,7 @@ public void call(Long v) { public void never() { final AtomicBoolean onSubscribeCalled = new AtomicBoolean(); final AtomicInteger calls = new AtomicInteger(); - Completable.never().subscribe(new CompletableSubscriber() { + Completable.never().unsafeSubscribe(new CompletableSubscriber() { @Override public void onSubscribe(Subscription d) { onSubscribeCalled.set(true); @@ -1202,7 +1202,7 @@ public void timerTestScheduler() { final AtomicInteger calls = new AtomicInteger(); - c.subscribe(new CompletableSubscriber() { + c.unsafeSubscribe(new CompletableSubscriber() { @Override public void onSubscribe(Subscription d) { @@ -1235,7 +1235,7 @@ public void timerCancel() throws InterruptedException { final MultipleAssignmentSubscription mad = new MultipleAssignmentSubscription(); final AtomicInteger calls = new AtomicInteger(); - c.subscribe(new CompletableSubscriber() { + c.unsafeSubscribe(new CompletableSubscriber() { @Override public void onSubscribe(Subscription d) { mad.set(d); @@ -1295,7 +1295,7 @@ public void call(Integer d) { final AtomicBoolean unsubscribedFirst = new AtomicBoolean(); final AtomicReference error = new AtomicReference(); - c.subscribe(new CompletableSubscriber() { + c.unsafeSubscribe(new CompletableSubscriber() { @Override public void onSubscribe(Subscription d) { @@ -1341,7 +1341,7 @@ public void call(Integer d) { final AtomicBoolean unsubscribedFirst = new AtomicBoolean(); final AtomicReference error = new AtomicReference(); - c.subscribe(new CompletableSubscriber() { + c.unsafeSubscribe(new CompletableSubscriber() { @Override public void onSubscribe(Subscription d) { @@ -1387,7 +1387,7 @@ public void call(Integer d) { final AtomicBoolean unsubscribedFirst = new AtomicBoolean(); final AtomicBoolean complete = new AtomicBoolean(); - c.subscribe(new CompletableSubscriber() { + c.unsafeSubscribe(new CompletableSubscriber() { @Override public void onSubscribe(Subscription d) { @@ -1433,7 +1433,7 @@ public void call(Integer d) { final AtomicBoolean unsubscribedFirst = new AtomicBoolean(); final AtomicBoolean complete = new AtomicBoolean(); - c.subscribe(new CompletableSubscriber() { + c.unsafeSubscribe(new CompletableSubscriber() { @Override public void onSubscribe(Subscription d) { @@ -1630,7 +1630,7 @@ public void delayNormal() throws InterruptedException { final AtomicBoolean done = new AtomicBoolean(); final AtomicReference error = new AtomicReference(); - c.subscribe(new CompletableSubscriber() { + c.unsafeSubscribe(new CompletableSubscriber() { @Override public void onSubscribe(Subscription d) { @@ -1665,7 +1665,7 @@ public void delayErrorImmediately() throws InterruptedException { final AtomicBoolean done = new AtomicBoolean(); final AtomicReference error = new AtomicReference(); - c.subscribe(new CompletableSubscriber() { + c.unsafeSubscribe(new CompletableSubscriber() { @Override public void onSubscribe(Subscription d) { @@ -1699,7 +1699,7 @@ public void delayErrorToo() throws InterruptedException { final AtomicBoolean done = new AtomicBoolean(); final AtomicReference error = new AtomicReference(); - c.subscribe(new CompletableSubscriber() { + c.unsafeSubscribe(new CompletableSubscriber() { @Override public void onSubscribe(Subscription d) { @@ -1826,7 +1826,7 @@ public void call() { } }); - c.subscribe(new CompletableSubscriber() { + c.unsafeSubscribe(new CompletableSubscriber() { @Override public void onSubscribe(Subscription d) { d.unsubscribe(); @@ -1858,7 +1858,7 @@ public void doOnDisposeThrows() { public void call() { throw new TestException(); } }); - c.subscribe(new CompletableSubscriber() { + c.unsafeSubscribe(new CompletableSubscriber() { @Override public void onSubscribe(Subscription d) { d.unsubscribe(); @@ -2017,7 +2017,7 @@ public void call() { } }); - c.subscribe(new CompletableSubscriber() { + c.unsafeSubscribe(new CompletableSubscriber() { @Override public void onSubscribe(Subscription d) { @@ -2173,7 +2173,7 @@ public void observeOnNormal() throws InterruptedException { Completable c = normal.completable.observeOn(Schedulers.computation()); - c.subscribe(new CompletableSubscriber() { + c.unsafeSubscribe(new CompletableSubscriber() { @Override public void onSubscribe(Subscription d) { @@ -2206,7 +2206,7 @@ public void observeOnError() throws InterruptedException { Completable c = error.completable.observeOn(Schedulers.computation()); - c.subscribe(new CompletableSubscriber() { + c.unsafeSubscribe(new CompletableSubscriber() { @Override public void onSubscribe(Subscription d) { @@ -2337,7 +2337,7 @@ public Object call() throws Exception { } }).repeat(); - c.subscribe(new CompletableSubscriber() { + c.unsafeSubscribe(new CompletableSubscriber() { @Override public void onSubscribe(final Subscription d) { final Scheduler.Worker w = Schedulers.io().createWorker(); @@ -2680,19 +2680,19 @@ public void subscribeActionNull() { @Test(expected = NullPointerException.class) public void subscribeSubscriberNull() { - normal.completable.subscribe((Subscriber)null); + normal.completable.unsafeSubscribe((Subscriber)null); } @Test(expected = NullPointerException.class) public void subscribeCompletableSubscriberNull() { - normal.completable.subscribe((CompletableSubscriber)null); + normal.completable.unsafeSubscribe((CompletableSubscriber)null); } @Test(timeout = 1000) public void subscribeSubscriberNormal() { TestSubscriber ts = new TestSubscriber(); - normal.completable.subscribe(ts); + normal.completable.unsafeSubscribe(ts); ts.assertCompleted(); ts.assertNoValues(); @@ -2703,7 +2703,7 @@ public void subscribeSubscriberNormal() { public void subscribeSubscriberError() { TestSubscriber ts = new TestSubscriber(); - error.completable.subscribe(ts); + error.completable.unsafeSubscribe(ts); ts.assertNotCompleted(); ts.assertNoValues(); @@ -3028,7 +3028,7 @@ public void call() { } }) .unsubscribeOn(Schedulers.computation()) - .subscribe(new CompletableSubscriber() { + .unsafeSubscribe(new CompletableSubscriber() { @Override public void onSubscribe(final Subscription d) { final Scheduler.Worker w = Schedulers.io().createWorker(); @@ -3628,7 +3628,7 @@ public Integer call() { public Completable call(Integer t) { throw new TestException(); } - }, onDispose).subscribe(ts); + }, onDispose).unsafeSubscribe(ts); verify(onDispose).call(1); @@ -3659,7 +3659,7 @@ public Integer call() { public Completable call(Integer t) { throw new TestException(); } - }, onDispose).subscribe(ts); + }, onDispose).unsafeSubscribe(ts); ts.assertNoValues(); ts.assertNotCompleted(); @@ -3693,7 +3693,7 @@ public Integer call() { public Completable call(Integer t) { return null; } - }, onDispose).subscribe(ts); + }, onDispose).unsafeSubscribe(ts); verify(onDispose).call(1); @@ -3724,7 +3724,7 @@ public Integer call() { public Completable call(Integer t) { return null; } - }, onDispose).subscribe(ts); + }, onDispose).unsafeSubscribe(ts); ts.assertNoValues(); ts.assertNotCompleted(); @@ -3901,7 +3901,7 @@ private static void expectUncaughtTestException(Action0 action) { @Test public void safeOnCompleteThrows() { try { - normal.completable.safeSubscribe(new CompletableSubscriber() { + normal.completable.subscribe(new CompletableSubscriber() { @Override public void onCompleted() { @@ -3931,7 +3931,7 @@ public void onSubscribe(Subscription d) { @Test public void safeOnCompleteThrowsRegularSubscriber() { try { - normal.completable.safeSubscribe(new Subscriber() { + normal.completable.subscribe(new Subscriber() { @Override public void onCompleted() { @@ -3960,7 +3960,7 @@ public void onNext(Object t) { @Test public void safeOnErrorThrows() { try { - error.completable.safeSubscribe(new CompletableSubscriber() { + error.completable.subscribe(new CompletableSubscriber() { @Override public void onCompleted() { @@ -3999,7 +3999,7 @@ public void onSubscribe(Subscription d) { @Test public void safeOnErrorThrowsRegularSubscriber() { try { - error.completable.safeSubscribe(new Subscriber() { + error.completable.subscribe(new Subscriber() { @Override public void onCompleted() { diff --git a/src/test/java/rx/SingleTest.java b/src/test/java/rx/SingleTest.java index 3760330eb4..71dd0a0de1 100644 --- a/src/test/java/rx/SingleTest.java +++ b/src/test/java/rx/SingleTest.java @@ -816,7 +816,7 @@ public void testToObservable() { public void toCompletableSuccess() { Completable completable = Single.just("value").toCompletable(); TestSubscriber testSubscriber = new TestSubscriber(); - completable.subscribe(testSubscriber); + completable.unsafeSubscribe(testSubscriber); testSubscriber.assertCompleted(); testSubscriber.assertNoValues(); @@ -828,7 +828,7 @@ public void toCompletableError() { TestException exception = new TestException(); Completable completable = Single.error(exception).toCompletable(); TestSubscriber testSubscriber = new TestSubscriber(); - completable.subscribe(testSubscriber); + completable.unsafeSubscribe(testSubscriber); testSubscriber.assertError(exception); testSubscriber.assertNoValues(); diff --git a/src/test/java/rx/internal/operators/OnSubscribeCompletableTest.java b/src/test/java/rx/internal/operators/OnSubscribeCompletableTest.java index e30bb78062..8cad9c96b6 100644 --- a/src/test/java/rx/internal/operators/OnSubscribeCompletableTest.java +++ b/src/test/java/rx/internal/operators/OnSubscribeCompletableTest.java @@ -32,7 +32,7 @@ public class OnSubscribeCompletableTest { public void testJustSingleItemObservable() { TestSubscriber subscriber = TestSubscriber.create(); Completable cmp = Observable.just("Hello World!").toCompletable(); - cmp.subscribe(subscriber); + cmp.unsafeSubscribe(subscriber); subscriber.assertNoValues(); subscriber.assertCompleted(); @@ -44,7 +44,7 @@ public void testErrorObservable() { TestSubscriber subscriber = TestSubscriber.create(); IllegalArgumentException error = new IllegalArgumentException("Error"); Completable cmp = Observable.error(error).toCompletable(); - cmp.subscribe(subscriber); + cmp.unsafeSubscribe(subscriber); subscriber.assertError(error); subscriber.assertNoValues(); @@ -54,7 +54,7 @@ public void testErrorObservable() { public void testJustTwoEmissionsObservableThrowsError() { TestSubscriber subscriber = TestSubscriber.create(); Completable cmp = Observable.just("First", "Second").toCompletable(); - cmp.subscribe(subscriber); + cmp.unsafeSubscribe(subscriber); subscriber.assertNoErrors(); subscriber.assertNoValues(); @@ -64,7 +64,7 @@ public void testJustTwoEmissionsObservableThrowsError() { public void testEmptyObservable() { TestSubscriber subscriber = TestSubscriber.create(); Completable cmp = Observable.empty().toCompletable(); - cmp.subscribe(subscriber); + cmp.unsafeSubscribe(subscriber); subscriber.assertNoErrors(); subscriber.assertNoValues(); @@ -75,7 +75,7 @@ public void testEmptyObservable() { public void testNeverObservable() { TestSubscriber subscriber = TestSubscriber.create(); Completable cmp = Observable.never().toCompletable(); - cmp.subscribe(subscriber); + cmp.unsafeSubscribe(subscriber); subscriber.assertNoTerminalEvent(); subscriber.assertNoValues(); @@ -91,7 +91,7 @@ public void testShouldUseUnsafeSubscribeInternallyNotSubscribe() { public void call() { unsubscribed.set(true); }}).toCompletable(); - cmp.subscribe(subscriber); + cmp.unsafeSubscribe(subscriber); subscriber.assertCompleted(); assertFalse(unsubscribed.get()); } From 98c0315dcb0a09cd23cd946eda749e9e1b01795e Mon Sep 17 00:00:00 2001 From: akarnokd Date: Sun, 15 May 2016 00:03:30 +0200 Subject: [PATCH 3/8] Add Completable plugin support --- src/main/java/rx/Completable.java | 60 +++++------ .../RxJavaCompletableExecutionHook.java | 100 ++++++++++++++++++ src/main/java/rx/plugins/RxJavaPlugins.java | 43 ++++++++ src/test/java/rx/CompletableTest.java | 55 +++++++++- .../java/rx/plugins/RxJavaPluginsTest.java | 5 + 5 files changed, 225 insertions(+), 38 deletions(-) create mode 100644 src/main/java/rx/plugins/RxJavaCompletableExecutionHook.java diff --git a/src/main/java/rx/Completable.java b/src/main/java/rx/Completable.java index d889356e1c..7b165791d2 100644 --- a/src/main/java/rx/Completable.java +++ b/src/main/java/rx/Completable.java @@ -38,6 +38,12 @@ */ @Experimental public class Completable { + /** The error handler instance. */ + static final RxJavaErrorHandler ERROR_HANDLER = RxJavaPlugins.getInstance().getErrorHandler(); + + /** The completable hook. */ + static RxJavaCompletableExecutionHook HOOK = RxJavaPlugins.getInstance().getCompletableExecutionHook(); + /** * Callback used for building deferred computations that takes a CompletableSubscriber. */ @@ -101,9 +107,6 @@ public void call(CompletableSubscriber s) { } }); - /** The error handler instance. */ - static final RxJavaErrorHandler ERROR_HANDLER = RxJavaPlugins.getInstance().getErrorHandler(); - /** * Returns a Completable which terminates as soon as one of the source Completables * terminates (normally or with an error) and cancels all other Completables. @@ -975,7 +978,7 @@ public void call() { * not null (not verified) */ protected Completable(CompletableOnSubscribe onSubscribe) { - this.onSubscribe = onSubscribe; + this.onSubscribe = HOOK.onCreate(onSubscribe); } /** @@ -1526,9 +1529,8 @@ public final Completable lift(final CompletableOperator onLift) { @Override public void call(CompletableSubscriber s) { try { - // TODO plugin wrapping - - CompletableSubscriber sw = onLift.call(s); + CompletableOperator onLiftDecorated = HOOK.onLift(onLift); + CompletableSubscriber sw = onLiftDecorated.call(s); unsafeSubscribe(sw); } catch (NullPointerException ex) { @@ -1966,13 +1968,14 @@ private static void deliverUncaughtException(Throwable e) { public final void unsafeSubscribe(CompletableSubscriber s) { requireNonNull(s); try { - // TODO plugin wrapping the subscriber + CompletableOnSubscribe onSubscribeDecorated = HOOK.onSubscribeStart(this, this.onSubscribe); - onSubscribe.call(s); + onSubscribeDecorated.call(s); } catch (NullPointerException ex) { throw ex; } catch (Throwable ex) { Exceptions.throwIfFatal(ex); + ex = HOOK.onSubscribeError(ex); ERROR_HANDLER.handleError(ex); throw toNpe(ex); } @@ -1987,13 +1990,14 @@ public final void unsafeSubscribe(CompletableSubscriber s) { public final void subscribe(CompletableSubscriber s) { requireNonNull(s); try { - // TODO plugin wrapping the subscriber + CompletableOnSubscribe onSubscribeDecorated = HOOK.onSubscribeStart(this, this.onSubscribe); - onSubscribe.call(new SafeCompletableSubscriber(s)); + onSubscribeDecorated.call(new SafeCompletableSubscriber(s)); } catch (NullPointerException ex) { throw ex; } catch (Throwable ex) { Exceptions.throwIfFatal(ex); + ex = HOOK.onSubscribeError(ex); ERROR_HANDLER.handleError(ex); throw toNpe(ex); } @@ -2005,36 +2009,31 @@ public final void subscribe(CompletableSubscriber s) { * @param s the reactive-streams Subscriber, not null * @throws NullPointerException if s is null */ - public final void unsafeSubscribe(Subscriber s) { + public final void unsafeSubscribe(final Subscriber s) { requireNonNull(s); try { - final Subscriber sw = s; // FIXME hooking in 1.x is kind of strange to me - - if (sw == null) { - throw new NullPointerException("The RxJavaPlugins.onSubscribe returned a null Subscriber"); - } - unsafeSubscribe(new CompletableSubscriber() { @Override public void onCompleted() { - sw.onCompleted(); + s.onCompleted(); } @Override public void onError(Throwable e) { - sw.onError(e); + s.onError(e); } @Override public void onSubscribe(Subscription d) { - sw.add(d); + s.add(d); } }); - + RxJavaPlugins.getInstance().getObservableExecutionHook().onSubscribeReturn(s); } catch (NullPointerException ex) { throw ex; } catch (Throwable ex) { Exceptions.throwIfFatal(ex); + ex = HOOK.onSubscribeError(ex); ERROR_HANDLER.handleError(ex); throw toNpe(ex); } @@ -2047,36 +2046,31 @@ public void onSubscribe(Subscription d) { * @param s the reactive-streams Subscriber, not null * @throws NullPointerException if s is null */ - public final void subscribe(Subscriber s) { + public final void subscribe(final Subscriber s) { requireNonNull(s); try { - final Subscriber sw = s; // FIXME hooking in 1.x is kind of strange to me - - if (sw == null) { - throw new NullPointerException("The RxJavaPlugins.onSubscribe returned a null Subscriber"); - } - unsafeSubscribe(new SafeCompletableSubscriber(new CompletableSubscriber() { @Override public void onCompleted() { - sw.onCompleted(); + s.onCompleted(); } @Override public void onError(Throwable e) { - sw.onError(e); + s.onError(e); } @Override public void onSubscribe(Subscription d) { - sw.add(d); + s.add(d); } })); - + RxJavaPlugins.getInstance().getObservableExecutionHook().onSubscribeReturn(s); } catch (NullPointerException ex) { throw ex; } catch (Throwable ex) { Exceptions.throwIfFatal(ex); + ex = HOOK.onSubscribeError(ex); ERROR_HANDLER.handleError(ex); throw toNpe(ex); } diff --git a/src/main/java/rx/plugins/RxJavaCompletableExecutionHook.java b/src/main/java/rx/plugins/RxJavaCompletableExecutionHook.java new file mode 100644 index 0000000000..3d791f182d --- /dev/null +++ b/src/main/java/rx/plugins/RxJavaCompletableExecutionHook.java @@ -0,0 +1,100 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.plugins; + +import rx.*; +import rx.functions.Func1; + +/** + * Abstract ExecutionHook with invocations at different lifecycle points of {@link Completable} execution with a + * default no-op implementation. + *

+ * See {@link RxJavaPlugins} or the RxJava GitHub Wiki for information on configuring plugins: + * https://github.com/ReactiveX/RxJava/wiki/Plugins. + *

+ * Note on thread-safety and performance: + *

+ * A single implementation of this class will be used globally so methods on this class will be invoked + * concurrently from multiple threads so all functionality must be thread-safe. + *

+ * Methods are also invoked synchronously and will add to execution time of the completable so all behavior + * should be fast. If anything time-consuming is to be done it should be spawned asynchronously onto separate + * worker threads. + * + */ +public abstract class RxJavaCompletableExecutionHook { + /** + * Invoked during the construction by {@link Completable#create(Completable.CompletableOnSubscribe)} + *

+ * This can be used to decorate or replace the onSubscribe function or just perform extra + * logging, metrics and other such things and pass through the function. + * + * @param f + * original {@link Completable.CompletableOnSubscribe}<{@code T}> to be executed + * @return {@link Completable.CompletableOnSubscribe} function that can be modified, decorated, replaced or just + * returned as a pass through + */ + public Completable.CompletableOnSubscribe onCreate(Completable.CompletableOnSubscribe f) { + return f; + } + + /** + * Invoked before {@link Completable#subscribe(Subscriber)} is about to be executed. + *

+ * This can be used to decorate or replace the onSubscribe function or just perform extra + * logging, metrics and other such things and pass through the function. + * + * @param onSubscribe + * original {@link Completable.CompletableOnSubscribe}<{@code T}> to be executed + * @return {@link Completable.CompletableOnSubscribe}<{@code T}> function that can be modified, decorated, replaced or just + * returned as a pass through + */ + public Completable.CompletableOnSubscribe onSubscribeStart(Completable completableInstance, final Completable.CompletableOnSubscribe onSubscribe) { + // pass through by default + return onSubscribe; + } + + /** + * Invoked after failed execution of {@link Completable#subscribe(Subscriber)} with thrown Throwable. + *

+ * This is not errors emitted via {@link Subscriber#onError(Throwable)} but exceptions thrown when + * attempting to subscribe to a {@link Func1}<{@link Subscriber}{@code }, {@link Subscription}>. + * + * @param e + * Throwable thrown by {@link Completable#subscribe(Subscriber)} + * @return Throwable that can be decorated, replaced or just returned as a pass through + */ + public Throwable onSubscribeError(Throwable e) { + // pass through by default + return e; + } + + /** + * Invoked just as the operator functions is called to bind two operations together into a new + * {@link Completable} and the return value is used as the lifted function + *

+ * This can be used to decorate or replace the {@link Completable.CompletableOperator} instance or just perform extra + * logging, metrics and other such things and pass through the onSubscribe. + * + * @param lift + * original {@link Completable.CompletableOperator}{@code } + * @return {@link Completable.CompletableOperator}{@code } function that can be modified, decorated, replaced or just + * returned as a pass through + */ + public Completable.CompletableOperator onLift(final Completable.CompletableOperator lift) { + return lift; + } +} diff --git a/src/main/java/rx/plugins/RxJavaPlugins.java b/src/main/java/rx/plugins/RxJavaPlugins.java index 622916c661..fe01c7f175 100644 --- a/src/main/java/rx/plugins/RxJavaPlugins.java +++ b/src/main/java/rx/plugins/RxJavaPlugins.java @@ -52,6 +52,7 @@ public class RxJavaPlugins { private final AtomicReference errorHandler = new AtomicReference(); private final AtomicReference observableExecutionHook = new AtomicReference(); private final AtomicReference singleExecutionHook = new AtomicReference(); + private final AtomicReference completableExecutionHook = new AtomicReference(); private final AtomicReference schedulersHook = new AtomicReference(); /** @@ -211,6 +212,48 @@ public void registerSingleExecutionHook(RxJavaSingleExecutionHook impl) { } } + /** + * Retrieves the instance of {@link RxJavaCompletableExecutionHook} to use based on order of precedence as + * defined in {@link RxJavaPlugins} class header. + *

+ * Override the default by calling {@link #registerCompletableExecutionHook(RxJavaCompletableExecutionHook)} + * or by setting the property {@code rxjava.plugin.RxJavaCompletableExecutionHook.implementation} with the + * full classname to load. + * + * @return {@link RxJavaCompletableExecutionHook} implementation to use + */ + public RxJavaCompletableExecutionHook getCompletableExecutionHook() { + if (completableExecutionHook.get() == null) { + // check for an implementation from System.getProperty first + Object impl = getPluginImplementationViaProperty(RxJavaCompletableExecutionHook.class, System.getProperties()); + if (impl == null) { + // nothing set via properties so initialize with default + completableExecutionHook.compareAndSet(null, new RxJavaCompletableExecutionHook() { }); + // we don't return from here but call get() again in case of thread-race so the winner will always get returned + } else { + // we received an implementation from the system property so use it + completableExecutionHook.compareAndSet(null, (RxJavaCompletableExecutionHook) impl); + } + } + return completableExecutionHook.get(); + } + + /** + * Register an {@link RxJavaCompletableExecutionHook} implementation as a global override of any injected or + * default implementations. + * + * @param impl + * {@link RxJavaCompletableExecutionHook} implementation + * @throws IllegalStateException + * if called more than once or after the default was initialized (if usage occurs before trying + * to register) + */ + public void registerCompletableExecutionHook(RxJavaCompletableExecutionHook impl) { + if (!completableExecutionHook.compareAndSet(null, impl)) { + throw new IllegalStateException("Another strategy was already registered: " + singleExecutionHook.get()); + } + } + /* test */ static Object getPluginImplementationViaProperty(Class pluginClass, Properties propsIn) { // Make a defensive clone because traversal may fail with ConcurrentModificationException // if the properties get changed by something outside RxJava. diff --git a/src/test/java/rx/CompletableTest.java b/src/test/java/rx/CompletableTest.java index f0dbab4835..36bf39f146 100644 --- a/src/test/java/rx/CompletableTest.java +++ b/src/test/java/rx/CompletableTest.java @@ -16,6 +16,10 @@ package rx; +import static org.junit.Assert.*; +import static org.mockito.Matchers.*; +import static org.mockito.Mockito.*; + import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.*; @@ -26,15 +30,12 @@ import rx.Observable.OnSubscribe; import rx.exceptions.*; import rx.functions.*; -import rx.observers.TestSubscriber; -import rx.plugins.RxJavaPlugins; +import rx.observers.*; +import rx.plugins.*; import rx.schedulers.*; import rx.subjects.PublishSubject; import rx.subscriptions.*; -import static org.mockito.Mockito.*; -import static org.junit.Assert.*; - /** * Test Completable methods and operators. */ @@ -4035,4 +4036,48 @@ public void onNext(Object t) { } } + private static RxJavaCompletableExecutionHook hookSpy; + + @Before + public void setUp() throws Exception { + hookSpy = spy( + new RxJavaPluginsTest.RxJavaCompletableExecutionHookTestImpl()); + Completable.HOOK = hookSpy; + } + + @Test + public void testHookCreate() { + CompletableOnSubscribe subscriber = mock(CompletableOnSubscribe.class); + Completable.create(subscriber); + + verify(hookSpy, times(1)).onCreate(subscriber); + } + + @Test + public void testHookSubscribeStart() { + TestSubscriber ts = new TestSubscriber(); + + Completable completable = Completable.create(new CompletableOnSubscribe() { + @Override public void call(CompletableSubscriber s) { + s.onCompleted(); + } + }); + completable.subscribe(ts); + + verify(hookSpy, times(1)).onSubscribeStart(eq(completable), any(Completable.CompletableOnSubscribe.class)); + } + + @Test + public void testHookUnsafeSubscribeStart() { + TestSubscriber ts = new TestSubscriber(); + Completable completable = Completable.create(new CompletableOnSubscribe() { + @Override public void call(CompletableSubscriber s) { + s.onCompleted(); + } + }); + completable.unsafeSubscribe(ts); + + verify(hookSpy, times(1)).onSubscribeStart(eq(completable), any(Completable.CompletableOnSubscribe.class)); + } + } \ No newline at end of file diff --git a/src/test/java/rx/plugins/RxJavaPluginsTest.java b/src/test/java/rx/plugins/RxJavaPluginsTest.java index 64a1ba1d1a..b95503b026 100644 --- a/src/test/java/rx/plugins/RxJavaPluginsTest.java +++ b/src/test/java/rx/plugins/RxJavaPluginsTest.java @@ -253,6 +253,11 @@ public static class RxJavaSingleExecutionHookTestImpl extends RxJavaSingleExecut // just use defaults } + // inside test so it is stripped from Javadocs + public static class RxJavaCompletableExecutionHookTestImpl extends RxJavaCompletableExecutionHook { + // just use defaults + } + private static String getFullClassNameForTestClass(Class cls) { return RxJavaPlugins.class.getPackage() .getName() + "." + RxJavaPluginsTest.class.getSimpleName() + "$" + cls.getSimpleName(); From 159f8687539be7606ea90d91cf7bb5bde3db2408 Mon Sep 17 00:00:00 2001 From: akarnokd Date: Sun, 15 May 2016 10:39:04 +0200 Subject: [PATCH 4/8] Safe Completable now calls plugin handler for onError calls unconditionally. --- .../observers/SafeCompletableSubscriber.java | 2 +- .../java/rx/plugins/RxJavaPluginsTest.java | 54 ++++++++++++++++++- 2 files changed, 54 insertions(+), 2 deletions(-) diff --git a/src/main/java/rx/observers/SafeCompletableSubscriber.java b/src/main/java/rx/observers/SafeCompletableSubscriber.java index 3e85300ef4..4e1ec0ea80 100644 --- a/src/main/java/rx/observers/SafeCompletableSubscriber.java +++ b/src/main/java/rx/observers/SafeCompletableSubscriber.java @@ -52,8 +52,8 @@ public void onCompleted() { @Override public void onError(Throwable e) { + RxJavaPluginUtils.handleException(e); if (done) { - RxJavaPluginUtils.handleException(e); return; } done = true; diff --git a/src/test/java/rx/plugins/RxJavaPluginsTest.java b/src/test/java/rx/plugins/RxJavaPluginsTest.java index b95503b026..9c471cfa9c 100644 --- a/src/test/java/rx/plugins/RxJavaPluginsTest.java +++ b/src/test/java/rx/plugins/RxJavaPluginsTest.java @@ -23,8 +23,8 @@ import org.junit.*; +import rx.*; import rx.Observable; -import rx.Subscriber; import rx.exceptions.OnErrorThrowable; import rx.functions.Func1; @@ -291,4 +291,56 @@ public void testShortPluginDiscoveryMissing() { RxJavaPlugins.getPluginImplementationViaProperty(Map.class, props); } + + @Test + public void testOnErrorWhenUsingCompletable() { + RxJavaErrorHandlerTestImpl errorHandler = new RxJavaErrorHandlerTestImpl(); + RxJavaPlugins.getInstance().registerErrorHandler(errorHandler); + + RuntimeException re = new RuntimeException("test onError"); + Completable.error(re).subscribe(new Subscriber() { + @Override + public void onCompleted() { + + } + + @Override + public void onError(Throwable e) { + + } + + @Override + public void onNext(Object o) { + + } + }); + assertEquals(re, errorHandler.e); + assertEquals(1, errorHandler.count); + } + + @Test + public void testOnErrorWhenUsingSingle() { + RxJavaErrorHandlerTestImpl errorHandler = new RxJavaErrorHandlerTestImpl(); + RxJavaPlugins.getInstance().registerErrorHandler(errorHandler); + + RuntimeException re = new RuntimeException("test onError"); + Single.error(re).subscribe(new Subscriber() { + @Override + public void onCompleted() { + + } + + @Override + public void onError(Throwable e) { + + } + + @Override + public void onNext(Object o) { + + } + }); + assertEquals(re, errorHandler.e); + assertEquals(1, errorHandler.count); + } } From aa7d82f177a6dcf4043b5cefdc02b303e473bdcf Mon Sep 17 00:00:00 2001 From: akarnokd Date: Tue, 17 May 2016 11:34:41 +0200 Subject: [PATCH 5/8] Reuse subscribe methods, call onStart for Subscriber --- src/main/java/rx/Completable.java | 67 ++++++++++----------------- src/test/java/rx/CompletableTest.java | 34 +++++++++++++- 2 files changed, 58 insertions(+), 43 deletions(-) diff --git a/src/main/java/rx/Completable.java b/src/main/java/rx/Completable.java index 7b165791d2..10caf0a1d2 100644 --- a/src/main/java/rx/Completable.java +++ b/src/main/java/rx/Completable.java @@ -26,7 +26,7 @@ import rx.functions.*; import rx.internal.operators.*; import rx.internal.util.*; -import rx.observers.SafeCompletableSubscriber; +import rx.observers.*; import rx.plugins.*; import rx.schedulers.Schedulers; import rx.subscriptions.*; @@ -382,8 +382,6 @@ public static Completable concat(Observable sources, int public static Completable create(CompletableOnSubscribe onSubscribe) { requireNonNull(onSubscribe); try { - // TODO plugin wrapping onSubscribe - return new Completable(onSubscribe); } catch (NullPointerException ex) { throw ex; @@ -1988,30 +1986,36 @@ public final void unsafeSubscribe(CompletableSubscriber s) { * @throws NullPointerException if s is null */ public final void subscribe(CompletableSubscriber s) { - requireNonNull(s); - try { - CompletableOnSubscribe onSubscribeDecorated = HOOK.onSubscribeStart(this, this.onSubscribe); - - onSubscribeDecorated.call(new SafeCompletableSubscriber(s)); - } catch (NullPointerException ex) { - throw ex; - } catch (Throwable ex) { - Exceptions.throwIfFatal(ex); - ex = HOOK.onSubscribeError(ex); - ERROR_HANDLER.handleError(ex); - throw toNpe(ex); + if (!(s instanceof SafeCompletableSubscriber)) { + s = new SafeCompletableSubscriber(s); } + unsafeSubscribe(s); } /** * Subscribes a regular Subscriber to this Completable instance which * will receive only an onError or onComplete event. + * @param the value type of the subscriber * @param s the reactive-streams Subscriber, not null * @throws NullPointerException if s is null */ public final void unsafeSubscribe(final Subscriber s) { + unsafeSubscribe(s, true); + } + + /** + * Performs the actual unsafe subscription and calls the onStart if required. + * @param the value type of the subscriber + * @param s the subscriber instance, not null + * @param callOnStart if true, the Subscriber.onStart will be called + * @throws NullPointerException if s is null + */ + private final void unsafeSubscribe(final Subscriber s, boolean callOnStart) { requireNonNull(s); try { + if (callOnStart) { + s.onStart(); + } unsafeSubscribe(new CompletableSubscriber() { @Override public void onCompleted() { @@ -2043,37 +2047,16 @@ public void onSubscribe(Subscription d) { * Subscribes a regular Subscriber to this Completable instance which * will receive only an onError or onComplete event * and handles exceptions thrown by its onXXX methods. + * @param the value type of the subscriber * @param s the reactive-streams Subscriber, not null * @throws NullPointerException if s is null */ - public final void subscribe(final Subscriber s) { - requireNonNull(s); - try { - unsafeSubscribe(new SafeCompletableSubscriber(new CompletableSubscriber() { - @Override - public void onCompleted() { - s.onCompleted(); - } - - @Override - public void onError(Throwable e) { - s.onError(e); - } - - @Override - public void onSubscribe(Subscription d) { - s.add(d); - } - })); - RxJavaPlugins.getInstance().getObservableExecutionHook().onSubscribeReturn(s); - } catch (NullPointerException ex) { - throw ex; - } catch (Throwable ex) { - Exceptions.throwIfFatal(ex); - ex = HOOK.onSubscribeError(ex); - ERROR_HANDLER.handleError(ex); - throw toNpe(ex); + public final void subscribe(Subscriber s) { + s.onStart(); + if (!(s instanceof SafeSubscriber)) { + s = new SafeSubscriber(s); } + unsafeSubscribe(s, false); } /** diff --git a/src/test/java/rx/CompletableTest.java b/src/test/java/rx/CompletableTest.java index 36bf39f146..1fdb5f6127 100644 --- a/src/test/java/rx/CompletableTest.java +++ b/src/test/java/rx/CompletableTest.java @@ -30,7 +30,7 @@ import rx.Observable.OnSubscribe; import rx.exceptions.*; import rx.functions.*; -import rx.observers.*; +import rx.observers.TestSubscriber; import rx.plugins.*; import rx.schedulers.*; import rx.subjects.PublishSubject; @@ -4080,4 +4080,36 @@ public void testHookUnsafeSubscribeStart() { verify(hookSpy, times(1)).onSubscribeStart(eq(completable), any(Completable.CompletableOnSubscribe.class)); } + @Test + public void onStartCalledSafe() { + TestSubscriber ts = new TestSubscriber() { + @Override + public void onStart() { + onNext(1); + } + }; + + normal.completable.subscribe(ts); + + ts.assertValue(1); + ts.assertNoErrors(); + ts.assertCompleted(); + } + + @Test + public void onStartCalledUnsafeSafe() { + TestSubscriber ts = new TestSubscriber() { + @Override + public void onStart() { + onNext(1); + } + }; + + normal.completable.unsafeSubscribe(ts); + + ts.assertValue(1); + ts.assertNoErrors(); + ts.assertCompleted(); + } + } \ No newline at end of file From ebb38eba33a80e5f43c05d28c5849f78267b5b70 Mon Sep 17 00:00:00 2001 From: akarnokd Date: Fri, 20 May 2016 09:09:37 +0200 Subject: [PATCH 6/8] Don't allow calling both onCompleted and onError methods --- src/main/java/rx/Completable.java | 52 ++++++++++++++++++++----------- 1 file changed, 33 insertions(+), 19 deletions(-) diff --git a/src/main/java/rx/Completable.java b/src/main/java/rx/Completable.java index 10caf0a1d2..991a8a3208 100644 --- a/src/main/java/rx/Completable.java +++ b/src/main/java/rx/Completable.java @@ -1878,15 +1878,19 @@ public final Subscription subscribe(final Action0 onComplete) { final MultipleAssignmentSubscription mad = new MultipleAssignmentSubscription(); unsafeSubscribe(new CompletableSubscriber() { + boolean done; @Override public void onCompleted() { - try { - onComplete.call(); - } catch (Throwable e) { - ERROR_HANDLER.handleError(e); - deliverUncaughtException(e); - } finally { - mad.unsubscribe(); + if (!done) { + done = true; + try { + onComplete.call(); + } catch (Throwable e) { + ERROR_HANDLER.handleError(e); + deliverUncaughtException(e); + } finally { + mad.unsubscribe(); + } } } @@ -1920,27 +1924,37 @@ public final Subscription subscribe(final Action1 onError, fi final MultipleAssignmentSubscription mad = new MultipleAssignmentSubscription(); unsafeSubscribe(new CompletableSubscriber() { + boolean done; @Override public void onCompleted() { - try { - onComplete.call(); - } catch (Throwable e) { - onError(e); - return; + if (!done) { + done = true; + try { + onComplete.call(); + } catch (Throwable e) { + onError(e); + return; + } + mad.unsubscribe(); } - mad.unsubscribe(); } @Override public void onError(Throwable e) { - try { - onError.call(e); - } catch (Throwable ex) { - e = new CompositeException(Arrays.asList(e, ex)); + if (!done) { + done = true; + try { + onError.call(e); + } catch (Throwable ex) { + e = new CompositeException(Arrays.asList(e, ex)); + ERROR_HANDLER.handleError(e); + deliverUncaughtException(e); + } finally { + mad.unsubscribe(); + } + } else { ERROR_HANDLER.handleError(e); deliverUncaughtException(e); - } finally { - mad.unsubscribe(); } } From f3958da7c8845aac8d288e369d3b16cedcb47799 Mon Sep 17 00:00:00 2001 From: akarnokd Date: Fri, 20 May 2016 09:42:09 +0200 Subject: [PATCH 7/8] Call onError if onCompleted action crashes --- src/main/java/rx/Completable.java | 24 ++++++++++++++---------- 1 file changed, 14 insertions(+), 10 deletions(-) diff --git a/src/main/java/rx/Completable.java b/src/main/java/rx/Completable.java index 991a8a3208..0946dd087c 100644 --- a/src/main/java/rx/Completable.java +++ b/src/main/java/rx/Completable.java @@ -1932,7 +1932,7 @@ public void onCompleted() { try { onComplete.call(); } catch (Throwable e) { - onError(e); + callOnError(e); return; } mad.unsubscribe(); @@ -1943,21 +1943,25 @@ public void onCompleted() { public void onError(Throwable e) { if (!done) { done = true; - try { - onError.call(e); - } catch (Throwable ex) { - e = new CompositeException(Arrays.asList(e, ex)); - ERROR_HANDLER.handleError(e); - deliverUncaughtException(e); - } finally { - mad.unsubscribe(); - } + callOnError(e); } else { ERROR_HANDLER.handleError(e); deliverUncaughtException(e); } } + void callOnError(Throwable e) { + try { + onError.call(e); + } catch (Throwable ex) { + e = new CompositeException(Arrays.asList(e, ex)); + ERROR_HANDLER.handleError(e); + deliverUncaughtException(e); + } finally { + mad.unsubscribe(); + } + } + @Override public void onSubscribe(Subscription d) { mad.set(d); From 1c1f37d85ea6983688929d79d171219b6d007694 Mon Sep 17 00:00:00 2001 From: akarnokd Date: Wed, 1 Jun 2016 08:57:31 +0200 Subject: [PATCH 8/8] Added @Experimental annotations --- .../observers/SafeCompletableSubscriber.java | 6 ++++- .../RxJavaCompletableExecutionHook.java | 26 +++++++++++-------- src/main/java/rx/plugins/RxJavaPlugins.java | 4 +++ 3 files changed, 24 insertions(+), 12 deletions(-) diff --git a/src/main/java/rx/observers/SafeCompletableSubscriber.java b/src/main/java/rx/observers/SafeCompletableSubscriber.java index 4e1ec0ea80..12258115c7 100644 --- a/src/main/java/rx/observers/SafeCompletableSubscriber.java +++ b/src/main/java/rx/observers/SafeCompletableSubscriber.java @@ -16,14 +16,18 @@ package rx.observers; import rx.Completable.CompletableSubscriber; -import rx.exceptions.*; import rx.Subscription; +import rx.annotations.Experimental; +import rx.exceptions.*; import rx.internal.util.RxJavaPluginUtils; /** * Wraps another CompletableSubscriber and handles exceptions thrown * from onError and onCompleted. + * + * @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number) */ +@Experimental public final class SafeCompletableSubscriber implements CompletableSubscriber, Subscription { final CompletableSubscriber actual; diff --git a/src/main/java/rx/plugins/RxJavaCompletableExecutionHook.java b/src/main/java/rx/plugins/RxJavaCompletableExecutionHook.java index 3d791f182d..87160f8326 100644 --- a/src/main/java/rx/plugins/RxJavaCompletableExecutionHook.java +++ b/src/main/java/rx/plugins/RxJavaCompletableExecutionHook.java @@ -16,6 +16,7 @@ package rx.plugins; import rx.*; +import rx.annotations.Experimental; import rx.functions.Func1; /** @@ -34,7 +35,9 @@ * should be fast. If anything time-consuming is to be done it should be spawned asynchronously onto separate * worker threads. * + * @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number) */ +@Experimental public abstract class RxJavaCompletableExecutionHook { /** * Invoked during the construction by {@link Completable#create(Completable.CompletableOnSubscribe)} @@ -43,11 +46,11 @@ public abstract class RxJavaCompletableExecutionHook { * logging, metrics and other such things and pass through the function. * * @param f - * original {@link Completable.CompletableOnSubscribe}<{@code T}> to be executed - * @return {@link Completable.CompletableOnSubscribe} function that can be modified, decorated, replaced or just + * original {@link rx.Completable.CompletableOnSubscribe}<{@code T}> to be executed + * @return {@link rx.Completable.CompletableOnSubscribe} function that can be modified, decorated, replaced or just * returned as a pass through */ - public Completable.CompletableOnSubscribe onCreate(Completable.CompletableOnSubscribe f) { + public Completable.CompletableOnSubscribe onCreate(Completable.CompletableOnSubscribe f) { return f; } @@ -57,12 +60,13 @@ public Completable.CompletableOnSubscribe onCreate(Completable.CompletableOn * This can be used to decorate or replace the onSubscribe function or just perform extra * logging, metrics and other such things and pass through the function. * + * @param completableInstance the target completable instance * @param onSubscribe - * original {@link Completable.CompletableOnSubscribe}<{@code T}> to be executed - * @return {@link Completable.CompletableOnSubscribe}<{@code T}> function that can be modified, decorated, replaced or just + * original {@link rx.Completable.CompletableOnSubscribe}<{@code T}> to be executed + * @return {@link rx.Completable.CompletableOnSubscribe}<{@code T}> function that can be modified, decorated, replaced or just * returned as a pass through */ - public Completable.CompletableOnSubscribe onSubscribeStart(Completable completableInstance, final Completable.CompletableOnSubscribe onSubscribe) { + public Completable.CompletableOnSubscribe onSubscribeStart(Completable completableInstance, final Completable.CompletableOnSubscribe onSubscribe) { // pass through by default return onSubscribe; } @@ -77,7 +81,7 @@ public Completable.CompletableOnSubscribe onSubscribeStart(Completable compl * Throwable thrown by {@link Completable#subscribe(Subscriber)} * @return Throwable that can be decorated, replaced or just returned as a pass through */ - public Throwable onSubscribeError(Throwable e) { + public Throwable onSubscribeError(Throwable e) { // pass through by default return e; } @@ -86,15 +90,15 @@ public Throwable onSubscribeError(Throwable e) { * Invoked just as the operator functions is called to bind two operations together into a new * {@link Completable} and the return value is used as the lifted function *

- * This can be used to decorate or replace the {@link Completable.CompletableOperator} instance or just perform extra + * This can be used to decorate or replace the {@link rx.Completable.CompletableOperator} instance or just perform extra * logging, metrics and other such things and pass through the onSubscribe. * * @param lift - * original {@link Completable.CompletableOperator}{@code } - * @return {@link Completable.CompletableOperator}{@code } function that can be modified, decorated, replaced or just + * original {@link rx.Completable.CompletableOperator}{@code } + * @return {@link rx.Completable.CompletableOperator}{@code } function that can be modified, decorated, replaced or just * returned as a pass through */ - public Completable.CompletableOperator onLift(final Completable.CompletableOperator lift) { + public Completable.CompletableOperator onLift(final Completable.CompletableOperator lift) { return lift; } } diff --git a/src/main/java/rx/plugins/RxJavaPlugins.java b/src/main/java/rx/plugins/RxJavaPlugins.java index fe01c7f175..f9926ac588 100644 --- a/src/main/java/rx/plugins/RxJavaPlugins.java +++ b/src/main/java/rx/plugins/RxJavaPlugins.java @@ -221,7 +221,9 @@ public void registerSingleExecutionHook(RxJavaSingleExecutionHook impl) { * full classname to load. * * @return {@link RxJavaCompletableExecutionHook} implementation to use + * @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number) */ + @Experimental public RxJavaCompletableExecutionHook getCompletableExecutionHook() { if (completableExecutionHook.get() == null) { // check for an implementation from System.getProperty first @@ -247,7 +249,9 @@ public RxJavaCompletableExecutionHook getCompletableExecutionHook() { * @throws IllegalStateException * if called more than once or after the default was initialized (if usage occurs before trying * to register) + * @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number) */ + @Experimental public void registerCompletableExecutionHook(RxJavaCompletableExecutionHook impl) { if (!completableExecutionHook.compareAndSet(null, impl)) { throw new IllegalStateException("Another strategy was already registered: " + singleExecutionHook.get());