Skip to content

Commit 8602550

Browse files
committed
Merge pull request #3467 from akarnokd/SchedulePeriodicallyClockShiftFix
1.x: compensation for significant clock drifts in schedulePeriodically
2 parents 3be980d + 3992b99 commit 8602550

File tree

2 files changed

+189
-4
lines changed

2 files changed

+189
-4
lines changed

src/main/java/rx/Scheduler.java

Lines changed: 36 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,17 @@ public abstract class Scheduler {
4343
* maintenance.
4444
*/
4545

46+
/**
47+
* The tolerance for a clock drift in nanoseconds where the periodic scheduler will rebase.
48+
* <p>
49+
* The associated system parameter, {@code rx.scheduler.drift-tolerance}, expects its value in minutes.
50+
*/
51+
static final long CLOCK_DRIFT_TOLERANCE_NANOS;
52+
static {
53+
CLOCK_DRIFT_TOLERANCE_NANOS = TimeUnit.MINUTES.toNanos(
54+
Long.getLong("rx.scheduler.drift-tolerance", 15));
55+
}
56+
4657
/**
4758
* Retrieves or creates a new {@link Scheduler.Worker} that represents serial execution of actions.
4859
* <p>
@@ -109,17 +120,38 @@ public abstract static class Worker implements Subscription {
109120
*/
110121
public Subscription schedulePeriodically(final Action0 action, long initialDelay, long period, TimeUnit unit) {
111122
final long periodInNanos = unit.toNanos(period);
112-
final long startInNanos = TimeUnit.MILLISECONDS.toNanos(now()) + unit.toNanos(initialDelay);
123+
final long firstNowNanos = TimeUnit.MILLISECONDS.toNanos(now());
124+
final long firstStartInNanos = firstNowNanos + unit.toNanos(initialDelay);
113125

114126
final MultipleAssignmentSubscription mas = new MultipleAssignmentSubscription();
115127
final Action0 recursiveAction = new Action0() {
116-
long count = 0;
128+
long count;
129+
long lastNowNanos = firstNowNanos;
130+
long startInNanos = firstStartInNanos;
117131
@Override
118132
public void call() {
119133
if (!mas.isUnsubscribed()) {
120134
action.call();
121-
long nextTick = startInNanos + (++count * periodInNanos);
122-
mas.set(schedule(this, nextTick - TimeUnit.MILLISECONDS.toNanos(now()), TimeUnit.NANOSECONDS));
135+
136+
long nextTick;
137+
138+
long nowNanos = TimeUnit.MILLISECONDS.toNanos(now());
139+
// If the clock moved in a direction quite a bit, rebase the repetition period
140+
if (nowNanos + CLOCK_DRIFT_TOLERANCE_NANOS < lastNowNanos
141+
|| nowNanos >= lastNowNanos + periodInNanos + CLOCK_DRIFT_TOLERANCE_NANOS) {
142+
nextTick = nowNanos + periodInNanos;
143+
/*
144+
* Shift the start point back by the drift as if the whole thing
145+
* started count periods ago.
146+
*/
147+
startInNanos = nextTick - (periodInNanos * (++count));
148+
} else {
149+
nextTick = startInNanos + (++count * periodInNanos);
150+
}
151+
lastNowNanos = nowNanos;
152+
153+
long delay = nextTick - nowNanos;
154+
mas.set(schedule(this, delay, TimeUnit.NANOSECONDS));
123155
}
124156
}
125157
};
Lines changed: 153 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,153 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx;
17+
18+
import static org.junit.Assert.assertTrue;
19+
20+
import java.util.*;
21+
import java.util.concurrent.TimeUnit;
22+
23+
import org.junit.Test;
24+
25+
import rx.functions.Action0;
26+
import rx.schedulers.Schedulers;
27+
28+
public class SchedulerWorkerTest {
29+
30+
static final class CustomDriftScheduler extends Scheduler {
31+
public volatile long drift;
32+
@Override
33+
public Worker createWorker() {
34+
final Worker w = Schedulers.computation().createWorker();
35+
return new Worker() {
36+
37+
@Override
38+
public void unsubscribe() {
39+
w.unsubscribe();
40+
}
41+
42+
@Override
43+
public boolean isUnsubscribed() {
44+
return w.isUnsubscribed();
45+
}
46+
47+
@Override
48+
public Subscription schedule(Action0 action) {
49+
return w.schedule(action);
50+
}
51+
52+
@Override
53+
public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) {
54+
return w.schedule(action, delayTime, unit);
55+
}
56+
57+
@Override
58+
public long now() {
59+
return super.now() + drift;
60+
}
61+
};
62+
}
63+
64+
@Override
65+
public long now() {
66+
return super.now() + drift;
67+
}
68+
}
69+
70+
@Test
71+
public void testCurrentTimeDriftBackwards() throws Exception {
72+
CustomDriftScheduler s = new CustomDriftScheduler();
73+
74+
Scheduler.Worker w = s.createWorker();
75+
76+
try {
77+
final List<Long> times = new ArrayList<Long>();
78+
79+
Subscription d = w.schedulePeriodically(new Action0() {
80+
@Override
81+
public void call() {
82+
times.add(System.currentTimeMillis());
83+
}
84+
}, 100, 100, TimeUnit.MILLISECONDS);
85+
86+
Thread.sleep(150);
87+
88+
s.drift = -1000 - TimeUnit.NANOSECONDS.toMillis(Scheduler.CLOCK_DRIFT_TOLERANCE_NANOS);
89+
90+
Thread.sleep(400);
91+
92+
d.unsubscribe();
93+
94+
Thread.sleep(150);
95+
96+
System.out.println("Runs: " + times.size());
97+
98+
for (int i = 0; i < times.size() - 1 ; i++) {
99+
long diff = times.get(i + 1) - times.get(i);
100+
System.out.println("Diff #" + i + ": " + diff);
101+
assertTrue("" + i + ":" + diff, diff < 150 && diff > 50);
102+
}
103+
104+
assertTrue("Too few invocations: " + times.size(), times.size() > 2);
105+
106+
} finally {
107+
w.unsubscribe();
108+
}
109+
110+
}
111+
112+
@Test
113+
public void testCurrentTimeDriftForwards() throws Exception {
114+
CustomDriftScheduler s = new CustomDriftScheduler();
115+
116+
Scheduler.Worker w = s.createWorker();
117+
118+
try {
119+
final List<Long> times = new ArrayList<Long>();
120+
121+
Subscription d = w.schedulePeriodically(new Action0() {
122+
@Override
123+
public void call() {
124+
times.add(System.currentTimeMillis());
125+
}
126+
}, 100, 100, TimeUnit.MILLISECONDS);
127+
128+
Thread.sleep(150);
129+
130+
s.drift = 1000 + TimeUnit.NANOSECONDS.toMillis(Scheduler.CLOCK_DRIFT_TOLERANCE_NANOS);
131+
132+
Thread.sleep(400);
133+
134+
d.unsubscribe();
135+
136+
Thread.sleep(150);
137+
138+
System.out.println("Runs: " + times.size());
139+
140+
assertTrue(times.size() > 2);
141+
142+
for (int i = 0; i < times.size() - 1 ; i++) {
143+
long diff = times.get(i + 1) - times.get(i);
144+
System.out.println("Diff #" + i + ": " + diff);
145+
assertTrue("Diff out of range: " + diff, diff < 250 && diff > 50);
146+
}
147+
148+
} finally {
149+
w.unsubscribe();
150+
}
151+
152+
}
153+
}

0 commit comments

Comments
 (0)