Skip to content

Commit d91794b

Browse files
Scheduler Outer/Inner
1 parent 26f36ec commit d91794b

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

50 files changed

+889
-1494
lines changed

rxjava-core/src/main/java/rx/Observable.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -5300,7 +5300,7 @@ public final Observable<T> onExceptionResumeNext(final Observable<? extends T> r
53005300
}
53015301

53025302
/**
5303-
* Perform work on the source {@code Observable<T>} in parallel by sharding it on a {@link Schedulers#threadPoolForComputation()} {@link Scheduler}, and return the resulting {@code Observable<R>}.
5303+
* Perform work on the source {@code Observable<T>} in parallel by sharding it on a {@link Schedulers#computation()} {@link Scheduler}, and return the resulting {@code Observable<R>}.
53045304
* <p>
53055305
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/parallel.png">
53065306
*
+43-160
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/**
2-
* Copyright 2014 Netflix, Inc.
2+
* Copyright 2013 Netflix, Inc.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -15,16 +15,9 @@
1515
*/
1616
package rx;
1717

18-
import java.util.Date;
1918
import java.util.concurrent.TimeUnit;
20-
import java.util.concurrent.atomic.AtomicBoolean;
2119

22-
import rx.subscriptions.CompositeSubscription;
23-
import rx.subscriptions.MultipleAssignmentSubscription;
24-
import rx.subscriptions.Subscriptions;
25-
import rx.util.functions.Action0;
2620
import rx.util.functions.Action1;
27-
import rx.util.functions.Func2;
2821

2922
/**
3023
* Represents an object that schedules units of work.
@@ -48,30 +41,24 @@
4841
public abstract class Scheduler {
4942

5043
/**
51-
* Schedules a cancelable action to be executed.
44+
* Schedules an Action on a new Scheduler instance (typically another thread) for execution.
5245
*
53-
* @param state
54-
* State to pass into the action.
5546
* @param action
5647
* Action to schedule.
5748
* @return a subscription to be able to unsubscribe from action.
5849
*/
59-
public abstract <T> Subscription schedule(T state, Func2<? super Scheduler, ? super T, ? extends Subscription> action);
50+
51+
public abstract Subscription schedule(Action1<Scheduler.Inner> action);
6052

6153
/**
62-
* Schedules a cancelable action to be executed in delayTime.
54+
* Schedules an Action on a new Scheduler instance (typically another thread) for execution at some point in the future.
6355
*
64-
* @param state
65-
* State to pass into the action.
6656
* @param action
67-
* Action to schedule.
6857
* @param delayTime
69-
* Time the action is to be delayed before executing.
7058
* @param unit
71-
* Time unit of the delay time.
72-
* @return a subscription to be able to unsubscribe from action.
59+
* @return
7360
*/
74-
public abstract <T> Subscription schedule(T state, Func2<? super Scheduler, ? super T, ? extends Subscription> action, long delayTime, TimeUnit unit);
61+
public abstract Subscription schedule(final Action1<Scheduler.Inner> action, final long delayTime, final TimeUnit unit);
7562

7663
/**
7764
* Schedules a cancelable action to be executed periodically.
@@ -90,152 +77,58 @@ public abstract class Scheduler {
9077
* The time unit the interval above is given in.
9178
* @return A subscription to be able to unsubscribe from action.
9279
*/
93-
public <T> Subscription schedulePeriodically(T state, final Func2<? super Scheduler, ? super T, ? extends Subscription> action, long initialDelay, long period, TimeUnit unit) {
80+
public Subscription schedulePeriodically(final Action1<Scheduler.Inner> action, long initialDelay, long period, TimeUnit unit) {
9481
final long periodInNanos = unit.toNanos(period);
95-
final AtomicBoolean complete = new AtomicBoolean();
9682

97-
final Func2<Scheduler, T, Subscription> recursiveAction = new Func2<Scheduler, T, Subscription>() {
83+
final Action1<Scheduler.Inner> recursiveAction = new Action1<Scheduler.Inner>() {
9884
@Override
99-
public Subscription call(Scheduler scheduler, T state0) {
100-
if (!complete.get()) {
85+
public void call(Inner inner) {
86+
if (!inner.isUnsubscribed()) {
10187
long startedAt = now();
102-
final Subscription sub1 = action.call(scheduler, state0);
88+
action.call(inner);
10389
long timeTakenByActionInNanos = TimeUnit.MILLISECONDS.toNanos(now() - startedAt);
104-
final Subscription sub2 = schedule(state0, this, periodInNanos - timeTakenByActionInNanos, TimeUnit.NANOSECONDS);
105-
return Subscriptions.create(new Action0() {
106-
@Override
107-
public void call() {
108-
sub1.unsubscribe();
109-
sub2.unsubscribe();
110-
}
111-
});
90+
inner.schedule(this, periodInNanos - timeTakenByActionInNanos, TimeUnit.NANOSECONDS);
11291
}
113-
return Subscriptions.empty();
11492
}
11593
};
116-
final Subscription sub = schedule(state, recursiveAction, initialDelay, unit);
117-
return Subscriptions.create(new Action0() {
118-
@Override
119-
public void call() {
120-
complete.set(true);
121-
sub.unsubscribe();
122-
}
123-
});
94+
return schedule(recursiveAction, initialDelay, unit);
12495
}
12596

126-
/**
127-
* Schedules a cancelable action to be executed at dueTime.
128-
*
129-
* @param state
130-
* State to pass into the action.
131-
* @param action
132-
* Action to schedule.
133-
* @param dueTime
134-
* Time the action is to be executed. If in the past it will be executed immediately.
135-
* @return a subscription to be able to unsubscribe from action.
136-
*/
137-
public <T> Subscription schedule(T state, Func2<? super Scheduler, ? super T, ? extends Subscription> action, Date dueTime) {
138-
long scheduledTime = dueTime.getTime();
139-
long timeInFuture = scheduledTime - now();
140-
if (timeInFuture <= 0) {
141-
return schedule(state, action);
142-
} else {
143-
return schedule(state, action, timeInFuture, TimeUnit.MILLISECONDS);
97+
public abstract static class Inner implements Subscription {
98+
99+
/**
100+
* Schedules an action to be executed in delayTime.
101+
*
102+
* @param delayTime
103+
* Time the action is to be delayed before executing.
104+
* @param unit
105+
* Time unit of the delay time.
106+
*/
107+
public abstract void schedule(Action1<Scheduler.Inner> action, long delayTime, TimeUnit unit);
108+
109+
/**
110+
* Schedules a cancelable action to be executed in delayTime.
111+
*
112+
*/
113+
public abstract void schedule(Action1<Scheduler.Inner> action);
114+
115+
/**
116+
* @return the scheduler's notion of current absolute time in milliseconds.
117+
*/
118+
public long now() {
119+
return System.currentTimeMillis();
144120
}
145121
}
146122

147123
/**
148-
* Schedules an action and receives back an action for recursive execution.
149-
*
150-
* @param action
151-
* action
152-
* @return a subscription to be able to unsubscribe from action.
153-
*/
154-
public Subscription schedule(final Action1<Action0> action) {
155-
final CompositeSubscription parentSubscription = new CompositeSubscription();
156-
final MultipleAssignmentSubscription childSubscription = new MultipleAssignmentSubscription();
157-
parentSubscription.add(childSubscription);
158-
159-
final Func2<Scheduler, Func2, Subscription> parentAction = new Func2<Scheduler, Func2, Subscription>() {
160-
161-
@Override
162-
public Subscription call(final Scheduler scheduler, final Func2 parentAction) {
163-
action.call(new Action0() {
164-
165-
@Override
166-
public void call() {
167-
if (!parentSubscription.isUnsubscribed()) {
168-
childSubscription.set(scheduler.schedule(parentAction, parentAction));
169-
}
170-
}
171-
172-
});
173-
return childSubscription;
174-
}
175-
};
176-
177-
parentSubscription.add(schedule(parentAction, parentAction));
178-
179-
return parentSubscription;
180-
}
181-
182-
/**
183-
* Schedules an action to be executed.
184-
*
185-
* @param action
186-
* action
187-
* @return a subscription to be able to unsubscribe from action.
188-
*/
189-
public Subscription schedule(final Action0 action) {
190-
return schedule(null, new Func2<Scheduler, Void, Subscription>() {
191-
192-
@Override
193-
public Subscription call(Scheduler scheduler, Void state) {
194-
action.call();
195-
return Subscriptions.empty();
196-
}
197-
});
198-
}
199-
200-
/**
201-
* Schedules an action to be executed in delayTime.
202-
*
203-
* @param action
204-
* action
205-
* @return a subscription to be able to unsubscribe from action.
206-
*/
207-
public Subscription schedule(final Action0 action, long delayTime, TimeUnit unit) {
208-
return schedule(null, new Func2<Scheduler, Void, Subscription>() {
209-
210-
@Override
211-
public Subscription call(Scheduler scheduler, Void state) {
212-
action.call();
213-
return Subscriptions.empty();
214-
}
215-
}, delayTime, unit);
216-
}
217-
218-
/**
219-
* Schedules an action to be executed periodically.
124+
* Parallelism available to a Scheduler.
125+
* <p>
126+
* This defaults to {@code Runtime.getRuntime().availableProcessors()} but can be overridden for use cases such as scheduling work on a computer cluster.
220127
*
221-
* @param action
222-
* The action to execute periodically.
223-
* @param initialDelay
224-
* Time to wait before executing the action for the first time.
225-
* @param period
226-
* The time interval to wait each time in between executing the action.
227-
* @param unit
228-
* The time unit the interval above is given in.
229-
* @return A subscription to be able to unsubscribe from action.
128+
* @return the scheduler's available degree of parallelism.
230129
*/
231-
public Subscription schedulePeriodically(final Action0 action, long initialDelay, long period, TimeUnit unit) {
232-
return schedulePeriodically(null, new Func2<Scheduler, Void, Subscription>() {
233-
@Override
234-
public Subscription call(Scheduler scheduler, Void state) {
235-
action.call();
236-
return Subscriptions.empty();
237-
}
238-
}, initialDelay, period, unit);
130+
public int degreeOfParallelism() {
131+
return Runtime.getRuntime().availableProcessors();
239132
}
240133

241134
/**
@@ -245,14 +138,4 @@ public long now() {
245138
return System.currentTimeMillis();
246139
}
247140

248-
/**
249-
* Parallelism available to a Scheduler.
250-
* <p>
251-
* This defaults to {@code Runtime.getRuntime().availableProcessors()} but can be overridden for use cases such as scheduling work on a computer cluster.
252-
*
253-
* @return the scheduler's available degree of parallelism.
254-
*/
255-
public int degreeOfParallelism() {
256-
return Runtime.getRuntime().availableProcessors();
257-
}
258141
}

rxjava-core/src/main/java/rx/concurrency/CurrentThreadScheduler.java

-54
This file was deleted.

rxjava-core/src/main/java/rx/concurrency/ExecutorScheduler.java

-39
This file was deleted.

0 commit comments

Comments
 (0)