Skip to content

Deprecate remaining public scheduler types. #3871

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Apr 21, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/main/java/rx/internal/operators/OperatorObserveOn.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,11 @@
import rx.Observable.Operator;
import rx.exceptions.MissingBackpressureException;
import rx.functions.Action0;
import rx.internal.schedulers.*;
import rx.internal.util.*;
import rx.internal.util.atomic.SpscAtomicArrayQueue;
import rx.internal.util.unsafe.*;
import rx.plugins.RxJavaPlugins;
import rx.schedulers.*;

/**
* Delivers events on the specified {@code Scheduler} asynchronously via an unbounded buffer.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,13 @@
* License for the specific language governing permissions and limitations under
* the License.
*/
package rx.schedulers;
package rx.internal.schedulers;

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

import rx.*;
import rx.functions.Action0;
import rx.internal.schedulers.*;
import rx.plugins.RxJavaPlugins;
import rx.subscriptions.*;

Expand All @@ -30,7 +29,7 @@
* Note that thread-hopping is unavoidable with this kind of Scheduler as we don't know about the underlying
* threading behavior of the executor.
*/
/* public */final class ExecutorScheduler extends Scheduler {
public final class ExecutorScheduler extends Scheduler {
final Executor executor;
public ExecutorScheduler(Executor executor) {
this.executor = executor;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

import rx.Scheduler;
import rx.internal.util.RxThreadFactory;
import rx.schedulers.*;

/**
* A default {@link ScheduledExecutorService} that can be used for scheduling actions when a {@link Scheduler} implementation doesn't have that ability.
Expand Down
73 changes: 73 additions & 0 deletions src/main/java/rx/internal/schedulers/ImmediateScheduler.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/**
* Copyright 2014 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.internal.schedulers;

import java.util.concurrent.TimeUnit;

import rx.Scheduler;
import rx.Subscription;
import rx.functions.Action0;
import rx.subscriptions.BooleanSubscription;
import rx.subscriptions.Subscriptions;

/**
* Executes work immediately on the current thread.
*/
public final class ImmediateScheduler extends Scheduler {
public static final ImmediateScheduler INSTANCE = new ImmediateScheduler();

private ImmediateScheduler() {
}

@Override
public Worker createWorker() {
return new InnerImmediateScheduler();
}

private class InnerImmediateScheduler extends Scheduler.Worker implements Subscription {

final BooleanSubscription innerSubscription = new BooleanSubscription();

InnerImmediateScheduler() {
}

@Override
public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) {
// since we are executing immediately on this thread we must cause this thread to sleep
long execTime = ImmediateScheduler.this.now() + unit.toMillis(delayTime);

return schedule(new SleepingAction(action, this, execTime));
}

@Override
public Subscription schedule(Action0 action) {
action.call();
return Subscriptions.unsubscribed();
}

@Override
public void unsubscribe() {
innerSubscription.unsubscribe();
}

@Override
public boolean isUnsubscribed() {
return innerSubscription.isUnsubscribed();
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.schedulers;
package rx.internal.schedulers;

import rx.Scheduler;
import rx.functions.Action0;
Expand Down
131 changes: 131 additions & 0 deletions src/main/java/rx/internal/schedulers/TrampolineScheduler.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
/**
* Copyright 2014 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.internal.schedulers;

import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import rx.Scheduler;
import rx.Subscription;
import rx.functions.Action0;
import rx.subscriptions.BooleanSubscription;
import rx.subscriptions.Subscriptions;

/**
* Schedules work on the current thread but does not execute immediately. Work is put in a queue and executed
* after the current unit of work is completed.
*/
public final class TrampolineScheduler extends Scheduler {
public static final TrampolineScheduler INSTANCE = new TrampolineScheduler();

@Override
public Worker createWorker() {
return new InnerCurrentThreadScheduler();
}

private TrampolineScheduler() {
}

private static class InnerCurrentThreadScheduler extends Scheduler.Worker implements Subscription {

final AtomicInteger counter = new AtomicInteger();
final PriorityBlockingQueue<TimedAction> queue = new PriorityBlockingQueue<TimedAction>();
private final BooleanSubscription innerSubscription = new BooleanSubscription();
private final AtomicInteger wip = new AtomicInteger();

InnerCurrentThreadScheduler() {
}

@Override
public Subscription schedule(Action0 action) {
return enqueue(action, now());
}

@Override
public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) {
long execTime = now() + unit.toMillis(delayTime);

return enqueue(new SleepingAction(action, this, execTime), execTime);
}

private Subscription enqueue(Action0 action, long execTime) {
if (innerSubscription.isUnsubscribed()) {
return Subscriptions.unsubscribed();
}
final TimedAction timedAction = new TimedAction(action, execTime, counter.incrementAndGet());
queue.add(timedAction);

if (wip.getAndIncrement() == 0) {
do {
final TimedAction polled = queue.poll();
if (polled != null) {
polled.action.call();
}
} while (wip.decrementAndGet() > 0);
return Subscriptions.unsubscribed();
} else {
// queue wasn't empty, a parent is already processing so we just add to the end of the queue
return Subscriptions.create(new Action0() {

@Override
public void call() {
queue.remove(timedAction);
}

});
}
}

@Override
public void unsubscribe() {
innerSubscription.unsubscribe();
}

@Override
public boolean isUnsubscribed() {
return innerSubscription.isUnsubscribed();
}

}

private static final class TimedAction implements Comparable<TimedAction> {
final Action0 action;
final Long execTime;
final int count; // In case if time between enqueueing took less than 1ms

TimedAction(Action0 action, Long execTime, int count) {
this.action = action;
this.execTime = execTime;
this.count = count;
}

@Override
public int compareTo(TimedAction that) {
int result = execTime.compareTo(that.execTime);
if (result == 0) {
return compare(count, that.count);
}
return result;
}
}

// because I can't use Integer.compare from Java 7
static int compare(int x, int y) {
return (x < y) ? -1 : ((x == y) ? 0 : 1);
}

}
55 changes: 6 additions & 49 deletions src/main/java/rx/schedulers/ImmediateScheduler.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,63 +15,20 @@
*/
package rx.schedulers;

import java.util.concurrent.TimeUnit;

import rx.Scheduler;
import rx.Subscription;
import rx.functions.Action0;
import rx.subscriptions.BooleanSubscription;
import rx.subscriptions.Subscriptions;

/**
* Executes work immediately on the current thread.
* @deprecated This type was never publicly instantiable. Use {@link Schedulers#immediate()}.
*/
@Deprecated
@SuppressWarnings("unused") // Class was part of public API.
public final class ImmediateScheduler extends Scheduler {
private static final ImmediateScheduler INSTANCE = new ImmediateScheduler();

/* package */static ImmediateScheduler instance() {
return INSTANCE;
}

/* package accessible for unit tests */ImmediateScheduler() {
private ImmediateScheduler() {
throw new AssertionError();
}

@Override
public Worker createWorker() {
return new InnerImmediateScheduler();
return null;
}

private class InnerImmediateScheduler extends Scheduler.Worker implements Subscription {

final BooleanSubscription innerSubscription = new BooleanSubscription();

InnerImmediateScheduler() {
}

@Override
public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) {
// since we are executing immediately on this thread we must cause this thread to sleep
long execTime = ImmediateScheduler.this.now() + unit.toMillis(delayTime);

return schedule(new SleepingAction(action, this, execTime));
}

@Override
public Subscription schedule(Action0 action) {
action.call();
return Subscriptions.unsubscribed();
}

@Override
public void unsubscribe() {
innerSubscription.unsubscribe();
}

@Override
public boolean isUnsubscribed() {
return innerSubscription.isUnsubscribed();
}

}

}
1 change: 1 addition & 0 deletions src/main/java/rx/schedulers/NewThreadScheduler.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
* @deprecated This type was never publicly instantiable. Use {@link Schedulers#newThread()}.
*/
@Deprecated
@SuppressWarnings("unused") // Class was part of public API.
public final class NewThreadScheduler extends Scheduler {
private NewThreadScheduler() {
throw new AssertionError();
Expand Down
14 changes: 7 additions & 7 deletions src/main/java/rx/schedulers/Schedulers.java
Original file line number Diff line number Diff line change
Expand Up @@ -61,29 +61,29 @@ private Schedulers() {

/**
* Creates and returns a {@link Scheduler} that executes work immediately on the current thread.
*
* @return an {@link ImmediateScheduler} instance
*
* @return a {@link Scheduler} that executes work immediately
*/
public static Scheduler immediate() {
return ImmediateScheduler.instance();
return rx.internal.schedulers.ImmediateScheduler.INSTANCE;
}

/**
* Creates and returns a {@link Scheduler} that queues work on the current thread to be executed after the
* current work completes.
*
* @return a {@link TrampolineScheduler} instance
*
* @return a {@link Scheduler} that queues work on the current thread
*/
public static Scheduler trampoline() {
return TrampolineScheduler.instance();
return rx.internal.schedulers.TrampolineScheduler.INSTANCE;
}

/**
* Creates and returns a {@link Scheduler} that creates a new {@link Thread} for each unit of work.
* <p>
* Unhandled errors will be delivered to the scheduler Thread's {@link java.lang.Thread.UncaughtExceptionHandler}.
*
* @return a {@link NewThreadScheduler} instance
* @return a {@link Scheduler} that creates new threads
*/
public static Scheduler newThread() {
return INSTANCE.newThreadScheduler;
Expand Down
Loading