Skip to content

Commit 9dfdc76

Browse files
author
胡贵
committed
增加redis 注册中心实现
1 parent daa489c commit 9dfdc76

File tree

21 files changed

+366
-136
lines changed

21 files changed

+366
-136
lines changed

job-admin/src/main/java/com/lts/job/web/support/node/ZkNodeManager.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ public Map<NodeType, List<Node>> getAllNodes(String clusterName) {
3535
if (CollectionUtils.isEmpty(nodes)) {
3636
List<Node> nodeList = new ArrayList<Node>(nodes.size());
3737
for (String node : nodes) {
38-
nodeList.add(NodeRegistryUtils.parse(clusterName, node));
38+
nodeList.add(NodeRegistryUtils.parse(node));
3939
}
4040
nodeMap.put(NodeType.valueOf(nodeType), nodeList);
4141
}

job-client/src/main/java/com/lts/job/client/RetryJobClient.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
* @author Robert HG ([email protected]) on 8/14/14.
1515
* 重试 客户端, 如果 没有可用的JobTracker, 那么存文件, 定时重试
1616
*/
17-
public class RetryJobClient extends JobClient<JobClientNode, JobClientApplication>{
17+
public class RetryJobClient extends JobClient<JobClientNode, JobClientApplication> {
1818

1919
private RetryScheduler retryScheduler;
2020

job-client/src/main/java/com/lts/job/client/domain/JobClientNode.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,9 @@
1010
public class JobClientNode extends Node {
1111

1212
public JobClientNode() {
13-
this.setNodeType(NodeType.CLIENT);
13+
this.setNodeType(NodeType.JOB_CLIENT);
1414
this.addListenNodeType(NodeType.JOB_TRACKER);
15-
this.addListenNodeType(NodeType.CLIENT);
15+
this.addListenNodeType(NodeType.JOB_CLIENT);
1616
}
1717

1818
}

job-client/src/main/java/com/lts/job/client/domain/ResponseCode.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,12 +9,12 @@ public class ResponseCode {
99
private ResponseCode(){}
1010

1111
// 没有找到 JobTracker 节点
12-
public static final String JOB_TRACKER_NOT_FOUND = "JOB_TRACKER_NOT_FOUND";
12+
public static final String JOB_TRACKER_NOT_FOUND = "11";
1313

1414
// 提交失败并且写入文件
15-
public static final String FAILED_AND_SAVE_FILE = "FAILED_AND_SAVE_FILE";
15+
public static final String FAILED_AND_SAVE_FILE = "12";
1616

1717
// 请求参数检查失败
18-
public static final String REQUEST_FILED_CHECK_ERROR = "REQUEST_FILED_CHECK_ERROR";
18+
public static final String REQUEST_FILED_CHECK_ERROR = "13";
1919

2020
}

job-core/src/main/java/com/lts/job/core/cluster/AbstractJobNode.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -55,8 +55,6 @@ final public void start() {
5555
node = NodeFactory.create(getNodeClass(), config);
5656
config.setNodeType(node.getNodeType());
5757

58-
initRegistry();
59-
6058
LOGGER.info("当前节点配置:{}", config);
6159

6260
// 监听节点 启用/禁用消息
@@ -75,6 +73,8 @@ public void onObserved(EventInfo eventInfo) {
7573

7674
innerStart();
7775

76+
initRegistry();
77+
7878
registry.register(node);
7979

8080
LOGGER.info("启动成功!");
@@ -107,7 +107,9 @@ public void destroy() {
107107

108108
private void initRegistry() {
109109
registry = RegistryFactory.getRegistry(application);
110-
((AbstractRegistry) registry).setNode(node);
110+
if (registry instanceof AbstractRegistry) {
111+
((AbstractRegistry) registry).setNode(node);
112+
}
111113
// 订阅的node管理
112114
SubscribedNodeManager subscribedNodeManager = new SubscribedNodeManager(application);
113115
application.setSubscribedNodeManager(subscribedNodeManager);

job-core/src/main/java/com/lts/job/core/cluster/Node.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -127,14 +127,14 @@ public String getAddress() {
127127
@Override
128128
public String toString() {
129129
return "Node{" +
130-
", isAvailable=" + isAvailable +
130+
"identity='" + identity + '\'' +
131131
", nodeType=" + nodeType +
132132
", ip='" + ip + '\'' +
133133
", port=" + port +
134134
", group='" + group + '\'' +
135135
", createTime=" + createTime +
136136
", threads=" + threads +
137-
", identity='" + identity + '\'' +
137+
", isAvailable=" + isAvailable +
138138
", listenNodeTypes=" + listenNodeTypes +
139139
'}';
140140
}

job-core/src/main/java/com/lts/job/core/cluster/NodeType.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,5 +10,5 @@ public enum NodeType {
1010
// task tracker
1111
TASK_TRACKER,
1212
// client
13-
CLIENT
13+
JOB_CLIENT
1414
}

job-core/src/main/java/com/lts/job/core/constant/Constants.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,4 +51,12 @@ public interface Constants {
5151
public static final int DEFAULT_REGISTRY_RETRY_PERIOD = 5 * 1000;
5252

5353
public static final Pattern COMMA_SPLIT_PATTERN = Pattern.compile("\\s*[,]+\\s*");
54+
55+
/**
56+
* 注册中心自动重连时间
57+
*/
58+
public static final String REGISTRY_RECONNECT_PERIOD_KEY = "reconnect.period";
59+
60+
public static final int DEFAULT_REGISTRY_RECONNECT_PERIOD = 3 * 1000;
61+
5462
}

job-core/src/main/java/com/lts/job/core/registry/NodeRegistryUtils.java

Lines changed: 31 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,16 @@
22

33
import com.lts.job.core.cluster.Node;
44
import com.lts.job.core.cluster.NodeType;
5+
import com.lts.job.core.util.NetUtils;
6+
import com.lts.job.core.util.StringUtils;
57

6-
import java.util.regex.Matcher;
7-
import java.util.regex.Pattern;
8+
import java.util.Date;
89

910
/**
1011
* @author Robert HG ([email protected]) on 5/11/15.
1112
* <p/>
13+
* /LTS/{集群名字}/NODES/TASK_TRACKER/TASK_TRACKER:\\192.168.0.150:8888?group=TASK_TRACKER&threads=8&identity=85750db6-e854-4eb3-a595-9227a5f2c8f6&createTime=1408189898185&isAvailable=true&listenNodeTypes=CLIENT,TASK_TRACKER
14+
* /LTS/{集群名字}/NODES/JOB_CLIENT/JOB_CLIENT:\\192.168.0.150:8888?group=JOB_CLIENT&threads=8&identity=85750db6-e854-4eb3-a595-9227a5f2c8f6&createTime=1408189898185&isAvailable=true&listenNodeTypes=CLIENT,TASK_TRACKER
1215
* /LTS/{集群名字}/NODES/JOB_TRACKER/JOB_TRACKER:\\192.168.0.150:8888?group=JOB_TRACKER&threads=8&identity=85750db6-e854-4eb3-a595-9227a5f2c8f6&createTime=1408189898185&isAvailable=true&listenNodeTypes=CLIENT,TASK_TRACKER
1316
* <p/>
1417
*/
@@ -22,13 +25,14 @@ public static String getNodeTypePath(String clusterName, NodeType nodeType) {
2225
return NodeRegistryUtils.getRootPath(clusterName) + "/" + nodeType;
2326
}
2427

25-
public static Node parse(String clusterName, String fullPath) {
28+
public static Node parse(String fullPath) {
2629
Node node = new Node();
27-
String nodeType = getMatcher(getRootPath(clusterName) + "/(.*)/", fullPath);
28-
node.setNodeType(NodeType.valueOf(nodeType));
29-
30-
String url = getMatcher(getRootPath(clusterName) + "/" + nodeType + "/" + nodeType + ":\\\\\\\\(.*)", fullPath);
30+
String[] nodeDir = fullPath.split("/");
31+
NodeType nodeType = NodeType.valueOf(nodeDir[4]);
32+
node.setNodeType(nodeType);
33+
String url = nodeDir[5];
3134

35+
url = url.substring(nodeType.name().length() + 3);
3236
String address = url.split("\\?")[0];
3337
String ip = address.split(":")[0];
3438

@@ -56,18 +60,14 @@ public static Node parse(String clusterName, String fullPath) {
5660
node.setCreateTime(Long.valueOf(value));
5761
} else if ("isAvailable".equals(key)) {
5862
node.setAvailable(Boolean.valueOf(value));
59-
} else if ("listenNodeTypes".equals(key)) {
60-
String[] nodeTypes = value.split(",");
61-
for (String type : nodeTypes) {
62-
node.addListenNodeType(NodeType.valueOf(type));
63-
}
6463
}
6564
}
6665
return node;
6766
}
6867

6968
public static String getFullPath(String clusterName, Node node) {
7069
StringBuilder path = new StringBuilder();
70+
7171
path.append(getRootPath(clusterName))
7272
.append("/")
7373
.append(node.getNodeType())
@@ -95,22 +95,27 @@ public static String getFullPath(String clusterName, Node node) {
9595
.append("&isAvailable=")
9696
.append(node.isAvailable());
9797

98-
if (node.getListenNodeTypes() != null && node.getListenNodeTypes().size() != 0) {
99-
path.append("&listenNodeTypes=");
100-
for (NodeType nodeType : node.getListenNodeTypes()) {
101-
path.append(nodeType).append(",");
102-
}
103-
path.deleteCharAt(path.length() - 1);
104-
}
10598
return path.toString();
10699
}
107100

108-
private static String getMatcher(String regex, String source) {
109-
Pattern pattern = Pattern.compile(regex);
110-
Matcher matcher = pattern.matcher(source);
111-
while (matcher.find()) {
112-
return matcher.group(1);//只取第一组
113-
}
114-
return "";
101+
public static void main(String[] args) {
102+
Node node = new Node();
103+
node.setGroup("group1");
104+
node.setIdentity(StringUtils.generateUUID());
105+
node.setThreads(222);
106+
node.setNodeType(NodeType.JOB_TRACKER);
107+
node.setCreateTime(new Date().getTime());
108+
node.setPort(2313);
109+
node.setIp(NetUtils.getLocalHost());
110+
String fullPath = NodeRegistryUtils.getFullPath("lts", node);
111+
System.out.println(fullPath);
112+
113+
node = NodeRegistryUtils.parse(fullPath);
114+
node.setNodeType(NodeType.JOB_CLIENT);
115+
fullPath = NodeRegistryUtils.getFullPath("lts", node);
116+
System.out.println(fullPath);
117+
118+
node = NodeRegistryUtils.parse(fullPath);
119+
System.out.println(node);
115120
}
116121
}

job-core/src/main/java/com/lts/job/core/registry/RegistryFactory.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,14 @@ public static Registry getRegistry(Application application) {
1616
throw new IllegalArgumentException("address is null!");
1717
}
1818
if (address.startsWith("zookeeper://")) {
19+
application.getConfig().setRegistryAddress(
20+
address.replace("zookeeper://", "")
21+
);
1922
return new ZookeeperRegistry(application);
2023
} else if (address.startsWith("redis://")) {
24+
application.getConfig().setRegistryAddress(
25+
address.replace("redis://", "")
26+
);
2127
return new RedisRegistry(application);
2228
}
2329
throw new IllegalArgumentException("illegal address protocol");

0 commit comments

Comments
 (0)