diff --git a/src/main/java/rx/internal/schedulers/EventLoopsScheduler.java b/src/main/java/rx/internal/schedulers/EventLoopsScheduler.java index 91a5440227..d4f47b1db1 100644 --- a/src/main/java/rx/internal/schedulers/EventLoopsScheduler.java +++ b/src/main/java/rx/internal/schedulers/EventLoopsScheduler.java @@ -91,7 +91,7 @@ public Worker createWorker() { */ public Subscription scheduleDirect(Action0 action) { PoolWorker pw = pool.getEventLoop(); - return pw.scheduleActual(action, -1, TimeUnit.NANOSECONDS); + return pw.scheduleActual(action, -1, TimeUnit.NANOSECONDS, false); } private static class EventLoopWorker extends Scheduler.Worker { @@ -124,7 +124,7 @@ public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) { return Subscriptions.unsubscribed(); } - ScheduledAction s = poolWorker.scheduleActual(action, delayTime, unit); + ScheduledAction s = poolWorker.scheduleActual(action, delayTime, unit, false); innerSubscription.add(s); s.addParent(innerSubscription); return s; diff --git a/src/main/java/rx/internal/schedulers/NewThreadWorker.java b/src/main/java/rx/internal/schedulers/NewThreadWorker.java index 41144795cb..522754be7d 100644 --- a/src/main/java/rx/internal/schedulers/NewThreadWorker.java +++ b/src/main/java/rx/internal/schedulers/NewThreadWorker.java @@ -151,19 +151,20 @@ public Subscription schedule(final Action0 action, long delayTime, TimeUnit unit if (isUnsubscribed) { return Subscriptions.unsubscribed(); } - return scheduleActual(action, delayTime, unit); + return scheduleActual(action, delayTime, unit, true); } /** - * @warn javadoc missing - * @param action - * @param delayTime - * @param unit - * @return + * Performs the actual scheduling of a potentially delayed task and assigns the + * future to the ScheduledAction it returs. + * @param action the action to schedule + * @param delayTime the scheduling delay if positive + * @param unit the scheduling delay's time unit + * @return the ScheduledAction representing the task */ - public ScheduledAction scheduleActual(final Action0 action, long delayTime, TimeUnit unit) { + public ScheduledAction scheduleActual(final Action0 action, long delayTime, TimeUnit unit, boolean interruptOnUnsubscribe) { Action0 decoratedAction = schedulersHook.onSchedule(action); - ScheduledAction run = new ScheduledAction(decoratedAction); + ScheduledAction run = new ScheduledAction(decoratedAction, interruptOnUnsubscribe); Future f; if (delayTime <= 0) { f = executor.submit(run); diff --git a/src/main/java/rx/internal/schedulers/ScheduledAction.java b/src/main/java/rx/internal/schedulers/ScheduledAction.java index 24240096c9..c283d93b41 100644 --- a/src/main/java/rx/internal/schedulers/ScheduledAction.java +++ b/src/main/java/rx/internal/schedulers/ScheduledAction.java @@ -16,12 +16,12 @@ package rx.internal.schedulers; import java.util.concurrent.Future; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.atomic.*; import rx.Subscription; import rx.exceptions.OnErrorNotImplementedException; import rx.functions.Action0; +import rx.internal.util.SubscriptionList; import rx.plugins.RxJavaPlugins; import rx.subscriptions.CompositeSubscription; @@ -32,12 +32,20 @@ public final class ScheduledAction extends AtomicReference implements Runnable, Subscription { /** */ private static final long serialVersionUID = -3962399486978279857L; - final CompositeSubscription cancel; + final SubscriptionList cancel; final Action0 action; + volatile int interruptOnUnsubscribe; + static final AtomicIntegerFieldUpdater INTERRUPT_ON_UNSUBSCRIBE + = AtomicIntegerFieldUpdater.newUpdater(ScheduledAction.class, "interruptOnUnsubscribe"); public ScheduledAction(Action0 action) { + this(action, true); + } + + public ScheduledAction(Action0 action, boolean interruptOnUnsubscribe) { this.action = action; - this.cancel = new CompositeSubscription(); + this.cancel = new SubscriptionList(); + this.interruptOnUnsubscribe = interruptOnUnsubscribe ? 1 : 0; } @Override @@ -61,6 +69,21 @@ public void run() { } } + /** + * Sets the flag to indicate the underlying Future task should be interrupted on unsubscription or not. + * @param interrupt the new interruptible status + */ + public void setInterruptOnUnsubscribe(boolean interrupt) { + INTERRUPT_ON_UNSUBSCRIBE.lazySet(this, interrupt ? 1 : 0); + } + /** + * Returns {@code true} if the underlying Future task will be interrupted on unsubscription. + * @return the current interruptible status + */ + public boolean isInterruptOnUnsubscribe() { + return interruptOnUnsubscribe != 0; + } + @Override public boolean isUnsubscribed() { return cancel.isUnsubscribed(); @@ -68,9 +91,7 @@ public boolean isUnsubscribed() { @Override public void unsubscribe() { - if (!cancel.isUnsubscribed()) { - cancel.unsubscribe(); - } + cancel.unsubscribe(); } /** @@ -89,7 +110,7 @@ public void add(Subscription s) { * @param f the future to add */ public void add(final Future f) { - cancel.add(new FutureCompleter(f)); + add(new FutureCompleter(f)); } /** @@ -100,7 +121,7 @@ public void add(final Future f) { * the parent {@code CompositeSubscription} to add */ public void addParent(CompositeSubscription parent) { - cancel.add(new Remover(this, parent)); + add(new Remover(this, parent)); } /** @@ -119,7 +140,7 @@ private FutureCompleter(Future f) { @Override public void unsubscribe() { if (ScheduledAction.this.get() != Thread.currentThread()) { - f.cancel(true); + f.cancel(interruptOnUnsubscribe != 0); } else { f.cancel(false); } @@ -155,4 +176,4 @@ public void unsubscribe() { } } -} +} \ No newline at end of file diff --git a/src/main/java/rx/schedulers/CachedThreadScheduler.java b/src/main/java/rx/schedulers/CachedThreadScheduler.java index f1cd815b64..7d706bceae 100644 --- a/src/main/java/rx/schedulers/CachedThreadScheduler.java +++ b/src/main/java/rx/schedulers/CachedThreadScheduler.java @@ -145,7 +145,7 @@ public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) { return Subscriptions.unsubscribed(); } - ScheduledAction s = threadWorker.scheduleActual(action, delayTime, unit); + ScheduledAction s = threadWorker.scheduleActual(action, delayTime, unit, true); innerSubscription.add(s); s.addParent(innerSubscription); return s; diff --git a/src/main/java/rx/schedulers/Schedulers.java b/src/main/java/rx/schedulers/Schedulers.java index 8ded001e0e..bf5ab8af44 100644 --- a/src/main/java/rx/schedulers/Schedulers.java +++ b/src/main/java/rx/schedulers/Schedulers.java @@ -15,11 +15,14 @@ */ package rx.schedulers; -import rx.Scheduler; -import rx.internal.schedulers.EventLoopsScheduler; -import rx.plugins.RxJavaPlugins; +import java.util.concurrent.*; -import java.util.concurrent.Executor; +import rx.*; +import rx.annotations.Experimental; +import rx.functions.Action0; +import rx.internal.schedulers.*; +import rx.plugins.RxJavaPlugins; +import rx.subscriptions.CompositeSubscription; /** * Static factory methods for creating Schedulers. @@ -137,4 +140,53 @@ public static TestScheduler test() { public static Scheduler from(Executor executor) { return new ExecutorScheduler(executor); } -} + /** + * Submit an Action0 to the specified executor service with the option to interrupt the task + * on unsubscription and add it to a parent composite subscription. + * @param executor the target executor service + * @param action the action to execute + * @param parent if not {@code null} the subscription representing the action is added to this composite with logic to remove it + * once the action completes or is unsubscribed. + * @param interruptOnUnsubscribe if {@code false}, unsubscribing the task will not interrupt the task if it is running + * @return the Subscription representing the scheduled action which is also added to the {@code parent} composite + */ + @Experimental + public static Subscription submitTo(ExecutorService executor, Action0 action, CompositeSubscription parent, boolean interruptOnUnsubscribe) { + ScheduledAction sa = new ScheduledAction(action, interruptOnUnsubscribe); + + if (parent != null) { + parent.add(sa); + sa.addParent(parent); + } + + Future f = executor.submit(sa); + sa.add(f); + + return sa; + } + /** + * Submit an Action0 to the specified executor service with the given delay and the option to interrupt the task + * on unsubscription and add it to a parent composite subscription. + * @param executor the target executor service + * @param action the action to execute + * @param delay the delay value + * @param unit the time unit of the delay value + * @param parent if not {@code null} the subscription representing the action is added to this composite with logic to remove it + * once the action completes or is unsubscribed. + * @param interruptOnUnsubscribe if {@code false}, unsubscribing the task will not interrupt the task if it is running + * @return the Subscription representing the scheduled action which is also added to the {@code parent} composite + */ + @Experimental + public static Subscription submitTo(ScheduledExecutorService executor, Action0 action, long delay, TimeUnit unit, CompositeSubscription parent, boolean interruptOnUnsubscribe) { + ScheduledAction sa = new ScheduledAction(action, interruptOnUnsubscribe); + + if (parent != null) { + parent.add(sa); + sa.addParent(parent); + } + + Future f = executor.schedule(sa, delay, unit); + sa.add(f); + + return sa; + }} diff --git a/src/test/java/rx/schedulers/SchedulersTest.java b/src/test/java/rx/schedulers/SchedulersTest.java new file mode 100644 index 0000000000..76b854863f --- /dev/null +++ b/src/test/java/rx/schedulers/SchedulersTest.java @@ -0,0 +1,323 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.schedulers; + +import static org.junit.Assert.*; + +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.junit.Test; + +import rx.Subscription; +import rx.functions.Action0; +import rx.subscriptions.CompositeSubscription; + +public class SchedulersTest { + static final class RunAction extends AtomicBoolean implements Action0 { + /** */ + private static final long serialVersionUID = -3148738938700490457L; + private CountDownLatch startLatch = new CountDownLatch(1); + private CountDownLatch runLatch = new CountDownLatch(1); + private CountDownLatch completeLatch = new CountDownLatch(1); + private volatile boolean waitInterrupted; + @Override + public void call() { + startLatch.countDown(); + try { + runLatch.await(); + } catch (InterruptedException ex) { + waitInterrupted = true; + completeLatch.countDown(); + return; + } + lazySet(true); + completeLatch.countDown(); + } + private void await(CountDownLatch latch) { + try { + latch.await(); + } catch (InterruptedException ex) { + throw new RuntimeException(ex); + } + } + public void awaitStart() { + await(startLatch); + } + public void awaitComplete() { + await(completeLatch); + } + public boolean isWaitInterrupted() { + return waitInterrupted; + } + public void run() { + runLatch.countDown(); + } + } + @Test + public void submitToSimpleInterrupt() { + RunAction ra = new RunAction(); + + CompositeSubscription csub = new CompositeSubscription(); + + ExecutorService exec = Executors.newFixedThreadPool(1); + try { + Subscription s = Schedulers.submitTo(exec, ra, csub, true); + + ra.awaitStart(); + + csub.remove(s); + + ra.awaitComplete(); + + assertTrue(ra.isWaitInterrupted()); + assertFalse(ra.get()); + assertTrue(s.isUnsubscribed()); + + } finally { + exec.shutdownNow(); + } + + } + + @Test + public void submitToSimpleNoInterrupt() { + RunAction ra = new RunAction(); + + CompositeSubscription csub = new CompositeSubscription(); + + ExecutorService exec = Executors.newFixedThreadPool(1); + try { + Subscription s = Schedulers.submitTo(exec, ra, csub, false); + + ra.awaitStart(); + + csub.remove(s); + + ra.run(); + + ra.awaitComplete(); + + assertFalse(ra.isWaitInterrupted()); + assertTrue(ra.get()); + assertTrue(s.isUnsubscribed()); + + } finally { + exec.shutdownNow(); + } + + } + @Test + public void submitToSimpleInterruptNoParent() { + RunAction ra = new RunAction(); + + ExecutorService exec = Executors.newFixedThreadPool(1); + try { + Subscription s = Schedulers.submitTo(exec, ra, null, true); + + ra.awaitStart(); + + s.unsubscribe(); + + ra.awaitComplete(); + + assertTrue(ra.isWaitInterrupted()); + assertFalse(ra.get()); + assertTrue(s.isUnsubscribed()); + + } finally { + exec.shutdownNow(); + } + + } + + @Test + public void submitToSimpleNoInterruptNoParent() { + RunAction ra = new RunAction(); + + ExecutorService exec = Executors.newFixedThreadPool(1); + try { + Subscription s = Schedulers.submitTo(exec, ra, null, false); + + ra.awaitStart(); + + s.unsubscribe(); + + ra.run(); + + ra.awaitComplete(); + + assertFalse(ra.isWaitInterrupted()); + assertTrue(ra.get()); + assertTrue(s.isUnsubscribed()); + + } finally { + exec.shutdownNow(); + } + + } + @Test + public void submitToDelayedSimpleInterrupt() { + RunAction ra = new RunAction(); + + CompositeSubscription csub = new CompositeSubscription(); + + ScheduledExecutorService exec = Executors.newScheduledThreadPool(1); + try { + Subscription s = Schedulers.submitTo(exec, ra, 500, TimeUnit.MILLISECONDS, csub, true); + + ra.awaitStart(); + + csub.remove(s); + + ra.awaitComplete(); + + assertTrue(ra.isWaitInterrupted()); + assertFalse(ra.get()); + assertTrue(s.isUnsubscribed()); + + } finally { + exec.shutdownNow(); + } + + } + @Test(timeout = 3000) + public void submitToDelayedSimpleInterruptBeforeRun() throws InterruptedException { + RunAction ra = new RunAction(); + + CompositeSubscription csub = new CompositeSubscription(); + + ScheduledExecutorService exec = Executors.newScheduledThreadPool(1); + try { + Subscription s = Schedulers.submitTo(exec, ra, 1000, TimeUnit.MILLISECONDS, csub, true); + + Thread.sleep(500); + + csub.remove(s); + + Thread.sleep(1000); + + assertFalse(ra.isWaitInterrupted()); + assertFalse(ra.get()); + assertTrue(s.isUnsubscribed()); + } finally { + exec.shutdownNow(); + } + + } + + @Test + public void submitToDelayedSimpleNoInterrupt() { + RunAction ra = new RunAction(); + + CompositeSubscription csub = new CompositeSubscription(); + + ScheduledExecutorService exec = Executors.newScheduledThreadPool(1); + + try { + Subscription s = Schedulers.submitTo(exec, ra, 500, TimeUnit.MILLISECONDS, csub, false); + + ra.awaitStart(); + + csub.remove(s); + + ra.run(); + + ra.awaitComplete(); + + assertFalse(ra.isWaitInterrupted()); + assertTrue(ra.get()); + assertTrue(s.isUnsubscribed()); + + } finally { + exec.shutdownNow(); + } + + } + @Test + public void submitToDelayedSimpleInterruptNoParent() { + RunAction ra = new RunAction(); + + ScheduledExecutorService exec = Executors.newScheduledThreadPool(1); + try { + Subscription s = Schedulers.submitTo(exec, ra, 500, TimeUnit.MILLISECONDS, null, true); + + ra.awaitStart(); + + s.unsubscribe(); + + ra.awaitComplete(); + + assertTrue(ra.isWaitInterrupted()); + assertFalse(ra.get()); + assertTrue(s.isUnsubscribed()); + + } finally { + exec.shutdownNow(); + } + + } + @Test(timeout = 3000) + public void submitToDelayedSimpleInterruptBeforeRunNoParent() throws InterruptedException { + RunAction ra = new RunAction(); + + ScheduledExecutorService exec = Executors.newScheduledThreadPool(1); + try { + Subscription s = Schedulers.submitTo(exec, ra, 1000, TimeUnit.MILLISECONDS, null, true); + + Thread.sleep(500); + + s.unsubscribe(); + + Thread.sleep(1000); + + assertFalse(ra.isWaitInterrupted()); + assertFalse(ra.get()); + assertTrue(s.isUnsubscribed()); + } finally { + exec.shutdownNow(); + } + + } + + @Test + public void submitToDelayedSimpleNoInterruptNoParent() { + RunAction ra = new RunAction(); + + ScheduledExecutorService exec = Executors.newScheduledThreadPool(1); + + try { + Subscription s = Schedulers.submitTo(exec, ra, 500, TimeUnit.MILLISECONDS, null, false); + + ra.awaitStart(); + + s.unsubscribe(); + + ra.run(); + + ra.awaitComplete(); + + assertFalse(ra.isWaitInterrupted()); + assertTrue(ra.get()); + assertTrue(s.isUnsubscribed()); + + } finally { + exec.shutdownNow(); + } + + } + +} \ No newline at end of file diff --git a/src/test/java/rx/test/TestObstructionDetectionTest.java b/src/test/java/rx/test/TestObstructionDetectionTest.java index f6db1db597..1c4e8adc6f 100644 --- a/src/test/java/rx/test/TestObstructionDetectionTest.java +++ b/src/test/java/rx/test/TestObstructionDetectionTest.java @@ -15,6 +15,8 @@ */ package rx.test; +import java.util.concurrent.atomic.AtomicReference; + import org.junit.*; import rx.*; @@ -25,10 +27,6 @@ public class TestObstructionDetectionTest { private static Scheduler.Worker w; - @org.junit.After - public void doAfterTest() { - rx.test.TestObstructionDetection.checkObstruction(); - } @AfterClass public static void afterClass() { Worker w2 = w; @@ -36,14 +34,20 @@ public static void afterClass() { w2.unsubscribe(); } } + @After + public void after() { + TestObstructionDetection.checkObstruction(); + } @Test(timeout = 10000, expected = ObstructionException.class) public void testObstruction() { Scheduler.Worker w = Schedulers.computation().createWorker(); + final AtomicReference t = new AtomicReference(); try { w.schedule(new Action0() { @Override public void call() { + t.set(Thread.currentThread()); try { Thread.sleep(5000); } catch (InterruptedException ex) { @@ -53,6 +57,10 @@ public void call() { }); TestObstructionDetection.checkObstruction(); } finally { + Thread t0 = t.get(); + if (t0 != null) { + t0.interrupt(); + } w.unsubscribe(); } } @@ -70,5 +78,6 @@ public void call() { } } }); + rx.test.TestObstructionDetection.checkObstruction(); } }