Skip to content

Commit 95389c2

Browse files
JakeWhartonakarnokd
authored andcommitted
Deprecate remaining public scheduler types. (#3871)
1 parent 3439dd8 commit 95389c2

14 files changed

+316
-252
lines changed

src/main/java/rx/internal/operators/OperatorObserveOn.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,11 @@
2222
import rx.Observable.Operator;
2323
import rx.exceptions.MissingBackpressureException;
2424
import rx.functions.Action0;
25+
import rx.internal.schedulers.*;
2526
import rx.internal.util.*;
2627
import rx.internal.util.atomic.SpscAtomicArrayQueue;
2728
import rx.internal.util.unsafe.*;
2829
import rx.plugins.RxJavaPlugins;
29-
import rx.schedulers.*;
3030

3131
/**
3232
* Delivers events on the specified {@code Scheduler} asynchronously via an unbounded buffer.

src/main/java/rx/schedulers/ExecutorScheduler.java renamed to src/main/java/rx/internal/schedulers/ExecutorScheduler.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,14 +13,13 @@
1313
* License for the specific language governing permissions and limitations under
1414
* the License.
1515
*/
16-
package rx.schedulers;
16+
package rx.internal.schedulers;
1717

1818
import java.util.concurrent.*;
1919
import java.util.concurrent.atomic.AtomicInteger;
2020

2121
import rx.*;
2222
import rx.functions.Action0;
23-
import rx.internal.schedulers.*;
2423
import rx.plugins.RxJavaPlugins;
2524
import rx.subscriptions.*;
2625

@@ -30,7 +29,7 @@
3029
* Note that thread-hopping is unavoidable with this kind of Scheduler as we don't know about the underlying
3130
* threading behavior of the executor.
3231
*/
33-
/* public */final class ExecutorScheduler extends Scheduler {
32+
public final class ExecutorScheduler extends Scheduler {
3433
final Executor executor;
3534
public ExecutorScheduler(Executor executor) {
3635
this.executor = executor;

src/main/java/rx/internal/schedulers/GenericScheduledExecutorService.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020

2121
import rx.Scheduler;
2222
import rx.internal.util.RxThreadFactory;
23-
import rx.schedulers.*;
2423

2524
/**
2625
* A default {@link ScheduledExecutorService} that can be used for scheduling actions when a {@link Scheduler} implementation doesn't have that ability.
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.internal.schedulers;
17+
18+
import java.util.concurrent.TimeUnit;
19+
20+
import rx.Scheduler;
21+
import rx.Subscription;
22+
import rx.functions.Action0;
23+
import rx.subscriptions.BooleanSubscription;
24+
import rx.subscriptions.Subscriptions;
25+
26+
/**
27+
* Executes work immediately on the current thread.
28+
*/
29+
public final class ImmediateScheduler extends Scheduler {
30+
public static final ImmediateScheduler INSTANCE = new ImmediateScheduler();
31+
32+
private ImmediateScheduler() {
33+
}
34+
35+
@Override
36+
public Worker createWorker() {
37+
return new InnerImmediateScheduler();
38+
}
39+
40+
private class InnerImmediateScheduler extends Scheduler.Worker implements Subscription {
41+
42+
final BooleanSubscription innerSubscription = new BooleanSubscription();
43+
44+
InnerImmediateScheduler() {
45+
}
46+
47+
@Override
48+
public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) {
49+
// since we are executing immediately on this thread we must cause this thread to sleep
50+
long execTime = ImmediateScheduler.this.now() + unit.toMillis(delayTime);
51+
52+
return schedule(new SleepingAction(action, this, execTime));
53+
}
54+
55+
@Override
56+
public Subscription schedule(Action0 action) {
57+
action.call();
58+
return Subscriptions.unsubscribed();
59+
}
60+
61+
@Override
62+
public void unsubscribe() {
63+
innerSubscription.unsubscribe();
64+
}
65+
66+
@Override
67+
public boolean isUnsubscribed() {
68+
return innerSubscription.isUnsubscribed();
69+
}
70+
71+
}
72+
73+
}

src/main/java/rx/schedulers/SleepingAction.java renamed to src/main/java/rx/internal/schedulers/SleepingAction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
* See the License for the specific language governing permissions and
1414
* limitations under the License.
1515
*/
16-
package rx.schedulers;
16+
package rx.internal.schedulers;
1717

1818
import rx.Scheduler;
1919
import rx.functions.Action0;
Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.internal.schedulers;
17+
18+
import java.util.concurrent.PriorityBlockingQueue;
19+
import java.util.concurrent.TimeUnit;
20+
import java.util.concurrent.atomic.AtomicInteger;
21+
22+
import rx.Scheduler;
23+
import rx.Subscription;
24+
import rx.functions.Action0;
25+
import rx.subscriptions.BooleanSubscription;
26+
import rx.subscriptions.Subscriptions;
27+
28+
/**
29+
* Schedules work on the current thread but does not execute immediately. Work is put in a queue and executed
30+
* after the current unit of work is completed.
31+
*/
32+
public final class TrampolineScheduler extends Scheduler {
33+
public static final TrampolineScheduler INSTANCE = new TrampolineScheduler();
34+
35+
@Override
36+
public Worker createWorker() {
37+
return new InnerCurrentThreadScheduler();
38+
}
39+
40+
private TrampolineScheduler() {
41+
}
42+
43+
private static class InnerCurrentThreadScheduler extends Scheduler.Worker implements Subscription {
44+
45+
final AtomicInteger counter = new AtomicInteger();
46+
final PriorityBlockingQueue<TimedAction> queue = new PriorityBlockingQueue<TimedAction>();
47+
private final BooleanSubscription innerSubscription = new BooleanSubscription();
48+
private final AtomicInteger wip = new AtomicInteger();
49+
50+
InnerCurrentThreadScheduler() {
51+
}
52+
53+
@Override
54+
public Subscription schedule(Action0 action) {
55+
return enqueue(action, now());
56+
}
57+
58+
@Override
59+
public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) {
60+
long execTime = now() + unit.toMillis(delayTime);
61+
62+
return enqueue(new SleepingAction(action, this, execTime), execTime);
63+
}
64+
65+
private Subscription enqueue(Action0 action, long execTime) {
66+
if (innerSubscription.isUnsubscribed()) {
67+
return Subscriptions.unsubscribed();
68+
}
69+
final TimedAction timedAction = new TimedAction(action, execTime, counter.incrementAndGet());
70+
queue.add(timedAction);
71+
72+
if (wip.getAndIncrement() == 0) {
73+
do {
74+
final TimedAction polled = queue.poll();
75+
if (polled != null) {
76+
polled.action.call();
77+
}
78+
} while (wip.decrementAndGet() > 0);
79+
return Subscriptions.unsubscribed();
80+
} else {
81+
// queue wasn't empty, a parent is already processing so we just add to the end of the queue
82+
return Subscriptions.create(new Action0() {
83+
84+
@Override
85+
public void call() {
86+
queue.remove(timedAction);
87+
}
88+
89+
});
90+
}
91+
}
92+
93+
@Override
94+
public void unsubscribe() {
95+
innerSubscription.unsubscribe();
96+
}
97+
98+
@Override
99+
public boolean isUnsubscribed() {
100+
return innerSubscription.isUnsubscribed();
101+
}
102+
103+
}
104+
105+
private static final class TimedAction implements Comparable<TimedAction> {
106+
final Action0 action;
107+
final Long execTime;
108+
final int count; // In case if time between enqueueing took less than 1ms
109+
110+
TimedAction(Action0 action, Long execTime, int count) {
111+
this.action = action;
112+
this.execTime = execTime;
113+
this.count = count;
114+
}
115+
116+
@Override
117+
public int compareTo(TimedAction that) {
118+
int result = execTime.compareTo(that.execTime);
119+
if (result == 0) {
120+
return compare(count, that.count);
121+
}
122+
return result;
123+
}
124+
}
125+
126+
// because I can't use Integer.compare from Java 7
127+
static int compare(int x, int y) {
128+
return (x < y) ? -1 : ((x == y) ? 0 : 1);
129+
}
130+
131+
}

src/main/java/rx/schedulers/ImmediateScheduler.java

Lines changed: 6 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -15,63 +15,20 @@
1515
*/
1616
package rx.schedulers;
1717

18-
import java.util.concurrent.TimeUnit;
19-
2018
import rx.Scheduler;
21-
import rx.Subscription;
22-
import rx.functions.Action0;
23-
import rx.subscriptions.BooleanSubscription;
24-
import rx.subscriptions.Subscriptions;
2519

2620
/**
27-
* Executes work immediately on the current thread.
21+
* @deprecated This type was never publicly instantiable. Use {@link Schedulers#immediate()}.
2822
*/
23+
@Deprecated
24+
@SuppressWarnings("unused") // Class was part of public API.
2925
public final class ImmediateScheduler extends Scheduler {
30-
private static final ImmediateScheduler INSTANCE = new ImmediateScheduler();
31-
32-
/* package */static ImmediateScheduler instance() {
33-
return INSTANCE;
34-
}
35-
36-
/* package accessible for unit tests */ImmediateScheduler() {
26+
private ImmediateScheduler() {
27+
throw new AssertionError();
3728
}
3829

3930
@Override
4031
public Worker createWorker() {
41-
return new InnerImmediateScheduler();
32+
return null;
4233
}
43-
44-
private class InnerImmediateScheduler extends Scheduler.Worker implements Subscription {
45-
46-
final BooleanSubscription innerSubscription = new BooleanSubscription();
47-
48-
InnerImmediateScheduler() {
49-
}
50-
51-
@Override
52-
public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) {
53-
// since we are executing immediately on this thread we must cause this thread to sleep
54-
long execTime = ImmediateScheduler.this.now() + unit.toMillis(delayTime);
55-
56-
return schedule(new SleepingAction(action, this, execTime));
57-
}
58-
59-
@Override
60-
public Subscription schedule(Action0 action) {
61-
action.call();
62-
return Subscriptions.unsubscribed();
63-
}
64-
65-
@Override
66-
public void unsubscribe() {
67-
innerSubscription.unsubscribe();
68-
}
69-
70-
@Override
71-
public boolean isUnsubscribed() {
72-
return innerSubscription.isUnsubscribed();
73-
}
74-
75-
}
76-
7734
}

src/main/java/rx/schedulers/NewThreadScheduler.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
* @deprecated This type was never publicly instantiable. Use {@link Schedulers#newThread()}.
2222
*/
2323
@Deprecated
24+
@SuppressWarnings("unused") // Class was part of public API.
2425
public final class NewThreadScheduler extends Scheduler {
2526
private NewThreadScheduler() {
2627
throw new AssertionError();

src/main/java/rx/schedulers/Schedulers.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -61,29 +61,29 @@ private Schedulers() {
6161

6262
/**
6363
* Creates and returns a {@link Scheduler} that executes work immediately on the current thread.
64-
*
65-
* @return an {@link ImmediateScheduler} instance
64+
*
65+
* @return a {@link Scheduler} that executes work immediately
6666
*/
6767
public static Scheduler immediate() {
68-
return ImmediateScheduler.instance();
68+
return rx.internal.schedulers.ImmediateScheduler.INSTANCE;
6969
}
7070

7171
/**
7272
* Creates and returns a {@link Scheduler} that queues work on the current thread to be executed after the
7373
* current work completes.
74-
*
75-
* @return a {@link TrampolineScheduler} instance
74+
*
75+
* @return a {@link Scheduler} that queues work on the current thread
7676
*/
7777
public static Scheduler trampoline() {
78-
return TrampolineScheduler.instance();
78+
return rx.internal.schedulers.TrampolineScheduler.INSTANCE;
7979
}
8080

8181
/**
8282
* Creates and returns a {@link Scheduler} that creates a new {@link Thread} for each unit of work.
8383
* <p>
8484
* Unhandled errors will be delivered to the scheduler Thread's {@link java.lang.Thread.UncaughtExceptionHandler}.
8585
*
86-
* @return a {@link NewThreadScheduler} instance
86+
* @return a {@link Scheduler} that creates new threads
8787
*/
8888
public static Scheduler newThread() {
8989
return INSTANCE.newThreadScheduler;

0 commit comments

Comments
 (0)