underlying) {
+ this.state = state;
this.underlying = underlying;
}
@Override
- public Subscription call() {
+ public Subscription call(Scheduler scheduler) {
if (ready.compareAndSet(true, false)) {
- Subscription subscription = underlying.call();
+ Subscription subscription = underlying.call(scheduler, state);
wrapper.wrap(subscription);
return subscription;
}
@@ -49,4 +53,5 @@ public void unsubscribe() {
ready.set(false);
wrapper.unsubscribe();
}
+
}
diff --git a/rxjava-core/src/main/java/rx/concurrency/ExecutorScheduler.java b/rxjava-core/src/main/java/rx/concurrency/ExecutorScheduler.java
index 133f772889..ed40bec350 100644
--- a/rxjava-core/src/main/java/rx/concurrency/ExecutorScheduler.java
+++ b/rxjava-core/src/main/java/rx/concurrency/ExecutorScheduler.java
@@ -16,52 +16,26 @@
package rx.concurrency;
import java.util.concurrent.Executor;
-import java.util.concurrent.Executors;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
import rx.Scheduler;
import rx.Subscription;
-import rx.util.functions.Func0;
+import rx.subscriptions.CompositeSubscription;
+import rx.subscriptions.Subscriptions;
+import rx.util.functions.Func2;
/**
* A {@link Scheduler} implementation that uses an {@link Executor} or {@link ScheduledExecutorService} implementation.
*
* Note that if an {@link Executor} implementation is used instead of {@link ScheduledExecutorService} then a system-wide Timer will be used to handle delayed events.
*/
-public class ExecutorScheduler extends AbstractScheduler {
+public class ExecutorScheduler extends Scheduler {
private final Executor executor;
- /**
- * Setup a ScheduledExecutorService that we can use if someone provides an Executor instead of ScheduledExecutorService.
- */
- private final static ScheduledExecutorService SYSTEM_SCHEDULED_EXECUTOR;
- static {
- int count = Runtime.getRuntime().availableProcessors();
- if (count > 8) {
- count = count / 2;
- }
- // we don't need more than 8 to handle just scheduling and doing no work
- if (count > 8) {
- count = 8;
- }
- SYSTEM_SCHEDULED_EXECUTOR = Executors.newScheduledThreadPool(count, new ThreadFactory() {
-
- final AtomicInteger counter = new AtomicInteger();
-
- @Override
- public Thread newThread(Runnable r) {
- Thread t = new Thread(r, "RxScheduledExecutorPool-" + counter.incrementAndGet());
- t.setDaemon(true);
- return t;
- }
-
- });
-
- }
-
public ExecutorScheduler(Executor executor) {
this.executor = executor;
}
@@ -71,55 +45,81 @@ public ExecutorScheduler(ScheduledExecutorService executor) {
}
@Override
- public Subscription schedule(Func0 action, long dueTime, TimeUnit unit) {
- final DiscardableAction discardableAction = new DiscardableAction(action);
+ public Subscription schedule(final T state, final Func2 action, long delayTime, TimeUnit unit) {
+ final DiscardableAction discardableAction = new DiscardableAction(state, action);
+ final Scheduler _scheduler = this;
+ // all subscriptions that may need to be unsubscribed
+ final CompositeSubscription subscription = new CompositeSubscription(discardableAction);
if (executor instanceof ScheduledExecutorService) {
- ((ScheduledExecutorService) executor).schedule(new Runnable() {
+ // we are a ScheduledExecutorService so can do proper scheduling
+ ScheduledFuture> f = ((ScheduledExecutorService) executor).schedule(new Runnable() {
@Override
public void run() {
- discardableAction.call();
+ // when the delay has passed we now do the work on the actual scheduler
+ Subscription s = discardableAction.call(_scheduler);
+ // add the subscription to the CompositeSubscription so it is unsubscribed
+ subscription.add(s);
}
- }, dueTime, unit);
+ }, delayTime, unit);
+ // add the ScheduledFuture as a subscription so we can cancel the scheduled action if an unsubscribe happens
+ subscription.add(Subscriptions.create(f));
} else {
- if (dueTime == 0) {
+ // we are not a ScheduledExecutorService so can't directly schedule
+ if (delayTime == 0) {
// no delay so put on the thread-pool right now
- return (schedule(action));
+ Subscription s = schedule(state, action);
+ // add the subscription to the CompositeSubscription so it is unsubscribed
+ subscription.add(s);
} else {
// there is a delay and this isn't a ScheduledExecutorService so we'll use a system-wide ScheduledExecutorService
// to handle the scheduling and once it's ready then execute on this Executor
- SYSTEM_SCHEDULED_EXECUTOR.schedule(new Runnable() {
+ ScheduledFuture> f = GenericScheduledExecutorService.getInstance().schedule(new Runnable() {
@Override
public void run() {
- // now execute on the real Executor
- executor.execute(new Runnable() {
-
- @Override
- public void run() {
- discardableAction.call();
- }
-
- });
+ // now execute on the real Executor (by using the other overload that schedules for immediate execution)
+ Subscription s = _scheduler.schedule(state, action);
+ // add the subscription to the CompositeSubscription so it is unsubscribed
+ subscription.add(s);
}
- }, dueTime, unit);
+ }, delayTime, unit);
+ // add the ScheduledFuture as a subscription so we can cancel the scheduled action if an unsubscribe happens
+ subscription.add(Subscriptions.create(f));
}
}
- return discardableAction;
+ return subscription;
}
@Override
- public Subscription schedule(Func0 action) {
- final DiscardableAction discardableAction = new DiscardableAction(action);
-
- executor.execute(new Runnable() {
+ public Subscription schedule(T state, Func2 action) {
+ final DiscardableAction discardableAction = new DiscardableAction(state, action);
+ final Scheduler _scheduler = this;
+ // all subscriptions that may need to be unsubscribed
+ final CompositeSubscription subscription = new CompositeSubscription(discardableAction);
+
+ // work to be done on a thread
+ Runnable r = new Runnable() {
@Override
public void run() {
- discardableAction.call();
+ Subscription s = discardableAction.call(_scheduler);
+ // add the subscription to the CompositeSubscription so it is unsubscribed
+ subscription.add(s);
}
- });
+ };
+
+ // submit for immediate execution
+ if (executor instanceof ExecutorService) {
+ // we are an ExecutorService so get a Future back that supports unsubscribe
+ Future> f = ((ExecutorService) executor).submit(r);
+ // add the Future as a subscription so we can cancel the scheduled action if an unsubscribe happens
+ subscription.add(Subscriptions.create(f));
+ } else {
+ // we are the lowest common denominator so can't unsubscribe once we execute
+ executor.execute(r);
+ }
- return discardableAction;
+ return subscription;
}
diff --git a/rxjava-core/src/main/java/rx/concurrency/GenericScheduledExecutorService.java b/rxjava-core/src/main/java/rx/concurrency/GenericScheduledExecutorService.java
new file mode 100644
index 0000000000..8bb520d9fe
--- /dev/null
+++ b/rxjava-core/src/main/java/rx/concurrency/GenericScheduledExecutorService.java
@@ -0,0 +1,72 @@
+/**
+ * Copyright 2013 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package rx.concurrency;
+
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import rx.Scheduler;
+
+/**
+ * A default {@link ScheduledExecutorService} that can be used for scheduling actions when a {@link Scheduler} implementation doesn't have that ability.
+ *
+ * For example if a {@link Scheduler} is given an {@link Executor} or {{@link ExecutorService} instead of {@link ScheduledExecutorService}.
+ *
+ * NOTE: No actual work should be done on tasks submitted to this executor. Submit a task with the appropriate delay which then in turn invokes
+ * the work asynchronously on the appropriate {@link Scheduler} implementation. This means for example that you would not use this approach
+ * along with {@link CurrentThreadScheduler} or {@link ImmediateScheduler}.
+ */
+/* package */class GenericScheduledExecutorService {
+
+ private final static GenericScheduledExecutorService INSTANCE = new GenericScheduledExecutorService();
+ private final ScheduledExecutorService executor;
+
+ private GenericScheduledExecutorService() {
+ int count = Runtime.getRuntime().availableProcessors();
+ if (count > 8) {
+ count = count / 2;
+ }
+ // we don't need more than 8 to handle just scheduling and doing no work
+ if (count > 8) {
+ count = 8;
+ }
+ executor = Executors.newScheduledThreadPool(count, new ThreadFactory() {
+
+ final AtomicInteger counter = new AtomicInteger();
+
+ @Override
+ public Thread newThread(Runnable r) {
+ Thread t = new Thread(r, "RxScheduledExecutorPool-" + counter.incrementAndGet());
+ t.setDaemon(true);
+ return t;
+ }
+
+ });
+ }
+
+ /**
+ * See class Javadoc for information on what this is for and how to use.
+ *
+ * @return {@link ScheduledExecutorService} for generic use.
+ */
+ public static ScheduledExecutorService getInstance() {
+ return INSTANCE.executor;
+ }
+}
diff --git a/rxjava-core/src/main/java/rx/concurrency/ImmediateScheduler.java b/rxjava-core/src/main/java/rx/concurrency/ImmediateScheduler.java
index 10a2e33b6f..f24235ac72 100644
--- a/rxjava-core/src/main/java/rx/concurrency/ImmediateScheduler.java
+++ b/rxjava-core/src/main/java/rx/concurrency/ImmediateScheduler.java
@@ -22,14 +22,15 @@
import org.junit.Test;
import org.mockito.InOrder;
+import rx.Scheduler;
import rx.Subscription;
import rx.util.functions.Action0;
-import rx.util.functions.Func0;
+import rx.util.functions.Func2;
/**
* Executes work immediately on the current thread.
*/
-public final class ImmediateScheduler extends AbstractScheduler {
+public final class ImmediateScheduler extends Scheduler {
private static final ImmediateScheduler INSTANCE = new ImmediateScheduler();
public static ImmediateScheduler getInstance() {
@@ -40,13 +41,14 @@ private ImmediateScheduler() {
}
@Override
- public Subscription schedule(Func0 action) {
- return action.call();
+ public Subscription schedule(T state, Func2 action) {
+ return action.call(this, state);
}
@Override
- public Subscription schedule(Func0 action, long dueTime, TimeUnit unit) {
- return schedule(new SleepingAction(action, this, dueTime, unit));
+ public Subscription schedule(T state, Func2 action, long dueTime, TimeUnit unit) {
+ // since we are executing immediately on this thread we must cause this thread to sleep
+ return schedule(state, new SleepingAction(action, this, dueTime, unit));
}
public static class UnitTest {
diff --git a/rxjava-core/src/main/java/rx/concurrency/NewThreadScheduler.java b/rxjava-core/src/main/java/rx/concurrency/NewThreadScheduler.java
index 6dfedeb08e..b45bdd080d 100644
--- a/rxjava-core/src/main/java/rx/concurrency/NewThreadScheduler.java
+++ b/rxjava-core/src/main/java/rx/concurrency/NewThreadScheduler.java
@@ -15,15 +15,20 @@
*/
package rx.concurrency;
+import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
+import rx.Scheduler;
import rx.Subscription;
-import rx.util.functions.Func0;
+import rx.subscriptions.CompositeSubscription;
+import rx.subscriptions.Subscriptions;
+import rx.util.AtomicObservableSubscription;
+import rx.util.functions.Func2;
/**
* Schedules work on a new thread.
*/
-public class NewThreadScheduler extends AbstractScheduler {
+public class NewThreadScheduler extends Scheduler {
private static final NewThreadScheduler INSTANCE = new NewThreadScheduler();
public static NewThreadScheduler getInstance() {
@@ -31,24 +36,44 @@ public static NewThreadScheduler getInstance() {
}
@Override
- public Subscription schedule(Func0 action) {
- final DiscardableAction discardableAction = new DiscardableAction(action);
+ public Subscription schedule(final T state, final Func2 action) {
+ final AtomicObservableSubscription subscription = new AtomicObservableSubscription();
+ final Scheduler _scheduler = this;
Thread t = new Thread(new Runnable() {
@Override
public void run() {
- discardableAction.call();
+ subscription.wrap(action.call(_scheduler, state));
}
}, "RxNewThreadScheduler");
t.start();
- return discardableAction;
+ return subscription;
}
@Override
- public Subscription schedule(Func0 action, long dueTime, TimeUnit unit) {
- return schedule(new SleepingAction(action, this, dueTime, unit));
- }
+ public Subscription schedule(final T state, final Func2 action, long delay, TimeUnit unit) {
+ // we will use the system scheduler since it doesn't make sense to launch a new Thread and then sleep
+ // we will instead schedule the event then launch the thread after the delay has passed
+ final Scheduler _scheduler = this;
+ final CompositeSubscription subscription = new CompositeSubscription();
+ ScheduledFuture> f = GenericScheduledExecutorService.getInstance().schedule(new Runnable() {
+ @Override
+ public void run() {
+ if (!subscription.isUnsubscribed()) {
+ // when the delay has passed we now do the work on the actual scheduler
+ Subscription s = _scheduler.schedule(state, action);
+ // add the subscription to the CompositeSubscription so it is unsubscribed
+ subscription.add(s);
+ }
+ }
+ }, delay, unit);
+
+ // add the ScheduledFuture as a subscription so we can cancel the scheduled action if an unsubscribe happens
+ subscription.add(Subscriptions.create(f));
+
+ return subscription;
+ }
}
diff --git a/rxjava-core/src/main/java/rx/concurrency/SleepingAction.java b/rxjava-core/src/main/java/rx/concurrency/SleepingAction.java
index a57fd9046d..db7ece2469 100644
--- a/rxjava-core/src/main/java/rx/concurrency/SleepingAction.java
+++ b/rxjava-core/src/main/java/rx/concurrency/SleepingAction.java
@@ -19,21 +19,21 @@
import rx.Scheduler;
import rx.Subscription;
-import rx.util.functions.Func0;
+import rx.util.functions.Func2;
-/* package */class SleepingAction implements Func0 {
- private final Func0 underlying;
+/* package */class SleepingAction implements Func2 {
+ private final Func2 underlying;
private final Scheduler scheduler;
private final long execTime;
- public SleepingAction(Func0 underlying, Scheduler scheduler, long timespan, TimeUnit timeUnit) {
+ public SleepingAction(Func2 underlying, Scheduler scheduler, long timespan, TimeUnit timeUnit) {
this.underlying = underlying;
this.scheduler = scheduler;
this.execTime = scheduler.now() + timeUnit.toMillis(timespan);
}
@Override
- public Subscription call() {
+ public Subscription call(Scheduler s, T state) {
if (execTime < scheduler.now()) {
try {
Thread.sleep(scheduler.now() - execTime);
@@ -43,7 +43,6 @@ public Subscription call() {
}
}
- return underlying.call();
-
+ return underlying.call(s, state);
}
}
diff --git a/rxjava-core/src/main/java/rx/operators/Tester.java b/rxjava-core/src/main/java/rx/operators/Tester.java
index bc04242846..7a1c60e7dc 100644
--- a/rxjava-core/src/main/java/rx/operators/Tester.java
+++ b/rxjava-core/src/main/java/rx/operators/Tester.java
@@ -19,6 +19,7 @@
import rx.util.functions.Action0;
import rx.util.functions.Func0;
import rx.util.functions.Func1;
+import rx.util.functions.Func2;
/**
* Common utility functions for testing operator implementations.
@@ -272,7 +273,7 @@ public void onNext(String args)
}
}
- public static class ForwardingScheduler implements Scheduler {
+ public static class ForwardingScheduler extends Scheduler {
private final Scheduler underlying;
public ForwardingScheduler(Scheduler underlying) {
@@ -289,6 +290,16 @@ public Subscription schedule(Func0 action) {
return underlying.schedule(action);
}
+ @Override
+ public Subscription schedule(Func1 action) {
+ return underlying.schedule(action);
+ }
+
+ @Override
+ public Subscription schedule(T state, Func2 action) {
+ return underlying.schedule(state, action);
+ }
+
@Override
public Subscription schedule(Action0 action, long dueTime, TimeUnit unit) {
return underlying.schedule(action, dueTime, unit);
@@ -299,6 +310,16 @@ public Subscription schedule(Func0 action, long dueTime, TimeUnit
return underlying.schedule(action, dueTime, unit);
}
+ @Override
+ public Subscription schedule(Func1 action, long dueTime, TimeUnit unit) {
+ return underlying.schedule(action, dueTime, unit);
+ }
+
+ @Override
+ public Subscription schedule(T state, Func2 action, long dueTime, TimeUnit unit) {
+ return underlying.schedule(state, action, dueTime, unit);
+ }
+
@Override
public long now() {
return underlying.now();
diff --git a/rxjava-core/src/main/java/rx/subscriptions/BooleanSubscription.java b/rxjava-core/src/main/java/rx/subscriptions/BooleanSubscription.java
index 0ad92e1d06..05f804000d 100644
--- a/rxjava-core/src/main/java/rx/subscriptions/BooleanSubscription.java
+++ b/rxjava-core/src/main/java/rx/subscriptions/BooleanSubscription.java
@@ -1,3 +1,18 @@
+/**
+ * Copyright 2013 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package rx.subscriptions;
import java.util.concurrent.atomic.AtomicBoolean;
diff --git a/rxjava-core/src/main/java/rx/subscriptions/CompositeSubscription.java b/rxjava-core/src/main/java/rx/subscriptions/CompositeSubscription.java
new file mode 100644
index 0000000000..096500217c
--- /dev/null
+++ b/rxjava-core/src/main/java/rx/subscriptions/CompositeSubscription.java
@@ -0,0 +1,80 @@
+/**
+ * Copyright 2013 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package rx.subscriptions;
+
+import java.util.List;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import rx.Subscription;
+import rx.util.functions.Functions;
+
+/**
+ * Subscription that represents a group of Subscriptions that are unsubscribed together.
+ *
+ * @see Rx.Net equivalent CompositeDisposable at http://msdn.microsoft.com/en-us/library/system.reactive.disposables.compositedisposable(v=vs.103).aspx
+ */
+public class CompositeSubscription implements Subscription {
+
+ private static final Logger logger = LoggerFactory.getLogger(Functions.class);
+
+ /*
+ * The reason 'synchronized' is used on 'add' and 'unsubscribe' is because AtomicBoolean/ConcurrentLinkedQueue are both being modified so it needs to be done atomically.
+ *
+ * TODO evaluate whether use of synchronized is a performance issue here and if it's worth using an atomic state machine or other non-locking approach
+ */
+ private AtomicBoolean unsubscribed = new AtomicBoolean(false);
+ private final ConcurrentLinkedQueue subscriptions = new ConcurrentLinkedQueue();
+
+ public CompositeSubscription(List subscriptions) {
+ this.subscriptions.addAll(subscriptions);
+ }
+
+ public CompositeSubscription(Subscription... subscriptions) {
+ for (Subscription s : subscriptions) {
+ this.subscriptions.add(s);
+ }
+ }
+
+ public boolean isUnsubscribed() {
+ return unsubscribed.get();
+ }
+
+ public synchronized void add(Subscription s) {
+ if (unsubscribed.get()) {
+ s.unsubscribe();
+ } else {
+ subscriptions.add(s);
+ }
+ }
+
+ @Override
+ public synchronized void unsubscribe() {
+ if (unsubscribed.compareAndSet(false, true)) {
+ for (Subscription s : subscriptions) {
+ try {
+ s.unsubscribe();
+ } catch (Exception e) {
+ logger.error("Failed to unsubscribe.", e);
+ }
+ }
+ }
+ }
+
+}
diff --git a/rxjava-core/src/main/java/rx/subscriptions/Subscriptions.java b/rxjava-core/src/main/java/rx/subscriptions/Subscriptions.java
index 4c79ce7bfa..1db0b7a660 100644
--- a/rxjava-core/src/main/java/rx/subscriptions/Subscriptions.java
+++ b/rxjava-core/src/main/java/rx/subscriptions/Subscriptions.java
@@ -1,5 +1,22 @@
+/**
+ * Copyright 2013 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package rx.subscriptions;
+import java.util.concurrent.Future;
+
import rx.Subscription;
import rx.util.functions.Action0;
import rx.util.functions.FuncN;
@@ -31,6 +48,37 @@ public void unsubscribe() {
};
}
+ /**
+ * A {@link Subscription} that wraps a {@link Future} and cancels it when unsubscribed.
+ *
+ *
+ * @param f
+ * {@link Future}
+ * @return {@link Subscription}
+ */
+ public static Subscription create(final Future> f) {
+ return new Subscription() {
+
+ @Override
+ public void unsubscribe() {
+ f.cancel(true);
+ }
+
+ };
+ }
+
+ /**
+ * A {@link Subscription} that groups multiple Subscriptions together and unsubscribes from all of them together.
+ *
+ * @param subscriptions
+ * Subscriptions to group together
+ * @return {@link Subscription}
+ */
+
+ public static CompositeSubscription create(Subscription... subscriptions) {
+ return new CompositeSubscription(subscriptions);
+ }
+
/**
* A {@link Subscription} implemented via an anonymous function (such as closures from other languages).
*
diff --git a/rxjava-core/src/test/java/rx/concurrency/TestSchedulers.java b/rxjava-core/src/test/java/rx/concurrency/TestSchedulers.java
index ec247d0b95..59edfade39 100644
--- a/rxjava-core/src/test/java/rx/concurrency/TestSchedulers.java
+++ b/rxjava-core/src/test/java/rx/concurrency/TestSchedulers.java
@@ -16,20 +16,25 @@
package rx.concurrency;
import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+import java.util.Date;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
import org.junit.Test;
import rx.Observable;
import rx.Observer;
+import rx.Scheduler;
import rx.Subscription;
+import rx.subscriptions.BooleanSubscription;
import rx.subscriptions.Subscriptions;
import rx.util.functions.Action1;
import rx.util.functions.Func1;
+import rx.util.functions.Func2;
public class TestSchedulers {
@@ -245,4 +250,149 @@ public void call(Integer t) {
assertEquals(5, count.get());
}
+ @Test
+ public void testRecursiveScheduler1() {
+ Observable obs = Observable.create(new Func1, Subscription>() {
+ @Override
+ public Subscription call(final Observer observer) {
+ return Schedulers.currentThread().schedule(0, new Func2() {
+ @Override
+ public Subscription call(Scheduler scheduler, Integer i) {
+ if (i > 42) {
+ observer.onCompleted();
+ return Subscriptions.empty();
+ }
+
+ observer.onNext(i);
+
+ return scheduler.schedule(i + 1, this);
+ }
+ });
+ }
+ });
+
+ final AtomicInteger lastValue = new AtomicInteger();
+ obs.forEach(new Action1() {
+
+ @Override
+ public void call(Integer v) {
+ System.out.println("Value: " + v);
+ lastValue.set(v);
+ }
+ });
+
+ assertEquals(42, lastValue.get());
+ }
+
+ @Test
+ public void testRecursiveScheduler2() throws InterruptedException {
+ // use latches instead of Thread.sleep
+ final CountDownLatch latch = new CountDownLatch(10);
+ final CountDownLatch completionLatch = new CountDownLatch(1);
+
+ Observable obs = Observable.create(new Func1, Subscription>() {
+ @Override
+ public Subscription call(final Observer observer) {
+
+ return Schedulers.threadPoolForComputation().schedule(new BooleanSubscription(), new Func2() {
+ @Override
+ public Subscription call(Scheduler scheduler, BooleanSubscription cancel) {
+ if (cancel.isUnsubscribed()) {
+ observer.onCompleted();
+ completionLatch.countDown();
+ return Subscriptions.empty();
+ }
+
+ observer.onNext(42);
+ latch.countDown();
+
+ try {
+ Thread.sleep(1);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+
+ scheduler.schedule(cancel, this);
+
+ return cancel;
+ }
+ });
+ }
+ });
+
+ @SuppressWarnings("unchecked")
+ Observer o = mock(Observer.class);
+
+ final AtomicInteger count = new AtomicInteger();
+ final AtomicBoolean completed = new AtomicBoolean(false);
+ Subscription subscribe = obs.subscribe(new Observer() {
+ @Override
+ public void onCompleted() {
+ System.out.println("Completed");
+ completed.set(true);
+ }
+
+ @Override
+ public void onError(Exception e) {
+ System.out.println("Error");
+ }
+
+ @Override
+ public void onNext(Integer args) {
+ count.incrementAndGet();
+ System.out.println(args);
+ }
+ });
+
+ if (!latch.await(5000, TimeUnit.MILLISECONDS)) {
+ fail("Timed out waiting on onNext latch");
+ }
+
+ // now unsubscribe and ensure it stops the recursive loop
+ subscribe.unsubscribe();
+ System.out.println("unsubscribe");
+
+ if (!completionLatch.await(5000, TimeUnit.MILLISECONDS)) {
+ fail("Timed out waiting on completion latch");
+ }
+
+ assertEquals(10, count.get()); // wondering if this could be 11 in a race condition (which would be okay due to how unsubscribe works ... just it would make this test non-deterministic)
+ assertTrue(completed.get());
+ }
+
+ @Test
+ public void testSchedulingWithDueTime() throws InterruptedException {
+
+ final CountDownLatch latch = new CountDownLatch(5);
+ final AtomicInteger counter = new AtomicInteger();
+
+ long start = System.currentTimeMillis();
+
+ Schedulers.threadPoolForComputation().schedule(null, new Func2() {
+
+ @Override
+ public Subscription call(Scheduler scheduler, String state) {
+ System.out.println("doing work");
+ latch.countDown();
+ counter.incrementAndGet();
+ if (latch.getCount() == 0) {
+ return Subscriptions.empty();
+ } else {
+ return scheduler.schedule(state, this, new Date(System.currentTimeMillis() + 50));
+ }
+ }
+ }, new Date(System.currentTimeMillis() + 100));
+
+ if (!latch.await(3000, TimeUnit.MILLISECONDS)) {
+ fail("didn't execute ... timed out");
+ }
+
+ long end = System.currentTimeMillis();
+
+ assertEquals(5, counter.get());
+ if ((end - start) < 250) {
+ fail("it should have taken over 250ms since each step was scheduled 50ms in the future");
+ }
+ }
+
}