Skip to content

Commit f1bc1d0

Browse files
sboryanavina
authored andcommitted
SAMZA-1102: Zk controller
SAMZA-1102: Added ZKController and ZkControllerImpl Author: Boris Shkolnik <[email protected]> Author: navina <[email protected]> Reviewers: Navina Ramesh <[email protected]>, Fred Ji <[email protected]>, Xinyu Liu <[email protected]> Closes apache#50 from sborya/ZkController
1 parent e6147fd commit f1bc1d0

File tree

11 files changed

+549
-59
lines changed

11 files changed

+549
-59
lines changed

samza-core/src/main/java/org/apache/samza/processor/SamzaContainerController.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ public class SamzaContainerController {
6060
* @param taskFactory Factory that be used create instances of {@link org.apache.samza.task.StreamTask} or
6161
* {@link org.apache.samza.task.AsyncStreamTask}
6262
* @param containerShutdownMs How long the Samza container should wait for an orderly shutdown of task instances
63+
* @param processorId Id of the processor
6364
* @param metricsReporterMap Map of metric reporter name and {@link MetricsReporter} instance
6465
*/
6566
public SamzaContainerController(

samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,9 +94,14 @@ public StreamProcessor(int processorId, Config config, Map<String, MetricsReport
9494
this(processorId, config, customMetricsReporters, (Object) asyncStreamTaskFactory);
9595
}
9696

97+
9798
/**
98-
* Same as {@link #StreamProcessor(int, Config, Map, AsyncStreamTaskFactory)}, except task instances are created
99+
*Same as {@link #StreamProcessor(int, Config, Map, AsyncStreamTaskFactory)}, except task instances are created
99100
* using the provided {@link StreamTaskFactory}.
101+
* @param processorId - this processor Id
102+
* @param config - config
103+
* @param customMetricsReporters metric Reporter
104+
* @param streamTaskFactory task factory to instantiate the Task
100105
*/
101106
public StreamProcessor(int processorId, Config config, Map<String, MetricsReporter> customMetricsReporters,
102107
StreamTaskFactory streamTaskFactory) {
@@ -106,6 +111,9 @@ public StreamProcessor(int processorId, Config config, Map<String, MetricsReport
106111
/**
107112
* Same as {@link #StreamProcessor(int, Config, Map, AsyncStreamTaskFactory)}, except task instances are created
108113
* using the "task.class" configuration instead of a task factory.
114+
* @param processorId - this processor Id
115+
* @param config - config
116+
* @param customMetricsReporters metrics
109117
*/
110118
public StreamProcessor(int processorId, Config config, Map<String, MetricsReporter> customMetricsReporters) {
111119
this(processorId, config, customMetricsReporters, (Object) null);
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
21+
package org.apache.samza.zk;
22+
23+
/**
24+
* Api to the functionality provided by ZK
25+
*/
26+
public interface ZkController {
27+
void register();
28+
boolean isLeader();
29+
void notifyJobModelChange(String version);
30+
void stop();
31+
void listenToProcessorLiveness();
32+
}
Lines changed: 163 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,163 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.samza.zk;
21+
22+
import org.I0Itec.zkclient.IZkChildListener;
23+
import org.I0Itec.zkclient.IZkDataListener;
24+
import org.apache.samza.SamzaException;
25+
import org.slf4j.Logger;
26+
import org.slf4j.LoggerFactory;
27+
28+
import java.util.List;
29+
30+
31+
public class ZkControllerImpl implements ZkController {
32+
private static final Logger LOG = LoggerFactory.getLogger(ZkControllerImpl.class);
33+
34+
private final String processorIdStr;
35+
private final ZkUtils zkUtils;
36+
private final ZkControllerListener zkControllerListener;
37+
private final ZkLeaderElector leaderElector;
38+
private final ScheduleAfterDebounceTime debounceTimer;
39+
40+
public ZkControllerImpl(String processorIdStr, ZkUtils zkUtils, ScheduleAfterDebounceTime debounceTimer,
41+
ZkControllerListener zkControllerListener) {
42+
this.processorIdStr = processorIdStr;
43+
this.zkUtils = zkUtils;
44+
this.zkControllerListener = zkControllerListener;
45+
this.leaderElector = new ZkLeaderElector(processorIdStr, zkUtils,
46+
new ZkLeaderElector.ZkLeaderElectorListener() {
47+
@Override
48+
public void onBecomingLeader() {
49+
onBecomeLeader();
50+
}
51+
}
52+
);
53+
this.debounceTimer = debounceTimer;
54+
55+
init();
56+
}
57+
58+
private void init() {
59+
ZkKeyBuilder keyBuilder = zkUtils.getKeyBuilder();
60+
zkUtils.makeSurePersistentPathsExists(
61+
new String[]{keyBuilder.getProcessorsPath(), keyBuilder.getJobModelVersionPath(), keyBuilder
62+
.getJobModelPathPrefix()});
63+
}
64+
65+
private void onBecomeLeader() {
66+
67+
listenToProcessorLiveness(); // subscribe for adding new processors
68+
69+
// inform the caller
70+
zkControllerListener.onBecomeLeader();
71+
72+
}
73+
74+
@Override
75+
public void register() {
76+
77+
// TODO - make a loop here with some number of attempts.
78+
// possibly split into two method - becomeLeader() and becomeParticipant()
79+
leaderElector.tryBecomeLeader();
80+
81+
// subscribe to JobModel version updates
82+
zkUtils.subscribeToJobModelVersionChange(new ZkJobModelVersionChangeHandler(debounceTimer));
83+
}
84+
85+
@Override
86+
public boolean isLeader() {
87+
return leaderElector.amILeader();
88+
}
89+
90+
@Override
91+
public void notifyJobModelChange(String version) {
92+
zkControllerListener.onNewJobModelAvailable(version);
93+
}
94+
95+
@Override
96+
public void stop() {
97+
if (isLeader()) {
98+
leaderElector.resignLeadership();
99+
}
100+
zkUtils.close();
101+
}
102+
103+
@Override
104+
public void listenToProcessorLiveness() {
105+
zkUtils.subscribeToProcessorChange(new ZkProcessorChangeHandler(debounceTimer));
106+
}
107+
108+
// Only by Leader
109+
class ZkProcessorChangeHandler implements IZkChildListener {
110+
private final ScheduleAfterDebounceTime debounceTimer;
111+
public ZkProcessorChangeHandler(ScheduleAfterDebounceTime debounceTimer) {
112+
this.debounceTimer = debounceTimer;
113+
}
114+
/**
115+
* Called when the children of the given path changed.
116+
*
117+
* @param parentPath The parent path
118+
* @param currentChilds The children or null if the root node (parent path) was deleted.
119+
* @throws Exception
120+
*/
121+
@Override
122+
public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
123+
LOG.info(
124+
"ZkControllerImpl::ZkProcessorChangeHandler::handleChildChange - Path: " + parentPath + " Current Children: "
125+
+ currentChilds);
126+
debounceTimer.scheduleAfterDebounceTime(ScheduleAfterDebounceTime.ON_PROCESSOR_CHANGE,
127+
ScheduleAfterDebounceTime.DEBOUNCE_TIME_MS, () -> zkControllerListener.onProcessorChange(currentChilds));
128+
}
129+
}
130+
131+
class ZkJobModelVersionChangeHandler implements IZkDataListener {
132+
private final ScheduleAfterDebounceTime debounceTimer;
133+
public ZkJobModelVersionChangeHandler(ScheduleAfterDebounceTime debounceTimer) {
134+
this.debounceTimer = debounceTimer;
135+
}
136+
/**
137+
* called when job model version gets updated
138+
* @param dataPath
139+
* @param data
140+
* @throws Exception
141+
*/
142+
@Override
143+
public void handleDataChange(String dataPath, Object data) throws Exception {
144+
LOG.info("pid=" + processorIdStr + ". Got notification on version update change. path=" + dataPath + "; data="
145+
+ (String) data);
146+
147+
debounceTimer
148+
.scheduleAfterDebounceTime(ScheduleAfterDebounceTime.JOB_MODEL_VERSION_CHANGE, 0, () -> notifyJobModelChange((String) data));
149+
}
150+
@Override
151+
public void handleDataDeleted(String dataPath) throws Exception {
152+
throw new SamzaException("version update path has been deleted!");
153+
}
154+
}
155+
156+
public void shutdown() {
157+
if (debounceTimer != null)
158+
debounceTimer.stopScheduler();
159+
160+
if (zkUtils != null)
161+
zkUtils.close();
162+
}
163+
}
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.samza.zk;
21+
22+
import java.util.List;
23+
24+
25+
/**
26+
* callbacks to the caller of the ZkController
27+
*/
28+
public interface ZkControllerListener {
29+
void onBecomeLeader();
30+
void onProcessorChange(List<String> processorIds);
31+
32+
void onNewJobModelAvailable(String version); // start job model update (stop current work)
33+
void onNewJobModelConfirmed(String version); // start new work according to the new model
34+
}

samza-core/src/main/java/org/apache/samza/zk/ZkKeyBuilder.java

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,8 @@
1919

2020
package org.apache.samza.zk;
2121

22-
import com.google.common.base.Strings;
2322
import org.apache.samza.SamzaException;
23+
import com.google.common.base.Strings;
2424

2525
/**
2626
* The following ZK hierarchy is maintained for Standalone jobs:
@@ -44,7 +44,7 @@ public class ZkKeyBuilder {
4444
private final String pathPrefix;
4545

4646
static final String PROCESSORS_PATH = "processors";
47-
static final String PROCESSOR_ID_PREFIX = "processor-";
47+
public static final String JOBMODEL_VERSION_PATH = "jobModelVersion";
4848

4949
public ZkKeyBuilder(String pathPrefix) {
5050
if (Strings.isNullOrEmpty(pathPrefix)) {
@@ -53,6 +53,10 @@ public ZkKeyBuilder(String pathPrefix) {
5353
this.pathPrefix = pathPrefix.trim();
5454
}
5555

56+
public String getRootPath() {
57+
return "/" + pathPrefix;
58+
}
59+
5660
public String getProcessorsPath() {
5761
return String.format("/%s/%s", pathPrefix, PROCESSORS_PATH);
5862
}
@@ -71,4 +75,18 @@ public static String parseIdFromPath(String path) {
7175
return path.substring(path.lastIndexOf("/") + 1);
7276
return null;
7377
}
78+
79+
public String getJobModelVersionPath() {
80+
return String.format("/%s/%s", pathPrefix, JOBMODEL_VERSION_PATH);
81+
}
82+
83+
public String getJobModelPathPrefix() {
84+
return String.format("/%s/jobModels", pathPrefix);
85+
}
86+
87+
public String getJobModelPath(String jobModelVersion) {
88+
return String.format("%s/%s", getJobModelPathPrefix(), jobModelVersion);
89+
}
90+
91+
7492
}

samza-core/src/main/java/org/apache/samza/zk/ZkLeaderElector.java

Lines changed: 24 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -50,25 +50,30 @@ public class ZkLeaderElector implements LeaderElector {
5050
private final String hostName;
5151

5252
private AtomicBoolean isLeader = new AtomicBoolean(false);
53-
private final IZkDataListener zkLeaderElectionListener;
53+
private final IZkDataListener previousProcessorChangeListener;
54+
ZkLeaderElectorListener zkLeaderElectorListener;
5455
private String currentSubscription = null;
5556
private final Random random = new Random();
5657

5758
@VisibleForTesting
58-
ZkLeaderElector(String processorIdStr, ZkUtils zkUtils, IZkDataListener leaderElectionListener) {
59+
ZkLeaderElector(String processorIdStr,
60+
ZkUtils zkUtils,
61+
ZkLeaderElectorListener zkLeaderElectorListener,
62+
IZkDataListener previousProcessorChangeListener) {
5963
this.processorIdStr = processorIdStr;
6064
this.zkUtils = zkUtils;
61-
this.zkLeaderElectionListener = leaderElectionListener;
6265
this.keyBuilder = this.zkUtils.getKeyBuilder();
6366
this.hostName = getHostName();
67+
this.zkLeaderElectorListener = zkLeaderElectorListener; // listener to inform the caller that they have become the leader
68+
if (previousProcessorChangeListener == null)
69+
this.previousProcessorChangeListener = new PreviousProcessorChangeListener();
70+
else
71+
this.previousProcessorChangeListener = previousProcessorChangeListener;
6472
}
6573

66-
public ZkLeaderElector(String processorIdStr, ZkUtils zkUtils) {
67-
this.zkLeaderElectionListener = new ZkLeaderElectionListener();
68-
this.processorIdStr = processorIdStr;
69-
this.zkUtils = zkUtils;
70-
this.keyBuilder = this.zkUtils.getKeyBuilder();
71-
this.hostName = getHostName();
74+
public ZkLeaderElector(String processorIdStr, ZkUtils zkUtils, ZkLeaderElectorListener zkLeaderElectorListener) {
75+
this(processorIdStr, zkUtils, zkLeaderElectorListener, null);
76+
7277
}
7378

7479
// TODO: This should go away once we integrate with Zk based Job Coordinator
@@ -81,6 +86,10 @@ private String getHostName() {
8186
}
8287
}
8388

89+
public interface ZkLeaderElectorListener {
90+
void onBecomingLeader();
91+
}
92+
8493
@Override
8594
public boolean tryBecomeLeader() {
8695
String currentPath = zkUtils.registerProcessorAndGetId(hostName);
@@ -96,6 +105,7 @@ public boolean tryBecomeLeader() {
96105
if (index == 0) {
97106
isLeader.getAndSet(true);
98107
LOGGER.info(zLog("Eligible to become the leader!"));
108+
zkLeaderElectorListener.onBecomingLeader(); // inform the caller
99109
return true;
100110
}
101111

@@ -105,11 +115,13 @@ public boolean tryBecomeLeader() {
105115
if (!predecessor.equals(currentSubscription)) {
106116
if (currentSubscription != null) {
107117
LOGGER.debug(zLog("Unsubscribing data change for " + currentSubscription));
108-
zkUtils.unsubscribeDataChanges(keyBuilder.getProcessorsPath() + "/" + currentSubscription, zkLeaderElectionListener);
118+
zkUtils.unsubscribeDataChanges(keyBuilder.getProcessorsPath() + "/" + currentSubscription,
119+
previousProcessorChangeListener);
109120
}
110121
currentSubscription = predecessor;
111122
LOGGER.info(zLog("Subscribing data change for " + predecessor));
112-
zkUtils.subscribeDataChanges(keyBuilder.getProcessorsPath() + "/" + currentSubscription, zkLeaderElectionListener);
123+
zkUtils.subscribeDataChanges(keyBuilder.getProcessorsPath() + "/" + currentSubscription,
124+
previousProcessorChangeListener);
113125
}
114126
/**
115127
* Verify that the predecessor still exists. This step is needed because the ZkClient subscribes for data changes
@@ -146,7 +158,7 @@ private String zLog(String logMessage) {
146158
}
147159

148160
// Only by non-leaders
149-
class ZkLeaderElectionListener implements IZkDataListener {
161+
class PreviousProcessorChangeListener implements IZkDataListener {
150162

151163
@Override
152164
public void handleDataChange(String dataPath, Object data) throws Exception {

0 commit comments

Comments
 (0)