Skip to content

Commit e1e927e

Browse files
virajjasanilgh
authored andcommitted
HDFS-16481. Provide support to set Http and Rpc ports in MiniJournalCluster (apache#4028). Contributed by Viraj Jasani.
1 parent eb184f6 commit e1e927e

File tree

3 files changed

+161
-4
lines changed

3 files changed

+161
-4
lines changed

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetUtils.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1035,6 +1035,32 @@ public static int getFreeSocketPort() {
10351035
return port;
10361036
}
10371037

1038+
/**
1039+
* Return free ports. There is no guarantee they will remain free, so
1040+
* ports should be used immediately. The number of free ports returned by
1041+
* this method should match argument {@code numOfPorts}. Num of ports
1042+
* provided in the argument should not exceed 25.
1043+
*
1044+
* @param numOfPorts Number of free ports to acquire.
1045+
* @return Free ports for binding a local socket.
1046+
*/
1047+
public static Set<Integer> getFreeSocketPorts(int numOfPorts) {
1048+
Preconditions.checkArgument(numOfPorts > 0 && numOfPorts <= 25,
1049+
"Valid range for num of ports is between 0 and 26");
1050+
final Set<Integer> freePorts = new HashSet<>(numOfPorts);
1051+
for (int i = 0; i < numOfPorts * 5; i++) {
1052+
int port = getFreeSocketPort();
1053+
if (port == 0) {
1054+
continue;
1055+
}
1056+
freePorts.add(port);
1057+
if (freePorts.size() == numOfPorts) {
1058+
return freePorts;
1059+
}
1060+
}
1061+
throw new IllegalStateException(numOfPorts + " free ports could not be acquired.");
1062+
}
1063+
10381064
/**
10391065
* Return an @{@link InetAddress} to bind to. If bindWildCardAddress is true
10401066
* than returns null.

hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/MiniJournalCluster.java

Lines changed: 38 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import static org.apache.hadoop.hdfs.qjournal.QJMTestUtil.FAKE_NSINFO;
2121
import static org.junit.Assert.fail;
2222

23+
import java.io.Closeable;
2324
import java.io.File;
2425
import java.io.IOException;
2526
import java.net.InetSocketAddress;
@@ -44,13 +45,16 @@
4445
import org.apache.hadoop.thirdparty.com.google.common.collect.Lists;
4546
import org.apache.hadoop.test.GenericTestUtils;
4647

47-
public class MiniJournalCluster {
48+
public final class MiniJournalCluster implements Closeable {
49+
4850
public static final String CLUSTER_WAITACTIVE_URI = "waitactive";
4951
public static class Builder {
5052
private String baseDir;
5153
private int numJournalNodes = 3;
5254
private boolean format = true;
5355
private final Configuration conf;
56+
private int[] httpPorts = null;
57+
private int[] rpcPorts = null;
5458

5559
static {
5660
DefaultMetricsSystem.setMiniClusterMode(true);
@@ -75,6 +79,16 @@ public Builder format(boolean f) {
7579
return this;
7680
}
7781

82+
public Builder setHttpPorts(int... ports) {
83+
this.httpPorts = ports;
84+
return this;
85+
}
86+
87+
public Builder setRpcPorts(int... ports) {
88+
this.rpcPorts = ports;
89+
return this;
90+
}
91+
7892
public MiniJournalCluster build() throws IOException {
7993
return new MiniJournalCluster(this);
8094
}
@@ -98,6 +112,19 @@ private JNInfo(JournalNode node) {
98112
private final JNInfo[] nodes;
99113

100114
private MiniJournalCluster(Builder b) throws IOException {
115+
116+
if (b.httpPorts != null && b.httpPorts.length != b.numJournalNodes) {
117+
throw new IllegalArgumentException(
118+
"Num of http ports (" + b.httpPorts.length + ") should match num of JournalNodes ("
119+
+ b.numJournalNodes + ")");
120+
}
121+
122+
if (b.rpcPorts != null && b.rpcPorts.length != b.numJournalNodes) {
123+
throw new IllegalArgumentException(
124+
"Num of rpc ports (" + b.rpcPorts.length + ") should match num of JournalNodes ("
125+
+ b.numJournalNodes + ")");
126+
}
127+
101128
LOG.info("Starting MiniJournalCluster with " +
102129
b.numJournalNodes + " journal nodes");
103130

@@ -172,8 +199,10 @@ private Configuration createConfForNode(Builder b, int idx) {
172199
Configuration conf = new Configuration(b.conf);
173200
File logDir = getStorageDir(idx);
174201
conf.set(DFSConfigKeys.DFS_JOURNALNODE_EDITS_DIR_KEY, logDir.toString());
175-
conf.set(DFSConfigKeys.DFS_JOURNALNODE_RPC_ADDRESS_KEY, "localhost:0");
176-
conf.set(DFSConfigKeys.DFS_JOURNALNODE_HTTP_ADDRESS_KEY, "localhost:0");
202+
int httpPort = b.httpPorts != null ? b.httpPorts[idx] : 0;
203+
int rpcPort = b.rpcPorts != null ? b.rpcPorts[idx] : 0;
204+
conf.set(DFSConfigKeys.DFS_JOURNALNODE_RPC_ADDRESS_KEY, "localhost:" + rpcPort);
205+
conf.set(DFSConfigKeys.DFS_JOURNALNODE_HTTP_ADDRESS_KEY, "localhost:" + httpPort);
177206
return conf;
178207
}
179208

@@ -273,4 +302,10 @@ public void setNamenodeSharedEditsConf(String jid) {
273302
.DFS_NAMENODE_SHARED_EDITS_DIR_KEY, quorumJournalURI.toString());
274303
}
275304
}
305+
306+
@Override
307+
public void close() throws IOException {
308+
this.shutdown();
309+
}
310+
276311
}

hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/TestMiniJournalCluster.java

Lines changed: 97 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,15 +22,23 @@
2222
import java.io.File;
2323
import java.io.IOException;
2424
import java.net.URI;
25+
import java.util.Set;
2526

2627
import org.apache.hadoop.conf.Configuration;
2728
import org.apache.hadoop.hdfs.DFSConfigKeys;
2829
import org.apache.hadoop.hdfs.MiniDFSCluster;
2930
import org.apache.hadoop.hdfs.qjournal.server.JournalNode;
30-
import org.junit.Test;
31+
import org.apache.hadoop.net.NetUtils;
32+
import org.apache.hadoop.test.LambdaTestUtils;
3133

34+
import org.junit.Test;
35+
import org.slf4j.Logger;
36+
import org.slf4j.LoggerFactory;
3237

3338
public class TestMiniJournalCluster {
39+
40+
private static final Logger LOG = LoggerFactory.getLogger(TestMiniJournalCluster.class);
41+
3442
@Test
3543
public void testStartStop() throws IOException {
3644
Configuration conf = new Configuration();
@@ -52,4 +60,92 @@ public void testStartStop() throws IOException {
5260
c.shutdown();
5361
}
5462
}
63+
64+
@Test
65+
public void testStartStopWithPorts() throws Exception {
66+
Configuration conf = new Configuration();
67+
68+
LambdaTestUtils.intercept(
69+
IllegalArgumentException.class,
70+
"Num of http ports (1) should match num of JournalNodes (3)",
71+
"MiniJournalCluster port validation failed",
72+
() -> {
73+
new MiniJournalCluster.Builder(conf).setHttpPorts(8481).build();
74+
});
75+
76+
LambdaTestUtils.intercept(
77+
IllegalArgumentException.class,
78+
"Num of rpc ports (2) should match num of JournalNodes (3)",
79+
"MiniJournalCluster port validation failed",
80+
() -> {
81+
new MiniJournalCluster.Builder(conf).setRpcPorts(8481, 8482).build();
82+
});
83+
84+
LambdaTestUtils.intercept(
85+
IllegalArgumentException.class,
86+
"Num of rpc ports (1) should match num of JournalNodes (3)",
87+
"MiniJournalCluster port validation failed",
88+
() -> {
89+
new MiniJournalCluster.Builder(conf).setHttpPorts(800, 9000, 10000).setRpcPorts(8481)
90+
.build();
91+
});
92+
93+
LambdaTestUtils.intercept(
94+
IllegalArgumentException.class,
95+
"Num of http ports (4) should match num of JournalNodes (3)",
96+
"MiniJournalCluster port validation failed",
97+
() -> {
98+
new MiniJournalCluster.Builder(conf).setHttpPorts(800, 9000, 1000, 2000)
99+
.setRpcPorts(8481, 8482, 8483).build();
100+
});
101+
102+
final Set<Integer> httpAndRpcPorts = NetUtils.getFreeSocketPorts(6);
103+
LOG.info("Free socket ports: {}", httpAndRpcPorts);
104+
105+
for (Integer httpAndRpcPort : httpAndRpcPorts) {
106+
assertNotEquals("None of the acquired socket port should not be zero", 0,
107+
httpAndRpcPort.intValue());
108+
}
109+
110+
final int[] httpPorts = new int[3];
111+
final int[] rpcPorts = new int[3];
112+
int httpPortIdx = 0;
113+
int rpcPortIdx = 0;
114+
for (Integer httpAndRpcPort : httpAndRpcPorts) {
115+
if (httpPortIdx < 3) {
116+
httpPorts[httpPortIdx++] = httpAndRpcPort;
117+
} else {
118+
rpcPorts[rpcPortIdx++] = httpAndRpcPort;
119+
}
120+
}
121+
122+
LOG.info("Http ports selected: {}", httpPorts);
123+
LOG.info("Rpc ports selected: {}", rpcPorts);
124+
125+
try (MiniJournalCluster miniJournalCluster = new MiniJournalCluster.Builder(conf)
126+
.setHttpPorts(httpPorts)
127+
.setRpcPorts(rpcPorts).build()) {
128+
miniJournalCluster.waitActive();
129+
URI uri = miniJournalCluster.getQuorumJournalURI("myjournal");
130+
String[] addrs = uri.getAuthority().split(";");
131+
assertEquals(3, addrs.length);
132+
133+
assertEquals(httpPorts[0], miniJournalCluster.getJournalNode(0).getHttpAddress().getPort());
134+
assertEquals(httpPorts[1], miniJournalCluster.getJournalNode(1).getHttpAddress().getPort());
135+
assertEquals(httpPorts[2], miniJournalCluster.getJournalNode(2).getHttpAddress().getPort());
136+
137+
assertEquals(rpcPorts[0],
138+
miniJournalCluster.getJournalNode(0).getRpcServer().getAddress().getPort());
139+
assertEquals(rpcPorts[1],
140+
miniJournalCluster.getJournalNode(1).getRpcServer().getAddress().getPort());
141+
assertEquals(rpcPorts[2],
142+
miniJournalCluster.getJournalNode(2).getRpcServer().getAddress().getPort());
143+
144+
JournalNode node = miniJournalCluster.getJournalNode(0);
145+
String dir = node.getConf().get(DFSConfigKeys.DFS_JOURNALNODE_EDITS_DIR_KEY);
146+
assertEquals(new File(MiniDFSCluster.getBaseDirectory() + "journalnode-0").getAbsolutePath(),
147+
dir);
148+
}
149+
}
150+
55151
}

0 commit comments

Comments
 (0)