diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index e9015b0379..bbbbd10d02 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -36,32 +36,7 @@ import org.mockito.MockitoAnnotations; import rx.observables.GroupedObservable; -import rx.operators.OperationConcat; -import rx.operators.OperationDefer; -import rx.operators.OperationDematerialize; -import rx.operators.OperationFilter; -import rx.operators.OperationMap; -import rx.operators.OperationMaterialize; -import rx.operators.OperationMerge; -import rx.operators.OperationMergeDelayError; -import rx.operators.OperationMostRecent; -import rx.operators.OperationNext; -import rx.operators.OperationOnErrorResumeNextViaFunction; -import rx.operators.OperationOnErrorResumeNextViaObservable; -import rx.operators.OperationOnErrorReturn; -import rx.operators.OperationScan; -import rx.operators.OperationSkip; -import rx.operators.OperationSynchronize; -import rx.operators.OperationTake; -import rx.operators.OperationTakeLast; -import rx.operators.OperationToObservableFuture; -import rx.operators.OperationToObservableIterable; -import rx.operators.OperationToObservableList; -import rx.operators.OperationToObservableSortedList; -import rx.operators.OperationZip; -import rx.operators.OperatorGroupBy; -import rx.operators.OperatorTakeUntil; -import rx.operators.OperatorToIterator; +import rx.operators.*; import rx.plugins.RxJavaErrorHandler; import rx.plugins.RxJavaPlugins; import rx.subscriptions.Subscriptions; @@ -766,6 +741,30 @@ public static Observable range(int start, int count) { return from(Range.createWithCount(start, count)); } + /** + * Asynchronously subscribes and unsubscribes observers on the specified scheduler. + * + * @param source the source observable. + * @param scheduler the scheduler to perform subscription and unsubscription actions on. + * @param the type of observable. + * @return the source sequence whose subscriptions and unsubscriptions happen on the specified scheduler. + */ + public static Observable subscribeOn(Observable source, Scheduler scheduler) { + return _create(OperationSubscribeOn.subscribeOn(source, scheduler)); + } + + /** + * Asynchronously notify observers on the specified scheduler. + * + * @param source the source observable. + * @param scheduler the scheduler to notify observers on. + * @param the type of observable. + * @return the source sequence whose observations happen on the specified scheduler. + */ + public static Observable observeOn(Observable source, Scheduler scheduler) { + return _create(OperationObserveOn.observeOn(source, scheduler)); + } + /** * Returns an observable sequence that invokes the observable factory whenever a new observer subscribes. * The Defer operator allows you to defer or delay the creation of the sequence until the time when an observer @@ -2589,6 +2588,26 @@ public Observable> materialize() { return materialize(this); } + /** + * Asynchronously subscribes and unsubscribes observers on the specified scheduler. + * + * @param scheduler the scheduler to perform subscription and unsubscription actions on. + * @return the source sequence whose subscriptions and unsubscriptions happen on the specified scheduler. + */ + public Observable subscribeOn(Scheduler scheduler) { + return subscribeOn(this, scheduler); + } + + /** + * Asynchronously notify observers on the specified scheduler. + * + * @param scheduler the scheduler to notify observers on. + * @return the source sequence whose observations happen on the specified scheduler. + */ + public Observable observeOn(Scheduler scheduler) { + return observeOn(this, scheduler); + } + /** * Dematerializes the explicit notification values of an observable sequence as implicit notifications. * diff --git a/rxjava-core/src/main/java/rx/Scheduler.java b/rxjava-core/src/main/java/rx/Scheduler.java new file mode 100644 index 0000000000..b4d2c5d471 --- /dev/null +++ b/rxjava-core/src/main/java/rx/Scheduler.java @@ -0,0 +1,65 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx; + +import rx.util.functions.Action0; +import rx.util.functions.Func0; + +import java.util.concurrent.TimeUnit; + +/** + * Represents an object that schedules units of work. + */ +public interface Scheduler { + + /** + * Schedules a cancelable action to be executed. + * + * @param action action + * @return a subscription to be able to unsubscribe from action. + */ + Subscription schedule(Func0 action); + + /** + * Schedules an action to be executed. + * + * @param action action + * @return a subscription to be able to unsubscribe from action. + */ + Subscription schedule(Action0 action); + + /** + * Schedules an action to be executed in dueTime. + * + * @param action action + * @return a subscription to be able to unsubscribe from action. + */ + Subscription schedule(Action0 action, long dueTime, TimeUnit unit); + + /** + * Schedules a cancelable action to be executed in dueTime. + * + * @param action action + * @return a subscription to be able to unsubscribe from action. + */ + Subscription schedule(Func0 action, long dueTime, TimeUnit unit); + + /** + * Returns the scheduler's notion of current time. + */ + long now(); + +} diff --git a/rxjava-core/src/main/java/rx/concurrency/AbstractScheduler.java b/rxjava-core/src/main/java/rx/concurrency/AbstractScheduler.java new file mode 100644 index 0000000000..a056f72152 --- /dev/null +++ b/rxjava-core/src/main/java/rx/concurrency/AbstractScheduler.java @@ -0,0 +1,53 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.concurrency; + +import rx.Scheduler; +import rx.Subscription; +import rx.subscriptions.Subscriptions; +import rx.util.functions.Action0; +import rx.util.functions.Func0; + +import java.util.concurrent.TimeUnit; + +public abstract class AbstractScheduler implements Scheduler { + + @Override + public Subscription schedule(Action0 action) { + return schedule(asFunc0(action)); + } + + @Override + public Subscription schedule(Action0 action, long dueTime, TimeUnit unit) { + return schedule(asFunc0(action), dueTime, unit); + } + + @Override + public long now() { + return System.nanoTime(); + } + + private static Func0 asFunc0(final Action0 action) { + return new Func0() { + @Override + public Subscription call() { + action.call(); + return Subscriptions.empty(); + } + }; + } + +} diff --git a/rxjava-core/src/main/java/rx/concurrency/CurrentThreadScheduler.java b/rxjava-core/src/main/java/rx/concurrency/CurrentThreadScheduler.java new file mode 100644 index 0000000000..e1f253d181 --- /dev/null +++ b/rxjava-core/src/main/java/rx/concurrency/CurrentThreadScheduler.java @@ -0,0 +1,142 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.concurrency; + +import org.junit.Test; +import org.mockito.InOrder; +import rx.Subscription; +import rx.util.functions.Action0; +import rx.util.functions.Func0; + +import java.util.LinkedList; +import java.util.Queue; +import java.util.concurrent.TimeUnit; + +import static org.mockito.Mockito.*; + +public class CurrentThreadScheduler extends AbstractScheduler { + private static final CurrentThreadScheduler INSTANCE = new CurrentThreadScheduler(); + + public static CurrentThreadScheduler getInstance() { + return INSTANCE; + } + + private static final ThreadLocal> QUEUE = new ThreadLocal>(); + + private CurrentThreadScheduler() { + } + + @Override + public Subscription schedule(Func0 action) { + DiscardableAction discardableAction = new DiscardableAction(action); + enqueue(discardableAction); + return discardableAction; + } + + @Override + public Subscription schedule(Func0 action, long dueTime, TimeUnit unit) { + return schedule(new SleepingAction(action, this, dueTime, unit)); + } + + private void enqueue(DiscardableAction action) { + Queue queue = QUEUE.get(); + boolean exec = queue == null; + + if (exec) { + queue = new LinkedList(); + QUEUE.set(queue); + } + + queue.add(action); + + if (exec) { + while (!queue.isEmpty()) { + queue.poll().call(); + } + + QUEUE.set(null); + } + } + + public static class UnitTest { + + @Test + public void testNestedActions() { + final CurrentThreadScheduler scheduler = new CurrentThreadScheduler(); + + final Action0 firstStepStart = mock(Action0.class); + final Action0 firstStepEnd = mock(Action0.class); + + final Action0 secondStepStart = mock(Action0.class); + final Action0 secondStepEnd = mock(Action0.class); + + final Action0 thirdStepStart = mock(Action0.class); + final Action0 thirdStepEnd = mock(Action0.class); + + final Action0 firstAction = new Action0() { + @Override + public void call() { + firstStepStart.call(); + firstStepEnd.call(); + } + }; + final Action0 secondAction = new Action0() { + @Override + public void call() { + secondStepStart.call(); + scheduler.schedule(firstAction); + secondStepEnd.call(); + + } + }; + final Action0 thirdAction = new Action0() { + @Override + public void call() { + thirdStepStart.call(); + scheduler.schedule(secondAction); + thirdStepEnd.call(); + } + }; + + InOrder inOrder = inOrder(firstStepStart, firstStepEnd, secondStepStart, secondStepEnd, thirdStepStart, thirdStepEnd); + + scheduler.schedule(thirdAction); + + inOrder.verify(thirdStepStart, times(1)).call(); + inOrder.verify(thirdStepEnd, times(1)).call(); + inOrder.verify(secondStepStart, times(1)).call(); + inOrder.verify(secondStepEnd, times(1)).call(); + inOrder.verify(firstStepStart, times(1)).call(); + inOrder.verify(firstStepEnd, times(1)).call(); + } + + @Test + public void testSequenceOfActions() { + final CurrentThreadScheduler scheduler = new CurrentThreadScheduler(); + + final Action0 first = mock(Action0.class); + final Action0 second = mock(Action0.class); + + scheduler.schedule(first); + scheduler.schedule(second); + + verify(first, times(1)).call(); + verify(second, times(1)).call(); + + } + + } +} diff --git a/rxjava-core/src/main/java/rx/concurrency/DiscardableAction.java b/rxjava-core/src/main/java/rx/concurrency/DiscardableAction.java new file mode 100644 index 0000000000..632ec69b1a --- /dev/null +++ b/rxjava-core/src/main/java/rx/concurrency/DiscardableAction.java @@ -0,0 +1,50 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.concurrency; + +import rx.Subscription; +import rx.util.AtomicObservableSubscription; +import rx.util.functions.Func0; + +import java.util.concurrent.atomic.AtomicBoolean; + +public class DiscardableAction implements Func0, Subscription { + private final Func0 underlying; + + private final AtomicObservableSubscription wrapper = new AtomicObservableSubscription(); + private final AtomicBoolean ready = new AtomicBoolean(true); + + public DiscardableAction(Func0 underlying) { + this.underlying = underlying; + } + + @Override + public Subscription call() { + if (ready.compareAndSet(true, false)) { + Subscription subscription = underlying.call(); + wrapper.wrap(subscription); + return subscription; + } + return wrapper; + } + + @Override + public void unsubscribe() { + ready.set(false); + wrapper.unsubscribe(); + } +} + diff --git a/rxjava-core/src/main/java/rx/concurrency/ExecutorScheduler.java b/rxjava-core/src/main/java/rx/concurrency/ExecutorScheduler.java new file mode 100644 index 0000000000..e31535c91a --- /dev/null +++ b/rxjava-core/src/main/java/rx/concurrency/ExecutorScheduler.java @@ -0,0 +1,50 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.concurrency; + +import rx.Subscription; +import rx.util.functions.Func0; + +import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; + +public class ExecutorScheduler extends AbstractScheduler { + private final Executor executor; + + public ExecutorScheduler(Executor executor) { + this.executor = executor; + } + + @Override + public Subscription schedule(Func0 action) { + final DiscardableAction discardableAction = new DiscardableAction(action); + + executor.execute(new Runnable() { + @Override + public void run() { + discardableAction.call(); + } + }); + + return discardableAction; + + } + + @Override + public Subscription schedule(Func0 action, long dueTime, TimeUnit unit) { + throw new IllegalStateException("Delayed scheduling is not supported"); + } +} diff --git a/rxjava-core/src/main/java/rx/concurrency/ForwardingScheduler.java b/rxjava-core/src/main/java/rx/concurrency/ForwardingScheduler.java new file mode 100644 index 0000000000..a32f72aa9f --- /dev/null +++ b/rxjava-core/src/main/java/rx/concurrency/ForwardingScheduler.java @@ -0,0 +1,56 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.concurrency; + +import rx.Scheduler; +import rx.Subscription; +import rx.util.functions.Action0; +import rx.util.functions.Func0; + +import java.util.concurrent.TimeUnit; + +public class ForwardingScheduler implements Scheduler { + private final Scheduler underlying; + + public ForwardingScheduler(Scheduler underlying) { + this.underlying = underlying; + } + + @Override + public Subscription schedule(Action0 action) { + return underlying.schedule(action); + } + + @Override + public Subscription schedule(Func0 action) { + return underlying.schedule(action); + } + + @Override + public Subscription schedule(Action0 action, long dueTime, TimeUnit unit) { + return underlying.schedule(action, dueTime, unit); + } + + @Override + public Subscription schedule(Func0 action, long dueTime, TimeUnit unit) { + return underlying.schedule(action, dueTime, unit); + } + + @Override + public long now() { + return underlying.now(); + } +} \ No newline at end of file diff --git a/rxjava-core/src/main/java/rx/concurrency/ImmediateScheduler.java b/rxjava-core/src/main/java/rx/concurrency/ImmediateScheduler.java new file mode 100644 index 0000000000..9dcd0d4f65 --- /dev/null +++ b/rxjava-core/src/main/java/rx/concurrency/ImmediateScheduler.java @@ -0,0 +1,105 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.concurrency; + +import org.junit.Test; +import org.mockito.InOrder; +import rx.Subscription; +import rx.util.functions.Action0; +import rx.util.functions.Func0; + +import java.util.concurrent.TimeUnit; + +import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; + +public final class ImmediateScheduler extends AbstractScheduler { + private static final ImmediateScheduler INSTANCE = new ImmediateScheduler(); + + public static ImmediateScheduler getInstance() { + return INSTANCE; + } + + private ImmediateScheduler() { + } + + @Override + public Subscription schedule(Func0 action) { + return action.call(); + } + + @Override + public Subscription schedule(Func0 action, long dueTime, TimeUnit unit) { + return schedule(new SleepingAction(action, this, dueTime, unit)); + } + + public static class UnitTest { + + @Test + public void testNestedActions() { + final ImmediateScheduler scheduler = new ImmediateScheduler(); + + final Action0 firstStepStart = mock(Action0.class); + final Action0 firstStepEnd = mock(Action0.class); + + final Action0 secondStepStart = mock(Action0.class); + final Action0 secondStepEnd = mock(Action0.class); + + final Action0 thirdStepStart = mock(Action0.class); + final Action0 thirdStepEnd = mock(Action0.class); + + final Action0 firstAction = new Action0() { + @Override + public void call() { + firstStepStart.call(); + firstStepEnd.call(); + } + }; + final Action0 secondAction = new Action0() { + @Override + public void call() { + secondStepStart.call(); + scheduler.schedule(firstAction); + secondStepEnd.call(); + + } + }; + final Action0 thirdAction = new Action0() { + @Override + public void call() { + thirdStepStart.call(); + scheduler.schedule(secondAction); + thirdStepEnd.call(); + } + }; + + InOrder inOrder = inOrder(firstStepStart, firstStepEnd, secondStepStart, secondStepEnd, thirdStepStart, thirdStepEnd); + + scheduler.schedule(thirdAction); + + inOrder.verify(thirdStepStart, times(1)).call(); + inOrder.verify(secondStepStart, times(1)).call(); + inOrder.verify(firstStepStart, times(1)).call(); + inOrder.verify(firstStepEnd, times(1)).call(); + inOrder.verify(secondStepEnd, times(1)).call(); + inOrder.verify(thirdStepEnd, times(1)).call(); + } + + } + + +} diff --git a/rxjava-core/src/main/java/rx/concurrency/NewThreadScheduler.java b/rxjava-core/src/main/java/rx/concurrency/NewThreadScheduler.java new file mode 100644 index 0000000000..c2393fbe95 --- /dev/null +++ b/rxjava-core/src/main/java/rx/concurrency/NewThreadScheduler.java @@ -0,0 +1,52 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.concurrency; + +import rx.Subscription; +import rx.util.functions.Func0; + +import java.util.concurrent.TimeUnit; + +public class NewThreadScheduler extends AbstractScheduler { + private static final NewThreadScheduler INSTANCE = new NewThreadScheduler(); + + public static NewThreadScheduler getInstance() { + return INSTANCE; + } + + + @Override + public Subscription schedule(Func0 action) { + final DiscardableAction discardableAction = new DiscardableAction(action); + + Thread t = new Thread(new Runnable() { + @Override + public void run() { + discardableAction.call(); + } + }); + + t.start(); + + return discardableAction; + } + + @Override + public Subscription schedule(Func0 action, long dueTime, TimeUnit unit) { + return schedule(new SleepingAction(action, this, dueTime, unit)); + } + +} diff --git a/rxjava-core/src/main/java/rx/concurrency/ScheduledExecutorServiceScheduler.java b/rxjava-core/src/main/java/rx/concurrency/ScheduledExecutorServiceScheduler.java new file mode 100644 index 0000000000..0558d6fa26 --- /dev/null +++ b/rxjava-core/src/main/java/rx/concurrency/ScheduledExecutorServiceScheduler.java @@ -0,0 +1,48 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.concurrency; + +import rx.Subscription; +import rx.util.functions.Func0; + +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +public class ScheduledExecutorServiceScheduler extends AbstractScheduler { + private final ScheduledExecutorService executorService; + + public ScheduledExecutorServiceScheduler(ScheduledExecutorService executorService) { + this.executorService = executorService; + } + + @Override + public Subscription schedule(Func0 action) { + return schedule(action, 0, TimeUnit.MILLISECONDS); + } + + @Override + public Subscription schedule(Func0 action, long dueTime, TimeUnit unit) { + final DiscardableAction discardableAction = new DiscardableAction(action); + executorService.schedule(new Runnable() { + @Override + public void run() { + discardableAction.call(); + } + }, dueTime, unit); + return discardableAction; + } + +} diff --git a/rxjava-core/src/main/java/rx/concurrency/Schedulers.java b/rxjava-core/src/main/java/rx/concurrency/Schedulers.java new file mode 100644 index 0000000000..d5ac25c13a --- /dev/null +++ b/rxjava-core/src/main/java/rx/concurrency/Schedulers.java @@ -0,0 +1,92 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.concurrency; + +import rx.Scheduler; + +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicInteger; + +public class Schedulers { + private static final ScheduledExecutorService COMPUTATION_EXECUTOR = createComputationExecutor(); + private static final ScheduledExecutorService IO_EXECUTOR = createIOExecutor(); + private static final int DEFAULT_MAX_IO_THREADS = 10; + private static final int DEFAULT_KEEP_ALIVE_TIME = 10 * 1000; // 10 seconds + + private Schedulers() { + + } + + public static Scheduler immediate() { + return ImmediateScheduler.getInstance(); + } + + public static Scheduler currentThread() { + return CurrentThreadScheduler.getInstance(); + } + + public static Scheduler newThread() { + return NewThreadScheduler.getInstance(); + } + + public static Scheduler executor(Executor executor) { + return new ExecutorScheduler(executor); + } + + public static Scheduler fromScheduledExecutorService(ScheduledExecutorService executor) { + return new ScheduledExecutorServiceScheduler(executor); + } + + public static Scheduler threadPoolForComputation() { + return fromScheduledExecutorService(COMPUTATION_EXECUTOR); + } + + public static Scheduler threadPoolForIO() { + return fromScheduledExecutorService(IO_EXECUTOR); + } + + public static Scheduler forwardingScheduler(Scheduler underlying) { + return new ForwardingScheduler(underlying); + } + + private static ScheduledExecutorService createComputationExecutor() { + int cores = Runtime.getRuntime().availableProcessors(); + return Executors.newScheduledThreadPool(cores, new ThreadFactory() { + final AtomicInteger counter = new AtomicInteger(); + + @Override + public Thread newThread(Runnable r) { + return new Thread(r, "RxComputationThreadPool-" + counter.incrementAndGet()); + } + }); + } + + private static ScheduledExecutorService createIOExecutor() { + ScheduledThreadPoolExecutor result = new ScheduledThreadPoolExecutor(DEFAULT_MAX_IO_THREADS, new ThreadFactory() { + final AtomicInteger counter = new AtomicInteger(); + + @Override + public Thread newThread(Runnable r) { + return new Thread(r, "RxIOThreadPool-" + counter.incrementAndGet()); + } + }); + + result.setKeepAliveTime(DEFAULT_KEEP_ALIVE_TIME, TimeUnit.MILLISECONDS); + result.allowCoreThreadTimeOut(true); + + return result; + } +} diff --git a/rxjava-core/src/main/java/rx/concurrency/SleepingAction.java b/rxjava-core/src/main/java/rx/concurrency/SleepingAction.java new file mode 100644 index 0000000000..02823436d4 --- /dev/null +++ b/rxjava-core/src/main/java/rx/concurrency/SleepingAction.java @@ -0,0 +1,50 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.concurrency; + +import rx.Scheduler; +import rx.Subscription; +import rx.util.functions.Action0; +import rx.util.functions.Func0; + +import java.util.concurrent.TimeUnit; + +public class SleepingAction implements Func0 { + private final Func0 underlying; + private final Scheduler scheduler; + private final long execTime; + + public SleepingAction(Func0 underlying, Scheduler scheduler, long timespan, TimeUnit timeUnit) { + this.underlying = underlying; + this.scheduler = scheduler; + this.execTime = scheduler.now() + timeUnit.toMillis(timespan); + } + + @Override + public Subscription call() { + if (execTime < scheduler.now()) { + try { + Thread.sleep(scheduler.now() - execTime); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + } + + return underlying.call(); + + } +} diff --git a/rxjava-core/src/main/java/rx/observables/ScheduledObserver.java b/rxjava-core/src/main/java/rx/observables/ScheduledObserver.java new file mode 100644 index 0000000000..ec38b90d0e --- /dev/null +++ b/rxjava-core/src/main/java/rx/observables/ScheduledObserver.java @@ -0,0 +1,45 @@ +package rx.observables; + +import rx.Observer; +import rx.Scheduler; +import rx.util.functions.Action0; + +public class ScheduledObserver implements Observer { + private final Observer underlying; + private final Scheduler scheduler; + + public ScheduledObserver(Observer underlying, Scheduler scheduler) { + this.underlying = underlying; + this.scheduler = scheduler; + } + + @Override + public void onCompleted() { + scheduler.schedule(new Action0() { + @Override + public void call() { + underlying.onCompleted(); + } + }); + } + + @Override + public void onError(final Exception e) { + scheduler.schedule(new Action0() { + @Override + public void call() { + underlying.onError(e); + } + }); + } + + @Override + public void onNext(final T args) { + scheduler.schedule(new Action0() { + @Override + public void call() { + underlying.onNext(args); + } + }); + } +} diff --git a/rxjava-core/src/main/java/rx/operators/OperationObserveOn.java b/rxjava-core/src/main/java/rx/operators/OperationObserveOn.java new file mode 100644 index 0000000000..16eed89270 --- /dev/null +++ b/rxjava-core/src/main/java/rx/operators/OperationObserveOn.java @@ -0,0 +1,76 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.operators; + +import org.junit.Test; +import rx.Observable; +import rx.Observer; +import rx.Scheduler; +import rx.Subscription; +import rx.concurrency.Schedulers; +import rx.observables.ScheduledObserver; +import rx.util.functions.Action0; +import rx.util.functions.Func1; + +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.*; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +public class OperationObserveOn { + + public static Func1, Subscription> observeOn(Observable source, Scheduler scheduler) { + return new ObserveOn(source, scheduler); + } + + private static class ObserveOn implements Func1, Subscription> { + private final Observable source; + private final Scheduler scheduler; + + public ObserveOn(Observable source, Scheduler scheduler) { + this.source = source; + this.scheduler = scheduler; + } + + @Override + public Subscription call(final Observer observer) { + return source.subscribe(new ScheduledObserver(observer, scheduler)); + } + } + + public static class UnitTest { + + @Test + @SuppressWarnings("unchecked") + public void testObserveOn() { + + Scheduler scheduler = spy(Schedulers.forwardingScheduler(Schedulers.immediate())); + + Observer observer = mock(Observer.class); + Observable.create(observeOn(Observable.toObservable(1, 2, 3), scheduler)).subscribe(observer); + + verify(scheduler, times(4)).schedule(any(Action0.class)); + verifyNoMoreInteractions(scheduler); + + verify(observer, times(1)).onNext(1); + verify(observer, times(1)).onNext(2); + verify(observer, times(1)).onNext(3); + verify(observer, times(1)).onCompleted(); + } + + } + +} diff --git a/rxjava-core/src/main/java/rx/operators/OperationSubscribeOn.java b/rxjava-core/src/main/java/rx/operators/OperationSubscribeOn.java new file mode 100644 index 0000000000..104a134657 --- /dev/null +++ b/rxjava-core/src/main/java/rx/operators/OperationSubscribeOn.java @@ -0,0 +1,102 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.operators; + +import org.junit.Test; +import rx.Observable; +import rx.Observer; +import rx.Scheduler; +import rx.Subscription; +import rx.concurrency.Schedulers; +import rx.util.functions.Action0; +import rx.util.functions.Func0; +import rx.util.functions.Func1; + +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.*; + +public class OperationSubscribeOn { + + public static Func1, Subscription> subscribeOn(Observable source, Scheduler scheduler) { + return new SubscribeOn(source, scheduler); + } + + private static class SubscribeOn implements Func1, Subscription> { + private final Observable source; + private final Scheduler scheduler; + + public SubscribeOn(Observable source, Scheduler scheduler) { + this.source = source; + this.scheduler = scheduler; + } + + @Override + public Subscription call(final Observer observer) { + return scheduler.schedule(new Func0() { + @Override + public Subscription call() { + return new ScheduledSubscription(source.subscribe(observer), scheduler); + } + }); + } + } + + private static class ScheduledSubscription implements Subscription { + private final Subscription underlying; + private final Scheduler scheduler; + + private ScheduledSubscription(Subscription underlying, Scheduler scheduler) { + this.underlying = underlying; + this.scheduler = scheduler; + } + + @Override + public void unsubscribe() { + scheduler.schedule(new Action0() { + @Override + public void call() { + underlying.unsubscribe(); + } + }); + } + } + + public static class UnitTest { + + @Test + @SuppressWarnings("unchecked") + public void testSubscribeOn() { + Observable w = Observable.toObservable(1, 2, 3); + + Scheduler scheduler = spy(Schedulers.forwardingScheduler(Schedulers.immediate())); + + Observer observer = mock(Observer.class); + Subscription subscription = Observable.create(subscribeOn(w, scheduler)).subscribe(observer); + + verify(scheduler, times(1)).schedule(any(Func0.class)); + subscription.unsubscribe(); + verify(scheduler, times(1)).schedule(any(Action0.class)); + verifyNoMoreInteractions(scheduler); + + verify(observer, times(1)).onNext(1); + verify(observer, times(1)).onNext(2); + verify(observer, times(1)).onNext(3); + verify(observer, times(1)).onCompleted(); + } + + } + +} \ No newline at end of file