Skip to content

Commit b058459

Browse files
Xinyu Liuxinyuiscool
authored andcommitted
SAMZA-1707: Samza onTimer method triggering before init
Currently there was a bug when registering a timer with a very short amount of delay, it might not be invoked since it depends on the creation of the run loop. This patch fixed the problem by double checking the ready timers when run loop is created (listener is registered.) Author: xinyuiscool <[email protected]> Reviewers: Prateek M <[email protected]> Closes apache#810 from xinyuiscool/SAMZA-1707
1 parent ebeccd2 commit b058459

File tree

2 files changed

+98
-1
lines changed

2 files changed

+98
-1
lines changed

samza-core/src/main/java/org/apache/samza/task/EpochTimeScheduler.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ public interface TimerListener {
4848
private final ScheduledExecutorService executor;
4949
private final Map<Object, ScheduledFuture> scheduledFutures = new ConcurrentHashMap<>();
5050
private final Map<TimerKey<?>, ScheduledCallback> readyTimers = new ConcurrentHashMap<>();
51-
private TimerListener timerListener;
51+
private volatile TimerListener timerListener;
5252

5353
public static EpochTimeScheduler create(ScheduledExecutorService executor) {
5454
return new EpochTimeScheduler(executor);
@@ -83,6 +83,10 @@ public <K> void deleteTimer(K key) {
8383

8484
void registerListener(TimerListener listener) {
8585
timerListener = listener;
86+
87+
if (!readyTimers.isEmpty()) {
88+
timerListener.onTimer();
89+
}
8690
}
8791

8892
public Map<TimerKey<?>, ScheduledCallback> removeReadyTimers() {
Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.samza.test.functions;
21+
22+
import org.apache.samza.application.StreamApplication;
23+
import org.apache.samza.application.descriptors.StreamApplicationDescriptor;
24+
import org.apache.samza.operators.Scheduler;
25+
import org.apache.samza.operators.functions.MapFunction;
26+
import org.apache.samza.operators.functions.ScheduledFunction;
27+
import org.apache.samza.serializers.IntegerSerde;
28+
import org.apache.samza.test.framework.TestRunner;
29+
import org.apache.samza.test.framework.system.descriptors.InMemoryInputDescriptor;
30+
import org.apache.samza.test.framework.system.descriptors.InMemorySystemDescriptor;
31+
import org.junit.Test;
32+
33+
import java.time.Duration;
34+
import java.util.Arrays;
35+
import java.util.Collection;
36+
import java.util.Collections;
37+
import java.util.concurrent.atomic.AtomicBoolean;
38+
39+
import static org.junit.Assert.assertFalse;
40+
import static org.junit.Assert.assertTrue;
41+
42+
public class TestSchedulerFunction {
43+
static AtomicBoolean timerFired = new AtomicBoolean(false);
44+
45+
@Test
46+
public void testImmediateTimer() {
47+
final InMemorySystemDescriptor isd = new InMemorySystemDescriptor("test");
48+
final InMemoryInputDescriptor<Integer> imid = isd.getInputDescriptor("test-input", new IntegerSerde());
49+
50+
StreamApplication app = new StreamApplication() {
51+
@Override
52+
public void describe(StreamApplicationDescriptor appDescriptor) {
53+
54+
appDescriptor.getInputStream(imid)
55+
.map(new TestFunction());
56+
}
57+
};
58+
59+
TestRunner
60+
.of(app)
61+
.addInputStream(imid, Arrays.asList(1, 2, 3, 4, 5))
62+
.run(Duration.ofSeconds(1));
63+
64+
assertTrue(timerFired.get());
65+
}
66+
67+
private static class TestFunction implements MapFunction<Integer, Void>, ScheduledFunction<String, Void> {
68+
69+
@Override
70+
public Void apply(Integer message) {
71+
try {
72+
// This is to avoid the container shutdown before the timer fired
73+
Thread.sleep(1);
74+
} catch (Exception e) {
75+
}
76+
return null;
77+
}
78+
79+
@Override
80+
public void schedule(Scheduler<String> scheduler) {
81+
scheduler.schedule("haha", System.currentTimeMillis());
82+
}
83+
84+
@Override
85+
public Collection<Void> onCallback(String key, long timestamp) {
86+
assertFalse(timerFired.get());
87+
88+
timerFired.compareAndSet(false, true);
89+
90+
return Collections.emptyList();
91+
}
92+
}
93+
}

0 commit comments

Comments
 (0)