From 7fd9b415df1babf590cd0cc13a709eedd5cd26f7 Mon Sep 17 00:00:00 2001 From: akarnokd Date: Wed, 25 Jan 2017 15:37:11 +0100 Subject: [PATCH 1/3] 2.x: option to fail for using blockingX on the computation scheduler --- .../observers/BlockingBaseObserver.java | 3 +- .../observers/BlockingMultiObserver.java | 7 +- .../internal/observers/FutureObserver.java | 3 + .../observers/FutureSingleObserver.java | 3 + .../flowable/BlockingFlowableIterable.java | 3 +- .../flowable/BlockingFlowableLatest.java | 3 +- .../flowable/BlockingFlowableNext.java | 3 +- .../flowable/FlowableBlockingSubscribe.java | 1 + .../BlockingObservableIterable.java | 3 +- .../observable/BlockingObservableLatest.java | 3 +- .../observable/BlockingObservableNext.java | 3 +- .../schedulers/ComputationScheduler.java | 2 +- .../schedulers/NonBlockingThread.java | 22 + .../internal/schedulers/RxThreadFactory.java | 18 +- .../internal/schedulers/SingleScheduler.java | 2 +- .../subscribers/BlockingBaseSubscriber.java | 3 +- .../subscribers/FutureSubscriber.java | 3 + .../internal/util/BlockingHelper.java | 15 +- .../io/reactivex/plugins/RxJavaPlugins.java | 33 + .../reactivex/plugins/RxJavaPluginsTest.java | 25 +- .../schedulers/FailOnBlockingTest.java | 642 ++++++++++++++++++ .../tck/DelaySubscriptionTckTest.java | 2 +- 22 files changed, 781 insertions(+), 21 deletions(-) create mode 100644 src/main/java/io/reactivex/internal/schedulers/NonBlockingThread.java create mode 100644 src/test/java/io/reactivex/schedulers/FailOnBlockingTest.java diff --git a/src/main/java/io/reactivex/internal/observers/BlockingBaseObserver.java b/src/main/java/io/reactivex/internal/observers/BlockingBaseObserver.java index 1b3624df08..4efb537e08 100644 --- a/src/main/java/io/reactivex/internal/observers/BlockingBaseObserver.java +++ b/src/main/java/io/reactivex/internal/observers/BlockingBaseObserver.java @@ -16,7 +16,7 @@ import io.reactivex.Observer; import io.reactivex.disposables.Disposable; -import io.reactivex.internal.util.ExceptionHelper; +import io.reactivex.internal.util.*; public abstract class BlockingBaseObserver extends CountDownLatch implements Observer, Disposable { @@ -67,6 +67,7 @@ public final boolean isDisposed() { public final T blockingGet() { if (getCount() != 0) { try { + BlockingHelper.verifyNonBlocking(); await(); } catch (InterruptedException ex) { dispose(); diff --git a/src/main/java/io/reactivex/internal/observers/BlockingMultiObserver.java b/src/main/java/io/reactivex/internal/observers/BlockingMultiObserver.java index 0ba9bb62ec..934123e381 100644 --- a/src/main/java/io/reactivex/internal/observers/BlockingMultiObserver.java +++ b/src/main/java/io/reactivex/internal/observers/BlockingMultiObserver.java @@ -17,7 +17,7 @@ import io.reactivex.*; import io.reactivex.disposables.Disposable; -import io.reactivex.internal.util.ExceptionHelper; +import io.reactivex.internal.util.*; /** * A combined Observer that awaits the success or error signal via a CountDownLatch. @@ -79,6 +79,7 @@ public void onComplete() { public T blockingGet() { if (getCount() != 0) { try { + BlockingHelper.verifyNonBlocking(); await(); } catch (InterruptedException ex) { dispose(); @@ -101,6 +102,7 @@ public T blockingGet() { public T blockingGet(T defaultValue) { if (getCount() != 0) { try { + BlockingHelper.verifyNonBlocking(); await(); } catch (InterruptedException ex) { dispose(); @@ -123,6 +125,7 @@ public T blockingGet(T defaultValue) { public Throwable blockingGetError() { if (getCount() != 0) { try { + BlockingHelper.verifyNonBlocking(); await(); } catch (InterruptedException ex) { dispose(); @@ -142,6 +145,7 @@ public Throwable blockingGetError() { public Throwable blockingGetError(long timeout, TimeUnit unit) { if (getCount() != 0) { try { + BlockingHelper.verifyNonBlocking(); if (!await(timeout, unit)) { dispose(); throw ExceptionHelper.wrapOrThrow(new TimeoutException()); @@ -164,6 +168,7 @@ public Throwable blockingGetError(long timeout, TimeUnit unit) { public boolean blockingAwait(long timeout, TimeUnit unit) { if (getCount() != 0) { try { + BlockingHelper.verifyNonBlocking(); if (!await(timeout, unit)) { dispose(); return false; diff --git a/src/main/java/io/reactivex/internal/observers/FutureObserver.java b/src/main/java/io/reactivex/internal/observers/FutureObserver.java index caa9f14ed4..48f6ef0971 100644 --- a/src/main/java/io/reactivex/internal/observers/FutureObserver.java +++ b/src/main/java/io/reactivex/internal/observers/FutureObserver.java @@ -20,6 +20,7 @@ import io.reactivex.Observer; import io.reactivex.disposables.Disposable; import io.reactivex.internal.disposables.DisposableHelper; +import io.reactivex.internal.util.BlockingHelper; import io.reactivex.plugins.RxJavaPlugins; /** @@ -72,6 +73,7 @@ public boolean isDone() { @Override public T get() throws InterruptedException, ExecutionException { if (getCount() != 0) { + BlockingHelper.verifyNonBlocking(); await(); } @@ -88,6 +90,7 @@ public T get() throws InterruptedException, ExecutionException { @Override public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { if (getCount() != 0) { + BlockingHelper.verifyNonBlocking(); if (!await(timeout, unit)) { throw new TimeoutException(); } diff --git a/src/main/java/io/reactivex/internal/observers/FutureSingleObserver.java b/src/main/java/io/reactivex/internal/observers/FutureSingleObserver.java index ae1f123328..33b5b8099a 100644 --- a/src/main/java/io/reactivex/internal/observers/FutureSingleObserver.java +++ b/src/main/java/io/reactivex/internal/observers/FutureSingleObserver.java @@ -19,6 +19,7 @@ import io.reactivex.SingleObserver; import io.reactivex.disposables.Disposable; import io.reactivex.internal.disposables.DisposableHelper; +import io.reactivex.internal.util.BlockingHelper; import io.reactivex.plugins.RxJavaPlugins; /** @@ -71,6 +72,7 @@ public boolean isDone() { @Override public T get() throws InterruptedException, ExecutionException { if (getCount() != 0) { + BlockingHelper.verifyNonBlocking(); await(); } @@ -87,6 +89,7 @@ public T get() throws InterruptedException, ExecutionException { @Override public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { if (getCount() != 0) { + BlockingHelper.verifyNonBlocking(); if (!await(timeout, unit)) { throw new TimeoutException(); } diff --git a/src/main/java/io/reactivex/internal/operators/flowable/BlockingFlowableIterable.java b/src/main/java/io/reactivex/internal/operators/flowable/BlockingFlowableIterable.java index f2768291b4..e592436830 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/BlockingFlowableIterable.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/BlockingFlowableIterable.java @@ -23,7 +23,7 @@ import io.reactivex.exceptions.MissingBackpressureException; import io.reactivex.internal.queue.SpscArrayQueue; import io.reactivex.internal.subscriptions.SubscriptionHelper; -import io.reactivex.internal.util.ExceptionHelper; +import io.reactivex.internal.util.*; public final class BlockingFlowableIterable implements Iterable { final Publisher source; @@ -86,6 +86,7 @@ public boolean hasNext() { } } if (empty) { + BlockingHelper.verifyNonBlocking(); lock.lock(); try { while (!done && queue.isEmpty()) { diff --git a/src/main/java/io/reactivex/internal/operators/flowable/BlockingFlowableLatest.java b/src/main/java/io/reactivex/internal/operators/flowable/BlockingFlowableLatest.java index 1f4e7edbb3..55ef6c453c 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/BlockingFlowableLatest.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/BlockingFlowableLatest.java @@ -20,7 +20,7 @@ import org.reactivestreams.Publisher; import io.reactivex.*; -import io.reactivex.internal.util.ExceptionHelper; +import io.reactivex.internal.util.*; import io.reactivex.plugins.RxJavaPlugins; import io.reactivex.subscribers.DisposableSubscriber; @@ -79,6 +79,7 @@ public boolean hasNext() { if (iteratorNotification == null || iteratorNotification.isOnNext()) { if (iteratorNotification == null) { try { + BlockingHelper.verifyNonBlocking(); notify.acquire(); } catch (InterruptedException ex) { dispose(); diff --git a/src/main/java/io/reactivex/internal/operators/flowable/BlockingFlowableNext.java b/src/main/java/io/reactivex/internal/operators/flowable/BlockingFlowableNext.java index 33c7baf061..6c505fd50a 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/BlockingFlowableNext.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/BlockingFlowableNext.java @@ -20,7 +20,7 @@ import org.reactivestreams.Publisher; import io.reactivex.*; -import io.reactivex.internal.util.ExceptionHelper; +import io.reactivex.internal.util.*; import io.reactivex.plugins.RxJavaPlugins; import io.reactivex.subscribers.DisposableSubscriber; @@ -165,6 +165,7 @@ public void onNext(Notification args) { public Notification takeNext() throws InterruptedException { setWaiting(); + BlockingHelper.verifyNonBlocking(); return buf.take(); } void setWaiting() { diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableBlockingSubscribe.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableBlockingSubscribe.java index 7553a1d982..24ece91821 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableBlockingSubscribe.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableBlockingSubscribe.java @@ -57,6 +57,7 @@ public static void subscribe(Publisher o, Subscriber if (bs.isCancelled()) { break; } + BlockingHelper.verifyNonBlocking(); v = queue.take(); } if (bs.isCancelled()) { diff --git a/src/main/java/io/reactivex/internal/operators/observable/BlockingObservableIterable.java b/src/main/java/io/reactivex/internal/operators/observable/BlockingObservableIterable.java index 00c613ad2e..a9d2e92cb3 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/BlockingObservableIterable.java +++ b/src/main/java/io/reactivex/internal/operators/observable/BlockingObservableIterable.java @@ -21,7 +21,7 @@ import io.reactivex.disposables.Disposable; import io.reactivex.internal.disposables.DisposableHelper; import io.reactivex.internal.queue.SpscLinkedArrayQueue; -import io.reactivex.internal.util.ExceptionHelper; +import io.reactivex.internal.util.*; public final class BlockingObservableIterable implements Iterable { final ObservableSource source; @@ -78,6 +78,7 @@ public boolean hasNext() { } if (empty) { try { + BlockingHelper.verifyNonBlocking(); lock.lock(); try { while (!done && queue.isEmpty()) { diff --git a/src/main/java/io/reactivex/internal/operators/observable/BlockingObservableLatest.java b/src/main/java/io/reactivex/internal/operators/observable/BlockingObservableLatest.java index 7fb6ad5791..471e695fb7 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/BlockingObservableLatest.java +++ b/src/main/java/io/reactivex/internal/operators/observable/BlockingObservableLatest.java @@ -19,7 +19,7 @@ import io.reactivex.*; import io.reactivex.Observable; -import io.reactivex.internal.util.ExceptionHelper; +import io.reactivex.internal.util.*; import io.reactivex.observers.DisposableObserver; import io.reactivex.plugins.RxJavaPlugins; @@ -79,6 +79,7 @@ public boolean hasNext() { } if (iteratorNotification == null) { try { + BlockingHelper.verifyNonBlocking(); notify.acquire(); } catch (InterruptedException ex) { dispose(); diff --git a/src/main/java/io/reactivex/internal/operators/observable/BlockingObservableNext.java b/src/main/java/io/reactivex/internal/operators/observable/BlockingObservableNext.java index 9037c1368e..d0b3e13555 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/BlockingObservableNext.java +++ b/src/main/java/io/reactivex/internal/operators/observable/BlockingObservableNext.java @@ -18,7 +18,7 @@ import java.util.concurrent.atomic.AtomicInteger; import io.reactivex.*; -import io.reactivex.internal.util.ExceptionHelper; +import io.reactivex.internal.util.*; import io.reactivex.observers.DisposableObserver; import io.reactivex.plugins.RxJavaPlugins; @@ -162,6 +162,7 @@ public void onNext(Notification args) { public Notification takeNext() throws InterruptedException { setWaiting(); + BlockingHelper.verifyNonBlocking(); return buf.take(); } void setWaiting() { diff --git a/src/main/java/io/reactivex/internal/schedulers/ComputationScheduler.java b/src/main/java/io/reactivex/internal/schedulers/ComputationScheduler.java index 3f91e665b2..a74439fc0f 100644 --- a/src/main/java/io/reactivex/internal/schedulers/ComputationScheduler.java +++ b/src/main/java/io/reactivex/internal/schedulers/ComputationScheduler.java @@ -56,7 +56,7 @@ public final class ComputationScheduler extends Scheduler { int priority = Math.max(Thread.MIN_PRIORITY, Math.min(Thread.MAX_PRIORITY, Integer.getInteger(KEY_COMPUTATION_PRIORITY, Thread.NORM_PRIORITY))); - THREAD_FACTORY = new RxThreadFactory(THREAD_NAME_PREFIX, priority); + THREAD_FACTORY = new RxThreadFactory(THREAD_NAME_PREFIX, priority, true); NONE = new FixedSchedulerPool(0, THREAD_FACTORY); NONE.shutdown(); diff --git a/src/main/java/io/reactivex/internal/schedulers/NonBlockingThread.java b/src/main/java/io/reactivex/internal/schedulers/NonBlockingThread.java new file mode 100644 index 0000000000..cc54aad8a3 --- /dev/null +++ b/src/main/java/io/reactivex/internal/schedulers/NonBlockingThread.java @@ -0,0 +1,22 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.schedulers; + +/** + * Marker interface to indicate blocking is not recommended while running + * on a Scheduler with a thread type implementing it. + */ +public interface NonBlockingThread { + +} diff --git a/src/main/java/io/reactivex/internal/schedulers/RxThreadFactory.java b/src/main/java/io/reactivex/internal/schedulers/RxThreadFactory.java index db4f0d6711..45b924ea80 100644 --- a/src/main/java/io/reactivex/internal/schedulers/RxThreadFactory.java +++ b/src/main/java/io/reactivex/internal/schedulers/RxThreadFactory.java @@ -28,15 +28,22 @@ public final class RxThreadFactory extends AtomicLong implements ThreadFactory { final int priority; + final boolean nonBlocking; + // static volatile boolean CREATE_TRACE; public RxThreadFactory(String prefix) { - this(prefix, Thread.NORM_PRIORITY); + this(prefix, Thread.NORM_PRIORITY, false); } public RxThreadFactory(String prefix, int priority) { + this(prefix, priority, false); + } + + public RxThreadFactory(String prefix, int priority, boolean nonBlocking) { this.prefix = prefix; this.priority = priority; + this.nonBlocking = nonBlocking; } @Override @@ -63,7 +70,8 @@ public Thread newThread(Runnable r) { // } // } - Thread t = new Thread(r, nameBuilder.toString()); + String name = nameBuilder.toString(); + Thread t = nonBlocking ? new RxCustomThread(r, name) : new Thread(r, name); t.setPriority(priority); t.setDaemon(true); return t; @@ -73,4 +81,10 @@ public Thread newThread(Runnable r) { public String toString() { return "RxThreadFactory[" + prefix + "]"; } + + static final class RxCustomThread extends Thread implements NonBlockingThread { + RxCustomThread(Runnable run, String name) { + super(run, name); + } + } } diff --git a/src/main/java/io/reactivex/internal/schedulers/SingleScheduler.java b/src/main/java/io/reactivex/internal/schedulers/SingleScheduler.java index f082921164..5e0e46cc39 100644 --- a/src/main/java/io/reactivex/internal/schedulers/SingleScheduler.java +++ b/src/main/java/io/reactivex/internal/schedulers/SingleScheduler.java @@ -44,7 +44,7 @@ public final class SingleScheduler extends Scheduler { int priority = Math.max(Thread.MIN_PRIORITY, Math.min(Thread.MAX_PRIORITY, Integer.getInteger(KEY_SINGLE_PRIORITY, Thread.NORM_PRIORITY))); - SINGLE_THREAD_FACTORY = new RxThreadFactory(THREAD_NAME_PREFIX, priority); + SINGLE_THREAD_FACTORY = new RxThreadFactory(THREAD_NAME_PREFIX, priority, true); } public SingleScheduler() { diff --git a/src/main/java/io/reactivex/internal/subscribers/BlockingBaseSubscriber.java b/src/main/java/io/reactivex/internal/subscribers/BlockingBaseSubscriber.java index 7b01b4a2c2..33023306c6 100644 --- a/src/main/java/io/reactivex/internal/subscribers/BlockingBaseSubscriber.java +++ b/src/main/java/io/reactivex/internal/subscribers/BlockingBaseSubscriber.java @@ -17,7 +17,7 @@ import org.reactivestreams.*; import io.reactivex.internal.subscriptions.SubscriptionHelper; -import io.reactivex.internal.util.ExceptionHelper; +import io.reactivex.internal.util.*; public abstract class BlockingBaseSubscriber extends CountDownLatch implements Subscriber { @@ -60,6 +60,7 @@ public final void onComplete() { public final T blockingGet() { if (getCount() != 0) { try { + BlockingHelper.verifyNonBlocking(); await(); } catch (InterruptedException ex) { Subscription s = this.s; diff --git a/src/main/java/io/reactivex/internal/subscribers/FutureSubscriber.java b/src/main/java/io/reactivex/internal/subscribers/FutureSubscriber.java index 6bccfea3de..f6b0760540 100644 --- a/src/main/java/io/reactivex/internal/subscribers/FutureSubscriber.java +++ b/src/main/java/io/reactivex/internal/subscribers/FutureSubscriber.java @@ -20,6 +20,7 @@ import org.reactivestreams.*; import io.reactivex.internal.subscriptions.SubscriptionHelper; +import io.reactivex.internal.util.BlockingHelper; import io.reactivex.plugins.RxJavaPlugins; /** @@ -72,6 +73,7 @@ public boolean isDone() { @Override public T get() throws InterruptedException, ExecutionException { if (getCount() != 0) { + BlockingHelper.verifyNonBlocking(); await(); } @@ -88,6 +90,7 @@ public T get() throws InterruptedException, ExecutionException { @Override public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { if (getCount() != 0) { + BlockingHelper.verifyNonBlocking(); if (!await(timeout, unit)) { throw new TimeoutException(); } diff --git a/src/main/java/io/reactivex/internal/util/BlockingHelper.java b/src/main/java/io/reactivex/internal/util/BlockingHelper.java index 3f507a2388..39e75e9689 100644 --- a/src/main/java/io/reactivex/internal/util/BlockingHelper.java +++ b/src/main/java/io/reactivex/internal/util/BlockingHelper.java @@ -16,6 +16,8 @@ import java.util.concurrent.CountDownLatch; import io.reactivex.disposables.Disposable; +import io.reactivex.internal.schedulers.NonBlockingThread; +import io.reactivex.plugins.RxJavaPlugins; /** * Utility methods for helping common blocking operations. @@ -34,6 +36,7 @@ public static void awaitForComplete(CountDownLatch latch, Disposable subscriptio } // block until the subscription completes and then return try { + verifyNonBlocking(); latch.await(); } catch (InterruptedException e) { subscription.dispose(); @@ -45,5 +48,15 @@ public static void awaitForComplete(CountDownLatch latch, Disposable subscriptio } } - + /** + * Checks if the {@code failOnNonBlockingScheduler} plugin setting is enabled and the current + * thread is a Scheduler sensitive to blocking operators. + * @throws IllegalStateException if the {@code failOnNonBlockingScheduler} and the current thread is sensitive to blocking + */ + public static void verifyNonBlocking() { + if (RxJavaPlugins.isFailOnNonBlockingScheduler() + && Thread.currentThread() instanceof NonBlockingThread) { + throw new IllegalStateException("Attempt to block on a Scheduler " + Thread.currentThread().getName() + " that doesn't support blocking operators as they may lead to deadlock"); + } + } } diff --git a/src/main/java/io/reactivex/plugins/RxJavaPlugins.java b/src/main/java/io/reactivex/plugins/RxJavaPlugins.java index 8fe57c8405..44f8a14671 100644 --- a/src/main/java/io/reactivex/plugins/RxJavaPlugins.java +++ b/src/main/java/io/reactivex/plugins/RxJavaPlugins.java @@ -88,6 +88,12 @@ public final class RxJavaPlugins { /** Prevents changing the plugins. */ static volatile boolean lockdown; + /** + * If true, attempting to run a blockingX operation on a (by default) + * computation or single scheduler will throw an IllegalStateException. + */ + static volatile boolean failNonBlockingScheduler; + /** * Prevents changing the plugins from then on. *

This allows container-like environments to prevent clients @@ -105,6 +111,31 @@ public static boolean isLockdown() { return lockdown; } + /** + * Enables or disables the blockingX operators to fail on a non-blocking + * scheduler such as computation or single. + * @param enable enable or disable the feature + * @since 2.0.5 - experimental + */ + @Experimental + public static void setFailOnNonBlockingScheduler(boolean enable) { + if (lockdown) { + throw new IllegalStateException("Plugins can't be changed anymore"); + } + failNonBlockingScheduler = enable; + } + + /** + * Returns true if the blockingX operators fail on a non-blocking scheduler + * such as computation or single. + * @return true if the blockingX operators fail on a non-blocking scheduler + * @since 2.0.5 - experimental + */ + @Experimental + public static boolean isFailOnNonBlockingScheduler() { + return failNonBlockingScheduler; + } + /** * Returns the current hook function. * @return the hook function, may be null @@ -378,6 +409,8 @@ public static void reset() { setOnMaybeAssembly(null); setOnMaybeSubscribe(null); + + setFailOnNonBlockingScheduler(false); } /** diff --git a/src/test/java/io/reactivex/plugins/RxJavaPluginsTest.java b/src/test/java/io/reactivex/plugins/RxJavaPluginsTest.java index 8d6da37045..10b719ec72 100644 --- a/src/test/java/io/reactivex/plugins/RxJavaPluginsTest.java +++ b/src/test/java/io/reactivex/plugins/RxJavaPluginsTest.java @@ -226,11 +226,20 @@ public Object apply(Object t1, Object t2) { for (Method m : RxJavaPlugins.class.getMethods()) { if (m.getName().startsWith("set")) { - Method getter = RxJavaPlugins.class.getMethod("get" + m.getName().substring(3)); + Method getter; + + if (m.getParameterTypes()[0] == Boolean.TYPE) { + getter = RxJavaPlugins.class.getMethod("is" + m.getName().substring(3)); + } else { + getter = RxJavaPlugins.class.getMethod("get" + m.getName().substring(3)); + } Object before = getter.invoke(null); try { + if (m.getParameterTypes()[0].isAssignableFrom(Boolean.TYPE)) { + m.invoke(null, true); + } else if (m.getParameterTypes()[0].isAssignableFrom(Callable.class)) { m.invoke(null, f0); } else @@ -253,7 +262,11 @@ public Object apply(Object t1, Object t2) { Object after = getter.invoke(null); - assertSame(m.toString(), before, after); + if (m.getParameterTypes()[0].isPrimitive()) { + assertEquals(m.toString(), before, after); + } else { + assertSame(m.toString(), before, after); + } } } @@ -1918,7 +1931,7 @@ public void run() { } @Test - public void testCreateComputationScheduler() { + public void createComputationScheduler() { final String name = "ComputationSchedulerTest"; ThreadFactory factory = new ThreadFactory() { @Override @@ -1944,7 +1957,7 @@ public Scheduler apply(Scheduler scheduler) throws Exception { } @Test - public void testCreateIoScheduler() { + public void createIoScheduler() { final String name = "IoSchedulerTest"; ThreadFactory factory = new ThreadFactory() { @Override @@ -1970,7 +1983,7 @@ public Scheduler apply(Scheduler scheduler) throws Exception { } @Test - public void testCreateNewThreadScheduler() { + public void createNewThreadScheduler() { final String name = "NewThreadSchedulerTest"; ThreadFactory factory = new ThreadFactory() { @Override @@ -1996,7 +2009,7 @@ public Scheduler apply(Scheduler scheduler) throws Exception { } @Test - public void testCreateSingleScheduler() { + public void createSingleScheduler() { final String name = "SingleSchedulerTest"; ThreadFactory factory = new ThreadFactory() { @Override diff --git a/src/test/java/io/reactivex/schedulers/FailOnBlockingTest.java b/src/test/java/io/reactivex/schedulers/FailOnBlockingTest.java new file mode 100644 index 0000000000..56d8a570dc --- /dev/null +++ b/src/test/java/io/reactivex/schedulers/FailOnBlockingTest.java @@ -0,0 +1,642 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.schedulers; + +import java.util.concurrent.TimeUnit; + +import org.junit.Test; + +import io.reactivex.*; +import io.reactivex.functions.*; +import io.reactivex.internal.functions.Functions; +import io.reactivex.plugins.RxJavaPlugins; + +public class FailOnBlockingTest { + + @Test + public void failComputationFlowableBlockingFirst() { + + try { + RxJavaPlugins.setFailOnNonBlockingScheduler(true); + + Flowable.just(1) + .subscribeOn(Schedulers.computation()) + .map(new Function() { + @Override + public Integer apply(Integer v) throws Exception { + + Flowable.just(1).delay(10, TimeUnit.SECONDS).blockingFirst(); + + return v; + } + }) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertFailure(IllegalStateException.class); + } finally { + RxJavaPlugins.reset(); + } + + } + + @Test + public void failComputationFlowableBlockingLast() { + + try { + RxJavaPlugins.setFailOnNonBlockingScheduler(true); + + Flowable.just(1) + .subscribeOn(Schedulers.computation()) + .map(new Function() { + @Override + public Integer apply(Integer v) throws Exception { + + Flowable.just(1).delay(10, TimeUnit.SECONDS).blockingLast(); + + return v; + } + }) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertFailure(IllegalStateException.class); + } finally { + RxJavaPlugins.reset(); + } + + } + + @Test + public void failComputationFlowableBlockingIterable() { + + try { + RxJavaPlugins.setFailOnNonBlockingScheduler(true); + + Flowable.just(1) + .subscribeOn(Schedulers.computation()) + .map(new Function() { + @Override + public Integer apply(Integer v) throws Exception { + + Flowable.just(1).delay(10, TimeUnit.SECONDS).blockingIterable().iterator().next(); + + return v; + } + }) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertFailure(IllegalStateException.class); + } finally { + RxJavaPlugins.reset(); + } + + } + + @Test + public void failComputationFlowableBlockingSubscribe() { + + try { + RxJavaPlugins.setFailOnNonBlockingScheduler(true); + + Flowable.just(1) + .subscribeOn(Schedulers.computation()) + .map(new Function() { + @Override + public Integer apply(Integer v) throws Exception { + + Flowable.just(1).delay(10, TimeUnit.SECONDS).blockingSubscribe(); + + return v; + } + }) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertFailure(IllegalStateException.class); + } finally { + RxJavaPlugins.reset(); + } + + } + + @Test + public void failComputationFlowableBlockingSingle() { + + try { + RxJavaPlugins.setFailOnNonBlockingScheduler(true); + + Flowable.just(1) + .subscribeOn(Schedulers.computation()) + .map(new Function() { + @Override + public Integer apply(Integer v) throws Exception { + + Flowable.just(1).delay(10, TimeUnit.SECONDS).blockingSingle(); + + return v; + } + }) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertFailure(IllegalStateException.class); + } finally { + RxJavaPlugins.reset(); + } + + } + + @Test + public void failComputationFlowableBlockingForEach() { + + try { + RxJavaPlugins.setFailOnNonBlockingScheduler(true); + + Flowable.just(1) + .subscribeOn(Schedulers.computation()) + .map(new Function() { + @Override + public Integer apply(Integer v) throws Exception { + + Flowable.just(1).delay(10, TimeUnit.SECONDS).blockingForEach(Functions.emptyConsumer()); + + return v; + } + }) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertFailure(IllegalStateException.class); + } finally { + RxJavaPlugins.reset(); + } + + } + + @Test + public void failComputationFlowableBlockingLatest() { + + try { + RxJavaPlugins.setFailOnNonBlockingScheduler(true); + + Flowable.just(1) + .subscribeOn(Schedulers.computation()) + .map(new Function() { + @Override + public Integer apply(Integer v) throws Exception { + + Flowable.just(1).delay(10, TimeUnit.SECONDS).blockingLatest().iterator().hasNext(); + + return v; + } + }) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertFailure(IllegalStateException.class); + } finally { + RxJavaPlugins.reset(); + } + + } + + @Test + public void failComputationFlowableBlockingNext() { + + try { + RxJavaPlugins.setFailOnNonBlockingScheduler(true); + + Flowable.just(1) + .subscribeOn(Schedulers.computation()) + .map(new Function() { + @Override + public Integer apply(Integer v) throws Exception { + + Flowable.just(1).delay(10, TimeUnit.SECONDS).blockingNext().iterator().hasNext(); + + return v; + } + }) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertFailure(IllegalStateException.class); + } finally { + RxJavaPlugins.reset(); + } + + } + + @Test + public void failComputationFlowableToFuture() { + + try { + RxJavaPlugins.setFailOnNonBlockingScheduler(true); + + Flowable.just(1) + .subscribeOn(Schedulers.computation()) + .map(new Function() { + @Override + public Integer apply(Integer v) throws Exception { + + Flowable.just(1).delay(10, TimeUnit.SECONDS).toFuture().get(); + + return v; + } + }) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertFailure(IllegalStateException.class); + } finally { + RxJavaPlugins.reset(); + } + + } + + @Test + public void failComputationObservableBlockingFirst() { + + try { + RxJavaPlugins.setFailOnNonBlockingScheduler(true); + + Observable.just(1) + .subscribeOn(Schedulers.computation()) + .map(new Function() { + @Override + public Integer apply(Integer v) throws Exception { + + Observable.just(1).delay(10, TimeUnit.SECONDS).blockingFirst(); + + return v; + } + }) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertFailure(IllegalStateException.class); + } finally { + RxJavaPlugins.reset(); + } + + } + + @Test + public void failComputationObservableBlockingLast() { + + try { + RxJavaPlugins.setFailOnNonBlockingScheduler(true); + + Observable.just(1) + .subscribeOn(Schedulers.computation()) + .map(new Function() { + @Override + public Integer apply(Integer v) throws Exception { + + Flowable.just(1).delay(10, TimeUnit.SECONDS).blockingLast(); + + return v; + } + }) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertFailure(IllegalStateException.class); + } finally { + RxJavaPlugins.reset(); + } + + } + + @Test + public void failComputationObservableBlockingIterable() { + + try { + RxJavaPlugins.setFailOnNonBlockingScheduler(true); + + Observable.just(1) + .subscribeOn(Schedulers.computation()) + .map(new Function() { + @Override + public Integer apply(Integer v) throws Exception { + + Observable.just(1).delay(10, TimeUnit.SECONDS).blockingIterable().iterator().next(); + + return v; + } + }) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertFailure(IllegalStateException.class); + } finally { + RxJavaPlugins.reset(); + } + + } + + @Test + public void failComputationObservableBlockingSubscribe() { + + try { + RxJavaPlugins.setFailOnNonBlockingScheduler(true); + + Observable.just(1) + .subscribeOn(Schedulers.computation()) + .map(new Function() { + @Override + public Integer apply(Integer v) throws Exception { + + Observable.just(1).delay(10, TimeUnit.SECONDS).blockingSubscribe(); + + return v; + } + }) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertFailure(IllegalStateException.class); + } finally { + RxJavaPlugins.reset(); + } + + } + + @Test + public void failComputationObservableBlockingSingle() { + + try { + RxJavaPlugins.setFailOnNonBlockingScheduler(true); + + Observable.just(1) + .subscribeOn(Schedulers.computation()) + .map(new Function() { + @Override + public Integer apply(Integer v) throws Exception { + + Observable.just(1).delay(10, TimeUnit.SECONDS).blockingSingle(); + + return v; + } + }) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertFailure(IllegalStateException.class); + } finally { + RxJavaPlugins.reset(); + } + + } + + @Test + public void failComputationObservableBlockingForEach() { + + try { + RxJavaPlugins.setFailOnNonBlockingScheduler(true); + + Observable.just(1) + .subscribeOn(Schedulers.computation()) + .map(new Function() { + @Override + public Integer apply(Integer v) throws Exception { + + Observable.just(1).delay(10, TimeUnit.SECONDS).blockingForEach(Functions.emptyConsumer()); + + return v; + } + }) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertFailure(IllegalStateException.class); + } finally { + RxJavaPlugins.reset(); + } + + } + + @Test + public void failComputationObservableBlockingLatest() { + + try { + RxJavaPlugins.setFailOnNonBlockingScheduler(true); + + Observable.just(1) + .subscribeOn(Schedulers.computation()) + .map(new Function() { + @Override + public Integer apply(Integer v) throws Exception { + + Observable.just(1).delay(10, TimeUnit.SECONDS).blockingLatest().iterator().hasNext(); + + return v; + } + }) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertFailure(IllegalStateException.class); + } finally { + RxJavaPlugins.reset(); + } + + } + + @Test + public void failComputationObservableBlockingNext() { + + try { + RxJavaPlugins.setFailOnNonBlockingScheduler(true); + + Observable.just(1) + .subscribeOn(Schedulers.computation()) + .map(new Function() { + @Override + public Integer apply(Integer v) throws Exception { + + Observable.just(1).delay(10, TimeUnit.SECONDS).blockingNext().iterator().hasNext(); + + return v; + } + }) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertFailure(IllegalStateException.class); + } finally { + RxJavaPlugins.reset(); + } + + } + + @Test + public void failComputationObservableToFuture() { + + try { + RxJavaPlugins.setFailOnNonBlockingScheduler(true); + + Observable.just(1) + .subscribeOn(Schedulers.computation()) + .map(new Function() { + @Override + public Integer apply(Integer v) throws Exception { + + Observable.just(1).delay(10, TimeUnit.SECONDS).toFuture().get(); + + return v; + } + }) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertFailure(IllegalStateException.class); + } finally { + RxJavaPlugins.reset(); + } + + } + + @Test + public void failSingleObservableBlockingFirst() { + + try { + RxJavaPlugins.setFailOnNonBlockingScheduler(true); + + Observable.just(1) + .subscribeOn(Schedulers.single()) + .map(new Function() { + @Override + public Integer apply(Integer v) throws Exception { + + Observable.just(1).delay(10, TimeUnit.SECONDS).blockingFirst(); + + return v; + } + }) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertFailure(IllegalStateException.class); + } finally { + RxJavaPlugins.reset(); + } + + } + + @Test + public void failSingleSingleBlockingGet() { + + try { + RxJavaPlugins.setFailOnNonBlockingScheduler(true); + + Single.just(1) + .subscribeOn(Schedulers.single()) + .map(new Function() { + @Override + public Integer apply(Integer v) throws Exception { + + Single.just(1).delay(10, TimeUnit.SECONDS).blockingGet(); + + return v; + } + }) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertFailure(IllegalStateException.class); + } finally { + RxJavaPlugins.reset(); + } + + } + + @Test + public void failSingleMaybeBlockingGet() { + + try { + RxJavaPlugins.setFailOnNonBlockingScheduler(true); + + Maybe.just(1) + .subscribeOn(Schedulers.single()) + .map(new Function() { + @Override + public Integer apply(Integer v) throws Exception { + + Maybe.just(1).delay(10, TimeUnit.SECONDS).blockingGet(); + + return v; + } + }) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertFailure(IllegalStateException.class); + } finally { + RxJavaPlugins.reset(); + } + + } + + @Test + public void failSingleCompletableBlockingGet() { + + try { + RxJavaPlugins.setFailOnNonBlockingScheduler(true); + + Completable.complete() + .subscribeOn(Schedulers.single()) + .doOnComplete(new Action() { + @Override + public void run() throws Exception { + Completable.complete().delay(10, TimeUnit.SECONDS).blockingGet(); + } + }) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertFailure(IllegalStateException.class); + } finally { + RxJavaPlugins.reset(); + } + + } + + @Test + public void failSingleCompletableBlockingAwait() { + + try { + RxJavaPlugins.setFailOnNonBlockingScheduler(true); + + Completable.complete() + .subscribeOn(Schedulers.single()) + .doOnComplete(new Action() { + @Override + public void run() throws Exception { + Completable.complete().delay(10, TimeUnit.SECONDS).blockingAwait(); + } + }) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertFailure(IllegalStateException.class); + } finally { + RxJavaPlugins.reset(); + } + + } + + @Test + public void dontfailIOObservableBlockingFirst() { + + try { + RxJavaPlugins.setFailOnNonBlockingScheduler(true); + + Observable.just(1) + .subscribeOn(Schedulers.io()) + .map(new Function() { + @Override + public Integer apply(Integer v) throws Exception { + return Observable.just(2).delay(100, TimeUnit.MILLISECONDS).blockingFirst(); + } + }) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertResult(2); + } finally { + RxJavaPlugins.reset(); + } + + } + +} diff --git a/src/test/java/io/reactivex/tck/DelaySubscriptionTckTest.java b/src/test/java/io/reactivex/tck/DelaySubscriptionTckTest.java index ac2e2e4df9..875dafc515 100644 --- a/src/test/java/io/reactivex/tck/DelaySubscriptionTckTest.java +++ b/src/test/java/io/reactivex/tck/DelaySubscriptionTckTest.java @@ -26,7 +26,7 @@ public class DelaySubscriptionTckTest extends BaseTck { public DelaySubscriptionTckTest() { super(200L); } - + @Override public Publisher createPublisher(long elements) { return FlowableTck.wrap( From 9e355b42c2021b06be88251e70814706869758e7 Mon Sep 17 00:00:00 2001 From: akarnokd Date: Wed, 25 Jan 2017 16:07:38 +0100 Subject: [PATCH 2/3] Increase sleep time in XFlatMapTest --- src/test/java/io/reactivex/XFlatMapTest.java | 38 ++++++++++---------- 1 file changed, 20 insertions(+), 18 deletions(-) diff --git a/src/test/java/io/reactivex/XFlatMapTest.java b/src/test/java/io/reactivex/XFlatMapTest.java index cb687974fe..5c49525fa0 100644 --- a/src/test/java/io/reactivex/XFlatMapTest.java +++ b/src/test/java/io/reactivex/XFlatMapTest.java @@ -30,6 +30,8 @@ public class XFlatMapTest { + static final int SLEEP_AFTER_CANCEL = 200; + final CyclicBarrier cb = new CyclicBarrier(2); void sleep() throws Exception { @@ -62,7 +64,7 @@ public Publisher apply(Integer v) throws Exception { ts.cancel(); - Thread.sleep(150); + Thread.sleep(SLEEP_AFTER_CANCEL); ts.assertEmpty(); @@ -93,7 +95,7 @@ public Single apply(Integer v) throws Exception { ts.cancel(); - Thread.sleep(150); + Thread.sleep(SLEEP_AFTER_CANCEL); ts.assertEmpty(); @@ -124,7 +126,7 @@ public Maybe apply(Integer v) throws Exception { ts.cancel(); - Thread.sleep(150); + Thread.sleep(SLEEP_AFTER_CANCEL); ts.assertEmpty(); @@ -155,7 +157,7 @@ public Completable apply(Integer v) throws Exception { ts.cancel(); - Thread.sleep(150); + Thread.sleep(SLEEP_AFTER_CANCEL); ts.assertEmpty(); @@ -187,7 +189,7 @@ public Completable apply(Integer v) throws Exception { ts.cancel(); - Thread.sleep(150); + Thread.sleep(SLEEP_AFTER_CANCEL); ts.assertEmpty(); @@ -218,7 +220,7 @@ public Observable apply(Integer v) throws Exception { ts.cancel(); - Thread.sleep(150); + Thread.sleep(SLEEP_AFTER_CANCEL); ts.assertEmpty(); @@ -249,7 +251,7 @@ public Single apply(Integer v) throws Exception { ts.cancel(); - Thread.sleep(150); + Thread.sleep(SLEEP_AFTER_CANCEL); ts.assertEmpty(); @@ -280,7 +282,7 @@ public Maybe apply(Integer v) throws Exception { ts.cancel(); - Thread.sleep(150); + Thread.sleep(SLEEP_AFTER_CANCEL); ts.assertEmpty(); @@ -311,7 +313,7 @@ public Completable apply(Integer v) throws Exception { ts.cancel(); - Thread.sleep(150); + Thread.sleep(SLEEP_AFTER_CANCEL); ts.assertEmpty(); @@ -343,7 +345,7 @@ public Completable apply(Integer v) throws Exception { ts.cancel(); - Thread.sleep(150); + Thread.sleep(SLEEP_AFTER_CANCEL); ts.assertEmpty(); @@ -374,7 +376,7 @@ public Single apply(Integer v) throws Exception { ts.cancel(); - Thread.sleep(150); + Thread.sleep(SLEEP_AFTER_CANCEL); ts.assertEmpty(); @@ -405,7 +407,7 @@ public Maybe apply(Integer v) throws Exception { ts.cancel(); - Thread.sleep(150); + Thread.sleep(SLEEP_AFTER_CANCEL); ts.assertEmpty(); @@ -436,7 +438,7 @@ public Completable apply(Integer v) throws Exception { ts.cancel(); - Thread.sleep(150); + Thread.sleep(SLEEP_AFTER_CANCEL); ts.assertEmpty(); @@ -468,7 +470,7 @@ public Completable apply(Integer v) throws Exception { ts.cancel(); - Thread.sleep(150); + Thread.sleep(SLEEP_AFTER_CANCEL); ts.assertEmpty(); @@ -499,7 +501,7 @@ public Single apply(Integer v) throws Exception { ts.cancel(); - Thread.sleep(150); + Thread.sleep(SLEEP_AFTER_CANCEL); ts.assertEmpty(); @@ -530,7 +532,7 @@ public Maybe apply(Integer v) throws Exception { ts.cancel(); - Thread.sleep(150); + Thread.sleep(SLEEP_AFTER_CANCEL); ts.assertEmpty(); @@ -561,7 +563,7 @@ public Completable apply(Integer v) throws Exception { ts.cancel(); - Thread.sleep(150); + Thread.sleep(SLEEP_AFTER_CANCEL); ts.assertEmpty(); @@ -593,7 +595,7 @@ public Completable apply(Integer v) throws Exception { ts.cancel(); - Thread.sleep(150); + Thread.sleep(SLEEP_AFTER_CANCEL); ts.assertEmpty(); From ac942d2bc35f581ec7a054499fd18e2ba6907ccf Mon Sep 17 00:00:00 2001 From: akarnokd Date: Thu, 26 Jan 2017 14:36:00 +0100 Subject: [PATCH 3/3] Add a custom RxJavaPlugins callback onBeforeBlocking --- .../internal/util/BlockingHelper.java | 3 +- .../io/reactivex/plugins/RxJavaPlugins.java | 61 ++++++++++++++++++- .../reactivex/plugins/RxJavaPluginsTest.java | 24 ++++++-- .../schedulers/FailOnBlockingTest.java | 40 ++++++++++++ 4 files changed, 118 insertions(+), 10 deletions(-) diff --git a/src/main/java/io/reactivex/internal/util/BlockingHelper.java b/src/main/java/io/reactivex/internal/util/BlockingHelper.java index 39e75e9689..7be3695206 100644 --- a/src/main/java/io/reactivex/internal/util/BlockingHelper.java +++ b/src/main/java/io/reactivex/internal/util/BlockingHelper.java @@ -55,7 +55,8 @@ public static void awaitForComplete(CountDownLatch latch, Disposable subscriptio */ public static void verifyNonBlocking() { if (RxJavaPlugins.isFailOnNonBlockingScheduler() - && Thread.currentThread() instanceof NonBlockingThread) { + && (Thread.currentThread() instanceof NonBlockingThread + || RxJavaPlugins.onBeforeBlocking())) { throw new IllegalStateException("Attempt to block on a Scheduler " + Thread.currentThread().getName() + " that doesn't support blocking operators as they may lead to deadlock"); } } diff --git a/src/main/java/io/reactivex/plugins/RxJavaPlugins.java b/src/main/java/io/reactivex/plugins/RxJavaPlugins.java index 44f8a14671..5840536e8c 100644 --- a/src/main/java/io/reactivex/plugins/RxJavaPlugins.java +++ b/src/main/java/io/reactivex/plugins/RxJavaPlugins.java @@ -13,7 +13,7 @@ package io.reactivex.plugins; import io.reactivex.*; -import io.reactivex.annotations.Experimental; +import io.reactivex.annotations.*; import io.reactivex.flowables.ConnectableFlowable; import io.reactivex.functions.*; import io.reactivex.internal.functions.ObjectHelper; @@ -85,6 +85,8 @@ public final class RxJavaPlugins { static volatile BiFunction onCompletableSubscribe; + static volatile BooleanSupplier onBeforeBlocking; + /** Prevents changing the plugins. */ static volatile boolean lockdown; @@ -112,7 +114,8 @@ public static boolean isLockdown() { } /** - * Enables or disables the blockingX operators to fail on a non-blocking + * Enables or disables the blockingX operators to fail + * with an IllegalStateException on a non-blocking * scheduler such as computation or single. * @param enable enable or disable the feature * @since 2.0.5 - experimental @@ -126,7 +129,8 @@ public static void setFailOnNonBlockingScheduler(boolean enable) { } /** - * Returns true if the blockingX operators fail on a non-blocking scheduler + * Returns true if the blockingX operators fail + * with an IllegalStateException on a non-blocking scheduler * such as computation or single. * @return true if the blockingX operators fail on a non-blocking scheduler * @since 2.0.5 - experimental @@ -411,6 +415,7 @@ public static void reset() { setOnMaybeSubscribe(null); setFailOnNonBlockingScheduler(false); + setOnBeforeBlocking(null); } /** @@ -961,6 +966,56 @@ public static Completable onAssembly(Completable source) { return source; } + /** + * Called before an operator attempts a blocking operation + * such as awaiting a condition or signal + * and should return true to indicate the operator + * should not block but throw an IllegalArgumentException. + * @return true if the blocking should be prevented + * @see #setFailOnNonBlockingScheduler(boolean) + * @since 2.0.5 - experimental + */ + @Experimental + public static boolean onBeforeBlocking() { + BooleanSupplier f = onBeforeBlocking; + if (f != null) { + try { + return f.getAsBoolean(); + } catch (Throwable ex) { + throw ExceptionHelper.wrapOrThrow(ex); + } + } + return false; + } + + /** + * Set the handler that is called when an operator attemts a blocking + * await; the handler should return true to prevent the blocking + * and to signal an IllegalStateException instead. + * @param handler the handler to set, null resets to the default handler + * that always returns false + * @see #onBeforeBlocking() + * @since 2.0.5 - experimental + */ + @Experimental + public static void setOnBeforeBlocking(BooleanSupplier handler) { + if (lockdown) { + throw new IllegalStateException("Plugins can't be changed anymore"); + } + onBeforeBlocking = handler; + } + + /** + * Returns the current blocking handler or null if no custom handler + * is set. + * @return the current blocking handler or null if not specified + * @since 2.0.5 - experimental + */ + @Experimental + public static BooleanSupplier getOnBeforeBlocking() { + return onBeforeBlocking; + } + /** * Create an instance of the default {@link Scheduler} used for {@link Schedulers#computation()} * except using {@code threadFactory} for thread creation. diff --git a/src/test/java/io/reactivex/plugins/RxJavaPluginsTest.java b/src/test/java/io/reactivex/plugins/RxJavaPluginsTest.java index 10b719ec72..19d2f5a308 100644 --- a/src/test/java/io/reactivex/plugins/RxJavaPluginsTest.java +++ b/src/test/java/io/reactivex/plugins/RxJavaPluginsTest.java @@ -223,12 +223,21 @@ public Object apply(Object t1, Object t2) { } }; + BooleanSupplier bs = new BooleanSupplier() { + @Override + public boolean getAsBoolean() throws Exception { + return true; + } + }; + for (Method m : RxJavaPlugins.class.getMethods()) { if (m.getName().startsWith("set")) { Method getter; - if (m.getParameterTypes()[0] == Boolean.TYPE) { + Class paramType = m.getParameterTypes()[0]; + + if (paramType == Boolean.TYPE) { getter = RxJavaPlugins.class.getMethod("is" + m.getName().substring(3)); } else { getter = RxJavaPlugins.class.getMethod("get" + m.getName().substring(3)); @@ -237,17 +246,20 @@ public Object apply(Object t1, Object t2) { Object before = getter.invoke(null); try { - if (m.getParameterTypes()[0].isAssignableFrom(Boolean.TYPE)) { + if (paramType.isAssignableFrom(Boolean.TYPE)) { m.invoke(null, true); } else - if (m.getParameterTypes()[0].isAssignableFrom(Callable.class)) { + if (paramType.isAssignableFrom(Callable.class)) { m.invoke(null, f0); } else - if (m.getParameterTypes()[0].isAssignableFrom(Function.class)) { + if (paramType.isAssignableFrom(Function.class)) { m.invoke(null, f1); } else - if (m.getParameterTypes()[0].isAssignableFrom(Consumer.class)) { + if (paramType.isAssignableFrom(Consumer.class)) { m.invoke(null, a1); + } else + if (paramType.isAssignableFrom(BooleanSupplier.class)) { + m.invoke(null, bs); } else { m.invoke(null, f2); } @@ -262,7 +274,7 @@ public Object apply(Object t1, Object t2) { Object after = getter.invoke(null); - if (m.getParameterTypes()[0].isPrimitive()) { + if (paramType.isPrimitive()) { assertEquals(m.toString(), before, after); } else { assertSame(m.toString(), before, after); diff --git a/src/test/java/io/reactivex/schedulers/FailOnBlockingTest.java b/src/test/java/io/reactivex/schedulers/FailOnBlockingTest.java index 56d8a570dc..aebcf6c307 100644 --- a/src/test/java/io/reactivex/schedulers/FailOnBlockingTest.java +++ b/src/test/java/io/reactivex/schedulers/FailOnBlockingTest.java @@ -639,4 +639,44 @@ public Integer apply(Integer v) throws Exception { } + @Test + public void failWithCustomHandler() { + try { + RxJavaPlugins.setOnBeforeBlocking(new BooleanSupplier() { + @Override + public boolean getAsBoolean() throws Exception { + return true; + } + }); + RxJavaPlugins.setFailOnNonBlockingScheduler(true); + + Flowable.just(1) + .map(new Function() { + @Override + public Integer apply(Integer v) throws Exception { + + Flowable.just(1).delay(10, TimeUnit.SECONDS).blockingLast(); + + return v; + } + }) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertFailure(IllegalStateException.class); + + } finally { + RxJavaPlugins.reset(); + } + + Flowable.just(1) + .map(new Function() { + @Override + public Integer apply(Integer v) throws Exception { + return Flowable.just(2).delay(100, TimeUnit.MILLISECONDS).blockingLast(); + } + }) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertResult(2); + } }