diff --git a/src/main/java/rx/Scheduler.java b/src/main/java/rx/Scheduler.java index 12922bc4a4..921528c875 100644 --- a/src/main/java/rx/Scheduler.java +++ b/src/main/java/rx/Scheduler.java @@ -43,6 +43,17 @@ public abstract class Scheduler { * maintenance. */ + /** + * The tolerance for a clock drift in nanoseconds where the periodic scheduler will rebase. + *

+ * The associated system parameter, {@code rx.scheduler.drift-tolerance}, expects its value in minutes. + */ + static final long CLOCK_DRIFT_TOLERANCE_NANOS; + static { + CLOCK_DRIFT_TOLERANCE_NANOS = TimeUnit.MINUTES.toNanos( + Long.getLong("rx.scheduler.drift-tolerance", 15)); + } + /** * Retrieves or creates a new {@link Scheduler.Worker} that represents serial execution of actions. *

@@ -109,17 +120,38 @@ public abstract static class Worker implements Subscription { */ public Subscription schedulePeriodically(final Action0 action, long initialDelay, long period, TimeUnit unit) { final long periodInNanos = unit.toNanos(period); - final long startInNanos = TimeUnit.MILLISECONDS.toNanos(now()) + unit.toNanos(initialDelay); + final long firstNowNanos = TimeUnit.MILLISECONDS.toNanos(now()); + final long firstStartInNanos = firstNowNanos + unit.toNanos(initialDelay); final MultipleAssignmentSubscription mas = new MultipleAssignmentSubscription(); final Action0 recursiveAction = new Action0() { - long count = 0; + long count; + long lastNowNanos = firstNowNanos; + long startInNanos = firstStartInNanos; @Override public void call() { if (!mas.isUnsubscribed()) { action.call(); - long nextTick = startInNanos + (++count * periodInNanos); - mas.set(schedule(this, nextTick - TimeUnit.MILLISECONDS.toNanos(now()), TimeUnit.NANOSECONDS)); + + long nextTick; + + long nowNanos = TimeUnit.MILLISECONDS.toNanos(now()); + // If the clock moved in a direction quite a bit, rebase the repetition period + if (nowNanos + CLOCK_DRIFT_TOLERANCE_NANOS < lastNowNanos + || nowNanos >= lastNowNanos + periodInNanos + CLOCK_DRIFT_TOLERANCE_NANOS) { + nextTick = nowNanos + periodInNanos; + /* + * Shift the start point back by the drift as if the whole thing + * started count periods ago. + */ + startInNanos = nextTick - (periodInNanos * (++count)); + } else { + nextTick = startInNanos + (++count * periodInNanos); + } + lastNowNanos = nowNanos; + + long delay = nextTick - nowNanos; + mas.set(schedule(this, delay, TimeUnit.NANOSECONDS)); } } }; diff --git a/src/test/java/rx/SchedulerWorkerTest.java b/src/test/java/rx/SchedulerWorkerTest.java new file mode 100644 index 0000000000..8bb1094b46 --- /dev/null +++ b/src/test/java/rx/SchedulerWorkerTest.java @@ -0,0 +1,153 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx; + +import static org.junit.Assert.assertTrue; + +import java.util.*; +import java.util.concurrent.TimeUnit; + +import org.junit.Test; + +import rx.functions.Action0; +import rx.schedulers.Schedulers; + +public class SchedulerWorkerTest { + + static final class CustomDriftScheduler extends Scheduler { + public volatile long drift; + @Override + public Worker createWorker() { + final Worker w = Schedulers.computation().createWorker(); + return new Worker() { + + @Override + public void unsubscribe() { + w.unsubscribe(); + } + + @Override + public boolean isUnsubscribed() { + return w.isUnsubscribed(); + } + + @Override + public Subscription schedule(Action0 action) { + return w.schedule(action); + } + + @Override + public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) { + return w.schedule(action, delayTime, unit); + } + + @Override + public long now() { + return super.now() + drift; + } + }; + } + + @Override + public long now() { + return super.now() + drift; + } + } + + @Test + public void testCurrentTimeDriftBackwards() throws Exception { + CustomDriftScheduler s = new CustomDriftScheduler(); + + Scheduler.Worker w = s.createWorker(); + + try { + final List times = new ArrayList(); + + Subscription d = w.schedulePeriodically(new Action0() { + @Override + public void call() { + times.add(System.currentTimeMillis()); + } + }, 100, 100, TimeUnit.MILLISECONDS); + + Thread.sleep(150); + + s.drift = -1000 - TimeUnit.NANOSECONDS.toMillis(Scheduler.CLOCK_DRIFT_TOLERANCE_NANOS); + + Thread.sleep(400); + + d.unsubscribe(); + + Thread.sleep(150); + + System.out.println("Runs: " + times.size()); + + for (int i = 0; i < times.size() - 1 ; i++) { + long diff = times.get(i + 1) - times.get(i); + System.out.println("Diff #" + i + ": " + diff); + assertTrue("" + i + ":" + diff, diff < 150 && diff > 50); + } + + assertTrue("Too few invocations: " + times.size(), times.size() > 2); + + } finally { + w.unsubscribe(); + } + + } + + @Test + public void testCurrentTimeDriftForwards() throws Exception { + CustomDriftScheduler s = new CustomDriftScheduler(); + + Scheduler.Worker w = s.createWorker(); + + try { + final List times = new ArrayList(); + + Subscription d = w.schedulePeriodically(new Action0() { + @Override + public void call() { + times.add(System.currentTimeMillis()); + } + }, 100, 100, TimeUnit.MILLISECONDS); + + Thread.sleep(150); + + s.drift = 1000 + TimeUnit.NANOSECONDS.toMillis(Scheduler.CLOCK_DRIFT_TOLERANCE_NANOS); + + Thread.sleep(400); + + d.unsubscribe(); + + Thread.sleep(150); + + System.out.println("Runs: " + times.size()); + + assertTrue(times.size() > 2); + + for (int i = 0; i < times.size() - 1 ; i++) { + long diff = times.get(i + 1) - times.get(i); + System.out.println("Diff #" + i + ": " + diff); + assertTrue("Diff out of range: " + diff, diff < 250 && diff > 50); + } + + } finally { + w.unsubscribe(); + } + + } +}