Skip to content

Commit 09bf833

Browse files
sboryanavina
authored andcommitted
SAMZA-1087: Schedule after debounce time
SAMZA-1087: Allows scheduling an action (a Runnable) after some de-bounce delay. Author: Boris Shkolnik <[email protected]> Author: Boris Shkolnik <[email protected]> Reviewers: Navina Ramesh <[email protected]>, Fred Ji <[email protected]> Closes apache#49 from sborya/ScheduleAfterDebounceTime1
1 parent 2226e3e commit 09bf833

File tree

2 files changed

+196
-0
lines changed

2 files changed

+196
-0
lines changed
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.samza.zk;
21+
22+
import com.google.common.util.concurrent.ThreadFactoryBuilder;
23+
24+
import java.util.HashMap;
25+
import java.util.Map;
26+
import java.util.concurrent.Executors;
27+
import java.util.concurrent.ScheduledExecutorService;
28+
import java.util.concurrent.ScheduledFuture;
29+
import java.util.concurrent.TimeUnit;
30+
31+
import org.slf4j.Logger;
32+
import org.slf4j.LoggerFactory;
33+
34+
35+
/**
36+
* This class allows scheduling a Runnable actions after some debounce time.
37+
* When the same action is scheduled it needs to cancel the previous one. To accomplish that we keep the previous
38+
* future in a map, keyed by the action name. Here we predefine some actions, which are used in the
39+
* ZK based standalone app.
40+
*/
41+
public class ScheduleAfterDebounceTime {
42+
public static final Logger LOG = LoggerFactory.getLogger(ScheduleAfterDebounceTime.class);
43+
public static final long TIMEOUT_MS = 1000 * 10; // timeout to wait for a task to complete
44+
45+
// Action name when the JobModel version changes
46+
public static final String JOB_MODEL_VERSION_CHANGE = "JobModelVersionChange";
47+
48+
// Action name when the Processor membership changes
49+
public static final String ON_PROCESSOR_CHANGE = "OnProcessorChange";
50+
51+
// Action name when the Processor Data changes
52+
public static final String ON_DATA_CHANGE_ON = "OnDataChangeOn";
53+
54+
public static final int DEBOUNCE_TIME_MS = 2000;
55+
56+
private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(
57+
new ThreadFactoryBuilder().setNameFormat("zk-debounce-thread-%d").setDaemon(true).build());
58+
private final Map<String, ScheduledFuture> futureHandles = new HashMap<>();
59+
60+
synchronized public void scheduleAfterDebounceTime(String actionName, long debounceTimeMs, Runnable runnable) {
61+
// check if this action has been scheduled already
62+
ScheduledFuture sf = futureHandles.get(actionName);
63+
if (sf != null && !sf.isDone()) {
64+
LOG.info("DEBOUNCE: cancel future for " + actionName);
65+
// attempt to cancel
66+
if (!sf.cancel(false)) {
67+
try {
68+
sf.get(TIMEOUT_MS, TimeUnit.MILLISECONDS);
69+
} catch (Exception e) {
70+
// we ignore the exception
71+
LOG.warn("cancel for action " + actionName + " failed with ", e);
72+
}
73+
}
74+
futureHandles.remove(actionName);
75+
}
76+
// schedule a new task
77+
sf = scheduledExecutorService.schedule(runnable, debounceTimeMs, TimeUnit.MILLISECONDS);
78+
LOG.info("DEBOUNCE: scheduled " + actionName + " in " + debounceTimeMs);
79+
futureHandles.put(actionName, sf);
80+
}
81+
82+
public void stopScheduler() {
83+
// shutdown executor service
84+
scheduledExecutorService.shutdown();
85+
}
86+
}
Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.samza.zk;
21+
22+
import org.junit.Assert;
23+
import org.junit.Before;
24+
import org.junit.Test;
25+
26+
27+
public class TestScheduleAfterDebounceTime {
28+
private static final long DEBOUNCE_TIME = 500;
29+
int i = 0;
30+
@Before
31+
public void setup() {
32+
33+
}
34+
35+
class TestObj {
36+
public void inc() {
37+
i++;
38+
}
39+
public void setTo(int val) {
40+
i = val;
41+
}
42+
public void doNothing() {
43+
44+
}
45+
}
46+
@Test
47+
public void testSchedule() {
48+
ScheduleAfterDebounceTime debounceTimer = new ScheduleAfterDebounceTime();
49+
50+
final TestObj testObj = new TestScheduleAfterDebounceTime.TestObj();
51+
debounceTimer.scheduleAfterDebounceTime("TEST1", DEBOUNCE_TIME, () ->
52+
{
53+
testObj.inc();
54+
}
55+
);
56+
// action is delayed
57+
Assert.assertEquals(0, i);
58+
59+
try {
60+
Thread.sleep(DEBOUNCE_TIME + 10);
61+
} catch (InterruptedException e) {
62+
Assert.fail("Sleep was interrupted");
63+
}
64+
65+
// debounce time passed
66+
Assert.assertEquals(1, i);
67+
}
68+
69+
@Test
70+
public void testCancelAndSchedule() {
71+
ScheduleAfterDebounceTime debounceTimer = new ScheduleAfterDebounceTime();
72+
73+
final TestObj testObj = new TestScheduleAfterDebounceTime.TestObj();
74+
debounceTimer.scheduleAfterDebounceTime("TEST1", DEBOUNCE_TIME, () ->
75+
{
76+
testObj.inc();
77+
}
78+
);
79+
Assert.assertEquals(0, i);
80+
81+
// next schedule should cancel the previous one with the same name
82+
debounceTimer.scheduleAfterDebounceTime("TEST1", 2 * DEBOUNCE_TIME, () ->
83+
{
84+
testObj.setTo(100);
85+
}
86+
);
87+
88+
try {
89+
Thread.sleep(DEBOUNCE_TIME + 10);
90+
} catch (InterruptedException e) {
91+
Assert.fail("Sleep was interrupted");
92+
}
93+
// still should be the old value
94+
Assert.assertEquals(0, i);
95+
96+
// this schedule should not cancel the previous one, because it has different name
97+
debounceTimer.scheduleAfterDebounceTime("TEST2", DEBOUNCE_TIME, () ->
98+
{
99+
testObj.doNothing();
100+
}
101+
);
102+
103+
try {
104+
Thread.sleep(3 * DEBOUNCE_TIME + 10);
105+
} catch (InterruptedException e) {
106+
Assert.fail("Sleep was interrupted");
107+
}
108+
Assert.assertEquals(100, i);
109+
}
110+
}

0 commit comments

Comments
 (0)