Skip to content

Commit 16c36d5

Browse files
author
zhanghaobo
committed
HDFS-16452.msync-RPC-should-send-to-acitve-namenode
1 parent 900682e commit 16c36d5

File tree

6 files changed

+78
-13
lines changed

6 files changed

+78
-13
lines changed

hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java

Lines changed: 53 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,7 @@
103103
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DATA_TRANSFER_CLIENT_TCPNODELAY_DEFAULT;
104104
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DATA_TRANSFER_CLIENT_TCPNODELAY_KEY;
105105
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX;
106+
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_HA_OBSERVER_NAMENODES_KEY_SUFFIX;
106107
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_NAMENODE_RPC_ADDRESS_AUXILIARY_KEY;
107108
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY;
108109
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_NAMESERVICES;
@@ -227,6 +228,20 @@ public static Collection<String> getNameNodeIds(Configuration conf, String nsId)
227228
return conf.getTrimmedStringCollection(key);
228229
}
229230

231+
/**
232+
* Returns collections of observer namenode Ids from the configuration. One logical id
233+
* for each namenode in the in the Observer-Read setup.
234+
*
235+
* @param conf configuration
236+
* @param nsId the nameservice ID to look at, or null for non-federated
237+
* @return collection of observer namenode Ids
238+
*/
239+
public static Collection<String> getObserberNameNodeIds(Configuration conf, String nsId) {
240+
String key = addSuffix(DFS_HA_NAMENODES_KEY_PREFIX, nsId);
241+
String observerKey = addSuffix(key, DFS_HA_OBSERVER_NAMENODES_KEY_SUFFIX);
242+
return conf.getTrimmedStringCollection(observerKey);
243+
}
244+
230245
/** Add non empty and non null suffix to a key */
231246
static String addSuffix(String key, String suffix) {
232247
if (suffix == null || suffix.isEmpty()) {
@@ -422,7 +437,21 @@ static String concatSuffixes(String... suffixes) {
422437
public static Map<String, Map<String, InetSocketAddress>> getAddresses(
423438
Configuration conf, String defaultAddress, String... keys) {
424439
Collection<String> nameserviceIds = getNameServiceIds(conf);
425-
return getAddressesForNsIds(conf, nameserviceIds, defaultAddress, keys);
440+
return getAddressesForNsIds(conf, nameserviceIds, defaultAddress, false, keys);
441+
}
442+
443+
/**
444+
* Returns the configured address for all NameNodes in the cluster.
445+
* @param conf configuration
446+
* @param defaultAddress default address to return in case key is not found.
447+
* @param forOnnFailover2Ann whether for ObserverReadProxyProvider failoverProxy
448+
* @param keys Set of keys to look for in the order of preference
449+
* @return a map(nameserviceId to map(namenodeId to InetSocketAddress))
450+
*/
451+
public static Map<String, Map<String, InetSocketAddress>> getAddresses(
452+
Configuration conf, String defaultAddress, boolean forOnnFailover2Ann, String... keys) {
453+
Collection<String> nameserviceIds = getNameServiceIds(conf);
454+
return getAddressesForNsIds(conf, nameserviceIds, defaultAddress, forOnnFailover2Ann, keys);
426455
}
427456

428457
/**
@@ -507,13 +536,13 @@ private static String getConcatNnId(String nsId, String nnId, String hostname, i
507536
*/
508537
static Map<String, Map<String, InetSocketAddress>> getAddressesForNsIds(
509538
Configuration conf, Collection<String> nsIds, String defaultAddress,
510-
String... keys) {
539+
boolean forOnnFailover2Ann, String... keys) {
511540
// Look for configurations of the form <key>[.<nameserviceId>][.<namenodeId>]
512541
// across all of the configured nameservices and namenodes.
513542
Map<String, Map<String, InetSocketAddress>> ret = Maps.newLinkedHashMap();
514543
for (String nsId : emptyAsSingletonNull(nsIds)) {
515544
Map<String, InetSocketAddress> isas =
516-
getAddressesForNameserviceId(conf, nsId, defaultAddress, keys);
545+
getAddressesForNameserviceId(conf, nsId, defaultAddress, forOnnFailover2Ann, keys);
517546
if (!isas.isEmpty()) {
518547
ret.put(nsId, isas);
519548
}
@@ -523,7 +552,25 @@ static Map<String, Map<String, InetSocketAddress>> getAddressesForNsIds(
523552

524553
public static Map<String, InetSocketAddress> getAddressesForNameserviceId(
525554
Configuration conf, String nsId, String defaultValue, String... keys) {
555+
return getAddressesForNameserviceId(conf, nsId, defaultValue, false, keys);
556+
}
557+
558+
public static Map<String, InetSocketAddress> getAddressesForNameserviceId(
559+
Configuration conf, String nsId, String defaultValue,
560+
boolean forOnnFailover2Ann, String... keys) {
526561
Collection<String> nnIds = getNameNodeIds(conf, nsId);
562+
if (forOnnFailover2Ann) {
563+
Collection<String> obnnIds = getObserberNameNodeIds(conf, nsId);
564+
if (obnnIds.size() != 0) {
565+
nnIds.removeIf(obnnIds::contains);
566+
}
567+
}
568+
return getNamenodeIDInetSocketAddressMap(conf, nsId, defaultValue, nnIds, keys);
569+
}
570+
571+
private static Map<String, InetSocketAddress> getNamenodeIDInetSocketAddressMap(
572+
Configuration conf, String nsId, String defaultValue,
573+
Collection<String> nnIds, String[] keys) {
527574
Map<String, InetSocketAddress> ret = Maps.newLinkedHashMap();
528575
for (String nnId : emptyAsSingletonNull(nnIds)) {
529576
String suffix = concatSuffixes(nsId, nnId);
@@ -532,15 +579,16 @@ public static Map<String, InetSocketAddress> getAddressesForNameserviceId(
532579
InetSocketAddress isa = NetUtils.createSocketAddr(address);
533580
if (isa.isUnresolved()) {
534581
LOG.warn("Namenode for {} remains unresolved for ID {}. Check your "
535-
+ "hdfs-site.xml file to ensure namenodes are configured "
536-
+ "properly.", nsId, nnId);
582+
+ "hdfs-site.xml file to ensure namenodes are configured "
583+
+ "properly.", nsId, nnId);
537584
}
538585
ret.put(nnId, isa);
539586
}
540587
}
541588
return ret;
542589
}
543590

591+
544592
/**
545593
* Return address from configuration. Take a list of keys as preference.
546594
* If the address to be returned is the value of DFS_NAMENODE_RPC_ADDRESS_KEY,

hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@ public interface HdfsClientConfigKeys {
7979
String DFS_NAMENODE_HTTPS_ADDRESS_KEY = "dfs.namenode.https-address";
8080
String DFS_HA_NAMENODES_KEY_PREFIX = "dfs.ha.namenodes";
8181
int DFS_NAMENODE_RPC_PORT_DEFAULT = 8020;
82+
String DFS_HA_OBSERVER_NAMENODES_KEY_SUFFIX = "observers";
8283
String DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY =
8384
"dfs.namenode.kerberos.principal";
8485
String DFS_CLIENT_WRITE_PACKET_SIZE_KEY = "dfs.client-write-packet-size";

hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/AbstractNNFailoverProxyProvider.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -167,10 +167,15 @@ protected NNProxyInfo<T> createProxyIfNeeded(NNProxyInfo<T> pi) {
167167
* Get list of configured NameNode proxy addresses.
168168
* Randomize the list if requested.
169169
*/
170-
protected List<NNProxyInfo<T>> getProxyAddresses(URI uri, String addressKey) {
170+
protected List<NNProxyInfo<T>> getProxyAddresses(URI uri, String addressKey,
171+
boolean forOnnFailover2Ann) {
171172
final List<NNProxyInfo<T>> proxies = new ArrayList<NNProxyInfo<T>>();
172-
Map<String, Map<String, InetSocketAddress>> map =
173-
DFSUtilClient.getAddresses(conf, null, addressKey);
173+
Map<String, Map<String, InetSocketAddress>> map;
174+
if (forOnnFailover2Ann) {
175+
map = DFSUtilClient.getAddresses(conf, null, true, addressKey);
176+
} else {
177+
map = DFSUtilClient.getAddresses(conf, null, addressKey);
178+
}
174179
Map<String, InetSocketAddress> addressesInNN = map.get(uri.getHost());
175180

176181
if (addressesInNN == null || addressesInNN.size() == 0) {

hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ConfiguredFailoverProxyProvider.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,18 @@ public ConfiguredFailoverProxyProvider(Configuration conf, URI uri,
4848
public ConfiguredFailoverProxyProvider(Configuration conf, URI uri,
4949
Class<T> xface, HAProxyFactory<T> factory, String addressKey) {
5050
super(conf, uri, xface, factory);
51-
this.proxies = getProxyAddresses(uri, addressKey);
51+
this.proxies = getProxyAddresses(uri, addressKey, false);
52+
}
53+
54+
public ConfiguredFailoverProxyProvider(Configuration conf, URI uri,
55+
Class<T> xface, HAProxyFactory<T> factory, boolean forOnnFailover2Ann) {
56+
this(conf, uri, xface, factory, DFS_NAMENODE_RPC_ADDRESS_KEY, forOnnFailover2Ann);
57+
}
58+
59+
public ConfiguredFailoverProxyProvider(Configuration conf, URI uri,
60+
Class<T> xface, HAProxyFactory<T> factory, String addressKey, boolean forOnnFailover2Ann) {
61+
super(conf, uri, xface, factory);
62+
this.proxies = getProxyAddresses(uri, addressKey, forOnnFailover2Ann);
5263
}
5364

5465
/**

hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProvider.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -168,7 +168,7 @@ public class ObserverReadProxyProvider<T>
168168
public ObserverReadProxyProvider(
169169
Configuration conf, URI uri, Class<T> xface, HAProxyFactory<T> factory) {
170170
this(conf, uri, xface, factory,
171-
new ConfiguredFailoverProxyProvider<>(conf, uri, xface, factory));
171+
new ConfiguredFailoverProxyProvider<>(conf, uri, xface, factory, true));
172172
}
173173

174174
@SuppressWarnings("unchecked")
@@ -189,7 +189,7 @@ public ObserverReadProxyProvider(
189189

190190
// Get all NameNode proxies
191191
nameNodeProxies = getProxyAddresses(uri,
192-
HdfsClientConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY);
192+
HdfsClientConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY, false);
193193

194194
// Create a wrapped proxy containing all the proxies. Since this combined
195195
// proxy is just redirecting to other proxies, all invocations can share it.

hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverReadProxyProvider.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -117,9 +117,9 @@ public ClientProtocol createProxy(Configuration config,
117117
}) {
118118
@Override
119119
protected List<NNProxyInfo<ClientProtocol>> getProxyAddresses(
120-
URI uri, String addressKey) {
120+
URI uri, String addressKey, boolean forOnnFailover2Ann) {
121121
List<NNProxyInfo<ClientProtocol>> nnProxies =
122-
super.getProxyAddresses(uri, addressKey);
122+
super.getProxyAddresses(uri, addressKey, forOnnFailover2Ann);
123123
return nnProxies;
124124
}
125125
};

0 commit comments

Comments
 (0)