
Commit ce7b8b5

Author: Chen Liang (committed)

HDFS-15148. dfs.namenode.send.qop.enabled should not apply to primary NN port. Contributed by Chen Liang.

1 parent 10a60fb, commit ce7b8b5

File tree: 5 files changed (+174, -35 lines)
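
In short: Server#getEstablishedQOP is renamed to getAuxiliaryPortEstablishedQOP and narrowed so that the negotiated SASL QOP is only reported back for RPC calls arriving on an auxiliary listener port; calls on the primary port now always see null, so their block tokens are never wrapped with a handshake secret. A rough before/after sketch (method names are from this diff; the surrounding handler code is assumed):

// Before: any handler thread could observe the negotiated QOP,
// regardless of which port the call arrived on.
String qop = Server.getEstablishedQOP();

// After: only auxiliary-port calls report a QOP; primary-port calls
// always get null, so dfs.namenode.send.qop.enabled no longer applies
// to the primary NN port.
String qop = Server.getAuxiliaryPortEstablishedQOP();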

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java

Lines changed: 37 additions & 12 deletions
@@ -382,20 +382,28 @@ public static InetAddress getRemoteIp() {
 
   /**
-   * Returns the SASL qop for the current call, if the current call is
-   * set, and the SASL negotiation is done. Otherwise return null. Note
-   * that CurCall is thread local object. So in fact, different handler
-   * threads will process different CurCall object.
+   * Returns the SASL qop for the current call, if the current call is
+   * set, and the SASL negotiation is done. Otherwise return null.
+   * Note this only returns established QOP for auxiliary port, and
+   * returns null for primary (non-auxiliary) port.
+   *
+   * Also note that CurCall is thread local object. So in fact, different
+   * handler threads will process different CurCall object.
    *
    * Also, only return for RPC calls, not supported for other protocols.
    * @return the QOP of the current connection.
    */
-  public static String getEstablishedQOP() {
+  public static String getAuxiliaryPortEstablishedQOP() {
     Call call = CurCall.get();
-    if (call == null || !(call instanceof RpcCall)) {
+    if (!(call instanceof RpcCall)) {
       return null;
     }
     RpcCall rpcCall = (RpcCall)call;
-    return rpcCall.connection.getEstablishedQOP();
+    if (rpcCall.connection.isOnAuxiliaryPort()) {
+      return rpcCall.connection.getEstablishedQOP();
+    } else {
+      // Not sending back QOP for primary port
+      return null;
+    }
   }
 
   /**
@@ -1185,7 +1193,8 @@ private class Listener extends Thread {
     private boolean reuseAddr = conf.getBoolean(
         CommonConfigurationKeysPublic.IPC_SERVER_REUSEADDR_KEY,
         CommonConfigurationKeysPublic.IPC_SERVER_REUSEADDR_DEFAULT);
-
+    private boolean isOnAuxiliaryPort;
+
     Listener(int port) throws IOException {
       address = new InetSocketAddress(bindAddress, port);
       // Create a new server socket and set to non blocking mode
@@ -1213,6 +1222,11 @@ private class Listener extends Thread {
       acceptChannel.register(selector, SelectionKey.OP_ACCEPT);
       this.setName("IPC Server listener on " + port);
       this.setDaemon(true);
+      this.isOnAuxiliaryPort = false;
+    }
+
+    void setIsAuxiliary() {
+      this.isOnAuxiliaryPort = true;
     }
 
     private class Reader extends Thread {
@@ -1381,7 +1395,8 @@ void doAccept(SelectionKey key) throws InterruptedException, IOException, OutOf
       channel.socket().setKeepAlive(true);
 
       Reader reader = getReader();
-      Connection c = connectionManager.register(channel, this.listenPort);
+      Connection c = connectionManager.register(channel,
+          this.listenPort, this.isOnAuxiliaryPort);
       // If the connectionManager can't take it, close the connection.
       if (c == null) {
         if (channel.isOpen()) {
@@ -1805,6 +1820,7 @@ public class Connection {
     private int serviceClass;
     private boolean shouldClose = false;
     private int ingressPort;
+    private boolean isOnAuxiliaryPort;
 
     UserGroupInformation user = null;
     public UserGroupInformation attemptingUser = null; // user name before auth
@@ -1817,7 +1833,7 @@ public class Connection {
     private boolean useWrap = false;
 
     public Connection(SocketChannel channel, long lastContact,
-        int ingressPort) {
+        int ingressPort, boolean isOnAuxiliaryPort) {
       this.channel = channel;
       this.lastContact = lastContact;
       this.data = null;
@@ -1830,6 +1846,7 @@ public Connection(SocketChannel channel, long lastContact,
       this.socket = channel.socket();
       this.addr = socket.getInetAddress();
       this.ingressPort = ingressPort;
+      this.isOnAuxiliaryPort = isOnAuxiliaryPort;
       if (addr == null) {
         this.hostAddress = "*Unknown*";
       } else {
@@ -1875,7 +1892,11 @@ public InetAddress getHostInetAddress() {
     public String getEstablishedQOP() {
       return establishedQOP;
     }
-
+
+    public boolean isOnAuxiliaryPort() {
+      return isOnAuxiliaryPort;
+    }
+
     public void setLastContact(long lastContact) {
       this.lastContact = lastContact;
     }
@@ -3113,6 +3134,8 @@ public synchronized void addAuxiliaryListener(int auxiliaryPort)
           "There is already a listener binding to: " + auxiliaryPort);
     }
     Listener newListener = new Listener(auxiliaryPort);
+    newListener.setIsAuxiliary();
+
     // in the case of port = 0, the listener would be on a != 0 port.
     LOG.info("Adding a server listener on port " +
         newListener.getAddress().getPort());
@@ -3732,11 +3755,13 @@ Connection[] toArray() {
       return connections.toArray(new Connection[0]);
     }
 
-    Connection register(SocketChannel channel, int ingressPort) {
+    Connection register(SocketChannel channel, int ingressPort,
+        boolean isOnAuxiliaryPort) {
       if (isFull()) {
        return null;
      }
-      Connection connection = new Connection(channel, Time.now(), ingressPort);
+      Connection connection = new Connection(channel, Time.now(),
+          ingressPort, isOnAuxiliaryPort);
      add(connection);
      if (LOG.isDebugEnabled()) {
        LOG.debug("Server connection from " + connection +

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java

Lines changed: 1 addition & 1 deletion
@@ -290,7 +290,7 @@ public Token<BlockTokenIdentifier> generateToken(String userId,
         .getBlockPoolId(), block.getBlockId(), modes, storageTypes,
         storageIds, useProto);
     if (shouldWrapQOP) {
-      String qop = Server.getEstablishedQOP();
+      String qop = Server.getAuxiliaryPortEstablishedQOP();
       if (qop != null) {
         id.setHandshakeMsg(qop.getBytes(Charsets.UTF_8));
       }
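
The issuing-side effect, condensed with explanatory comments (a sketch of the logic above, not new code):

if (shouldWrapQOP) {                                   // dfs.namenode.send.qop.enabled
  String qop = Server.getAuxiliaryPortEstablishedQOP();
  if (qop != null) {                                   // always null on the primary port
    id.setHandshakeMsg(qop.getBytes(Charsets.UTF_8));  // QOP travels inside the block token
  }
}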

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java

Lines changed: 0 additions & 11 deletions
@@ -1900,17 +1900,6 @@ private static String getClientMachine() {
     return clientMachine;
   }
 
-  /**
-   * Return the QOP of the client that the current handler thread
-   * is handling. Assuming the negotiation is done at this point,
-   * otherwise returns null.
-   *
-   * @return the established QOP of this client.
-   */
-  public static String getEstablishedClientQOP() {
-    return Server.getEstablishedQOP();
-  }
-
   @Override
   public DataEncryptionKey getDataEncryptionKey() throws IOException {
     checkNNStartup();

hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockTokenWrappingQOP.java

Lines changed: 24 additions & 3 deletions
@@ -17,14 +17,17 @@
  */
 package org.apache.hadoop.hdfs;
 
+import java.net.URI;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.EnumSet;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.LastBlockWithStatus;
@@ -77,12 +80,33 @@ public TestBlockTokenWrappingQOP(String configKey, String qopValue) {
   @Before
   public void setup() throws Exception {
     conf = createSecureConfig(this.configKey);
+    conf.set(DFS_NAMENODE_RPC_ADDRESS_AUXILIARY_KEY, "12000");
+    // Explicitly set the service RPC address for the DataNodes. This is
+    // because DFSUtil.getNNServiceRpcAddressesForCluster looks up the
+    // client-facing port and the service port at the same time; if no
+    // service RPC address is set, it returns the client port, which in
+    // this case would be the auxiliary port, which is not what the
+    // auxiliary port is for. Setting the service RPC port avoids this.
+    conf.set(DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY, "localhost:9020");
+    conf.set(
+        CommonConfigurationKeys.HADOOP_SECURITY_SASL_PROPS_RESOLVER_CLASS,
+        "org.apache.hadoop.security.IngressPortBasedResolver");
+    conf.set("ingress.port.sasl.configured.ports", "12000");
+    conf.set("ingress.port.sasl.prop.12000", this.configKey);
     conf.setBoolean(DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true);
     conf.setBoolean(DFS_NAMENODE_SEND_QOP_ENABLED, true);
     conf.set(HADOOP_RPC_PROTECTION, this.configKey);
     cluster = null;
     cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
     cluster.waitActive();
+
+    HdfsConfiguration clientConf = new HdfsConfiguration(conf);
+    clientConf.unset(
+        CommonConfigurationKeys.HADOOP_SECURITY_SASL_PROPS_RESOLVER_CLASS);
+    URI currentURI = cluster.getURI();
+    URI uriAuxiliary = new URI(currentURI.getScheme() +
+        "://" + currentURI.getHost() + ":12000");
+    dfs = (DistributedFileSystem) FileSystem.get(uriAuxiliary, conf);
   }
 
   @After
@@ -97,7 +121,6 @@ public void testAddBlockWrappingQOP() throws Exception {
     final String src = "/testAddBlockWrappingQOP";
     final Path path = new Path(src);
 
-    dfs = cluster.getFileSystem();
     dfs.create(path);
 
     DFSClient client = dfs.getClient();
@@ -114,7 +137,6 @@ public void testAppendWrappingQOP() throws Exception {
     final String src = "/testAppendWrappingQOP";
     final Path path = new Path(src);
 
-    dfs = cluster.getFileSystem();
     FSDataOutputStream out = dfs.create(path);
     // NameNode append call returns a last block instance. If there is nothing
     // it returns as a null. So write something, so that lastBlock has
@@ -138,7 +160,6 @@ public void testGetBlockLocationWrappingQOP() throws Exception {
     final String src = "/testGetBlockLocationWrappingQOP";
     final Path path = new Path(src);
 
-    dfs = cluster.getFileSystem();
     FSDataOutputStream out = dfs.create(path);
     // if the file is empty, there will be no blocks returned. Write something
     // so that getBlockLocations actually returns some block.
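
For context, org.apache.hadoop.security.IngressPortBasedResolver resolves SASL properties (and hence the QOP) per server ingress port, which is what lets one NameNode negotiate different QOPs on different listener ports. The knobs the setup above relies on, condensed (values are the test's own):

conf.set(DFS_NAMENODE_RPC_ADDRESS_AUXILIARY_KEY, "12000");         // extra NN port
conf.set(DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY, "localhost:9020");  // keep DNs off the auxiliary port
conf.set(CommonConfigurationKeys.HADOOP_SECURITY_SASL_PROPS_RESOLVER_CLASS,
    "org.apache.hadoop.security.IngressPortBasedResolver");
conf.set("ingress.port.sasl.configured.ports", "12000");
conf.set("ingress.port.sasl.prop.12000", this.configKey);          // QOP for port 12000

The client side unsets the resolver and connects to the auxiliary port directly, since the resolver applies to the server's ingress ports, not to outbound client connections.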

hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestMultipleNNPortQOP.java

Lines changed: 112 additions & 8 deletions
@@ -21,13 +21,17 @@
 import java.util.ArrayList;
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileSystemTestHelper;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient;
 import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferServer;
 import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferTestCase;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.security.token.Token;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -77,6 +81,85 @@ public void setup() throws Exception {
     clusterConf.setBoolean(DFS_NAMENODE_SEND_QOP_ENABLED, true);
   }
 
+  /**
+   * Test that when NameNode returns back its established QOP,
+   * it only does this for auxiliary port(s), not the primary port.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testAuxiliaryPortSendingQOP() throws Exception {
+    MiniDFSCluster cluster = null;
+
+    final String pathPrefix = "/filetestAuxiliaryPortSendingQOP";
+    try {
+      cluster = new MiniDFSCluster.Builder(clusterConf)
+          .numDataNodes(3).build();
+
+      cluster.waitActive();
+      HdfsConfiguration clientConf = new HdfsConfiguration(clusterConf);
+      clientConf.unset(
+          CommonConfigurationKeys.HADOOP_SECURITY_SASL_PROPS_RESOLVER_CLASS);
+
+      URI currentURI = cluster.getURI();
+      URI uriAuthPort = new URI(currentURI.getScheme() + "://" +
+          currentURI.getHost() + ":12000");
+      URI uriIntegrityPort = new URI(currentURI.getScheme() + "://" +
+          currentURI.getHost() + ":12100");
+      URI uriPrivacyPort = new URI(currentURI.getScheme() +
+          "://" + currentURI.getHost() + ":12200");
+
+      // If connecting to primary port, block token should not include
+      // handshake secret
+      byte[] secretOnPrimary = getHandshakeSecret(currentURI, clientConf,
+          new Path(pathPrefix + "Primary"));
+      assertTrue(secretOnPrimary == null || secretOnPrimary.length == 0);
+
+      // If connecting to auxiliary port, block token should include
+      // handshake secret
+      clientConf.set(HADOOP_RPC_PROTECTION, "privacy");
+      byte[] secretPrivacy = getHandshakeSecret(uriPrivacyPort, clientConf,
+          new Path(pathPrefix + "Privacy"));
+      assertTrue(secretPrivacy.length > 0);
+
+      clientConf.set(HADOOP_RPC_PROTECTION, "integrity");
+      byte[] secretIntegrity = getHandshakeSecret(uriIntegrityPort, clientConf,
+          new Path(pathPrefix + "Integrity"));
+      assertTrue(secretIntegrity.length > 0);
+
+      clientConf.set(HADOOP_RPC_PROTECTION, "authentication");
+      byte[] secretAuthentication = getHandshakeSecret(uriAuthPort,
+          clientConf, new Path(pathPrefix + "Authentication"));
+      assertTrue(secretAuthentication.length > 0);
+
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+
+  private byte[] getHandshakeSecret(URI uri, HdfsConfiguration conf,
+      Path path) throws Exception {
+    FileSystem fs = FileSystem.get(uri, conf);
+    FSDataOutputStream out = fs.create(
+        path, false, 4096, (short)1, BLOCK_SIZE);
+    try {
+      out.write(0);
+      out.hflush();
+      Token<BlockTokenIdentifier> token = DFSTestUtil.getBlockToken(out);
+      final byte[] tokenBytes = token.getIdentifier();
+      DataInputBuffer dib = new DataInputBuffer();
+
+      dib.reset(tokenBytes, tokenBytes.length);
+      BlockTokenIdentifier blockToken = new BlockTokenIdentifier();
+      blockToken.readFields(dib);
+      return blockToken.getHandshakeMsg();
+    } finally {
+      out.close();
+    }
+  }
+
   /**
    * Test accessing NameNode from three different ports.
    *
@@ -168,33 +251,54 @@ public void testMultipleNNPortOverwriteDownStream() throws Exception {
       clientConf.set(HADOOP_RPC_PROTECTION, "privacy");
       FileSystem fsPrivacy = FileSystem.get(uriPrivacyPort, clientConf);
       doTest(fsPrivacy, PATH1);
-      // add a wait so that data has reached not only first DN,
-      // but also the rest
-      Thread.sleep(100);
       for (int i = 0; i < 2; i++) {
         DataNode dn = dataNodes.get(i);
         SaslDataTransferClient saslClient = dn.getSaslClient();
-        assertEquals("auth", saslClient.getTargetQOP());
+        String qop = null;
+        // It may take some time for the qop to populate
+        // to all DNs, check in a loop.
+        for (int trial = 0; trial < 10; trial++) {
+          qop = saslClient.getTargetQOP();
+          if (qop != null) {
+            break;
+          }
+          Thread.sleep(100);
+        }
+        assertEquals("auth", qop);
       }
 
       clientConf.set(HADOOP_RPC_PROTECTION, "integrity");
       FileSystem fsIntegrity = FileSystem.get(uriIntegrityPort, clientConf);
       doTest(fsIntegrity, PATH2);
-      Thread.sleep(100);
       for (int i = 0; i < 2; i++) {
         DataNode dn = dataNodes.get(i);
         SaslDataTransferClient saslClient = dn.getSaslClient();
-        assertEquals("auth", saslClient.getTargetQOP());
+        String qop = null;
+        for (int trial = 0; trial < 10; trial++) {
+          qop = saslClient.getTargetQOP();
+          if (qop != null) {
+            break;
+          }
+          Thread.sleep(100);
+        }
+        assertEquals("auth", qop);
       }
 
       clientConf.set(HADOOP_RPC_PROTECTION, "authentication");
       FileSystem fsAuth = FileSystem.get(uriAuthPort, clientConf);
       doTest(fsAuth, PATH3);
-      Thread.sleep(100);
       for (int i = 0; i < 3; i++) {
         DataNode dn = dataNodes.get(i);
         SaslDataTransferServer saslServer = dn.getSaslServer();
-        assertEquals("auth", saslServer.getNegotiatedQOP());
+        String qop = null;
+        for (int trial = 0; trial < 10; trial++) {
+          qop = saslServer.getNegotiatedQOP();
+          if (qop != null) {
+            break;
+          }
+          Thread.sleep(100);
+        }
+        assertEquals("auth", qop);
       }
     } finally {
       if (cluster != null) {
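
The fixed Thread.sleep(100) was replaced by a bounded poll because QOP propagation to all DataNodes is asynchronous. The same wait could also be written with Hadoop's test utility GenericTestUtils.waitFor, if preferred (a sketch; the diff uses a plain loop instead):

// Poll every 100 ms, up to 1 second, for the QOP to appear.
GenericTestUtils.waitFor(() -> saslClient.getTargetQOP() != null, 100, 1000);
assertEquals("auth", saslClient.getTargetQOP());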
