@@ -217,25 +217,34 @@ public static <T> ProxyAndInfo<T> createNonHAProxy(
217217 private static InMemoryAliasMapProtocol createNNProxyWithInMemoryAliasMapProtocol (
218218 InetSocketAddress address , Configuration conf , UserGroupInformation ugi ,
219219 AlignmentContext alignmentContext ) throws IOException {
220+ int timeout = getRPCTimeout (conf ,
221+ DFSConfigKeys .IPC_RPC_TIMEOUT_FOR_ALIASHMAP_PROTOCOL ,
222+ DFSConfigKeys .IPC_RPC_TIMEOUT_FOR_ALIASHMAP_PROTOCOL_DEFAULT );
220223 AliasMapProtocolPB proxy = createNameNodeProxy (
221- address , conf , ugi , AliasMapProtocolPB .class , 30000 , alignmentContext );
224+ address , conf , ugi , AliasMapProtocolPB .class , timeout , alignmentContext );
222225 return new InMemoryAliasMapProtocolClientSideTranslatorPB (proxy );
223226 }
224227
225228 private static JournalProtocol createNNProxyWithJournalProtocol (
226229 InetSocketAddress address , Configuration conf , UserGroupInformation ugi ,
227230 AlignmentContext alignmentContext ) throws IOException {
231+ int timeout = getRPCTimeout (conf ,
232+ DFSConfigKeys .IPC_RPC_TIMEOUT_FOR_JOURNAL_PROTOCOL ,
233+ DFSConfigKeys .IPC_RPC_TIMEOUT_FOR_JOURNAL_PROTOCOL_DEFAULT );
228234 JournalProtocolPB proxy = createNameNodeProxy (address ,
229- conf , ugi , JournalProtocolPB .class , 30000 , alignmentContext );
235+ conf , ugi , JournalProtocolPB .class , timeout , alignmentContext );
230236 return new JournalProtocolTranslatorPB (proxy );
231237 }
232238
233239 private static RefreshAuthorizationPolicyProtocol
234240 createNNProxyWithRefreshAuthorizationPolicyProtocol (InetSocketAddress address ,
235241 Configuration conf , UserGroupInformation ugi ,
236242 AlignmentContext alignmentContext ) throws IOException {
243+ int timeout = getRPCTimeout (conf ,
244+ DFSConfigKeys .IPC_RPC_TIMEOUT_FOR_REFRESH_AUTHORIZATION_PROTOCOL ,
245+ DFSConfigKeys .IPC_RPC_TIMEOUT_FOR_REFRESH_AUTHORIZATION_PROTOCOL_DEFAULT );
237246 RefreshAuthorizationPolicyProtocolPB proxy = createNameNodeProxy (address ,
238- conf , ugi , RefreshAuthorizationPolicyProtocolPB .class , 0 ,
247+ conf , ugi , RefreshAuthorizationPolicyProtocolPB .class , timeout ,
239248 alignmentContext );
240249 return new RefreshAuthorizationPolicyProtocolClientSideTranslatorPB (proxy );
241250 }
@@ -244,34 +253,46 @@ private static JournalProtocol createNNProxyWithJournalProtocol(
244253 createNNProxyWithRefreshUserMappingsProtocol (InetSocketAddress address ,
245254 Configuration conf , UserGroupInformation ugi ,
246255 AlignmentContext alignmentContext ) throws IOException {
256+ int timeout = getRPCTimeout (conf ,
257+ DFSConfigKeys .IPC_RPC_TIMEOUT_FOR_REFRESH_USER_MAPPING_PROTOCOL ,
258+ DFSConfigKeys .IPC_RPC_TIMEOUT_FOR_REFRESH_USER_MAPPING_PROTOCOL_DEFAULT );
247259 RefreshUserMappingsProtocolPB proxy = createNameNodeProxy (address , conf ,
248- ugi , RefreshUserMappingsProtocolPB .class , 0 , alignmentContext );
260+ ugi , RefreshUserMappingsProtocolPB .class , timeout , alignmentContext );
249261 return new RefreshUserMappingsProtocolClientSideTranslatorPB (proxy );
250262 }
251263
252264 private static RefreshCallQueueProtocol
253265 createNNProxyWithRefreshCallQueueProtocol (InetSocketAddress address ,
254266 Configuration conf , UserGroupInformation ugi ,
255267 AlignmentContext alignmentContext ) throws IOException {
268+ int timeout = getRPCTimeout (conf ,
269+ DFSConfigKeys .IPC_RPC_TIMEOUT_FOR_REFRESH_CALL_QUEUE_PROTOCOL ,
270+ DFSConfigKeys .IPC_RPC_TIMEOUT_FOR_REFRESH_CALL_QUEUE_PROTOCOL_DEFAULT );
256271 RefreshCallQueueProtocolPB proxy = createNameNodeProxy (address , conf , ugi ,
257- RefreshCallQueueProtocolPB .class , 0 , alignmentContext );
272+ RefreshCallQueueProtocolPB .class , timeout , alignmentContext );
258273 return new RefreshCallQueueProtocolClientSideTranslatorPB (proxy );
259274 }
260275
261276 private static GetUserMappingsProtocol createNNProxyWithGetUserMappingsProtocol (
262277 InetSocketAddress address , Configuration conf , UserGroupInformation ugi ,
263278 AlignmentContext alignmentContext ) throws IOException {
279+ int timeout = getRPCTimeout (conf ,
280+ DFSConfigKeys .IPC_RPC_TIMEOUT_FOR_GET_USER_MAPPING_PROTOCOL ,
281+ DFSConfigKeys .IPC_RPC_TIMEOUT_FOR_GET_USER_MAPPING_PROTOCOL_DEFAULT );
264282 GetUserMappingsProtocolPB proxy = createNameNodeProxy (address , conf , ugi ,
265- GetUserMappingsProtocolPB .class , 0 , alignmentContext );
283+ GetUserMappingsProtocolPB .class , timeout , alignmentContext );
266284 return new GetUserMappingsProtocolClientSideTranslatorPB (proxy );
267285 }
268286
269287 private static NamenodeProtocol createNNProxyWithNamenodeProtocol (
270288 InetSocketAddress address , Configuration conf , UserGroupInformation ugi ,
271289 boolean withRetries , AlignmentContext alignmentContext )
272290 throws IOException {
273- NamenodeProtocolPB proxy = createNameNodeProxy (
274- address , conf , ugi , NamenodeProtocolPB .class , 0 , alignmentContext );
291+ int timeout = getRPCTimeout (conf ,
292+ DFSConfigKeys .IPC_RPC_TIMEOUT_FOR_NAMENODE_PROTOCOL ,
293+ DFSConfigKeys .IPC_RPC_TIMEOUT_FOR_NAMENODE_PROTOCOL_DEFAULT );
294+ NamenodeProtocolPB proxy = createNameNodeProxy (address , conf , ugi ,
295+ NamenodeProtocolPB .class , timeout , alignmentContext );
275296 if (withRetries ) { // create the proxy with retries
276297 RetryPolicy timeoutPolicy = RetryPolicies .exponentialBackoffRetry (5 , 200 ,
277298 TimeUnit .MILLISECONDS );
@@ -312,4 +333,21 @@ private static <T> T createNameNodeProxy(InetSocketAddress address,
312333 alignmentContext ).getProxy ();
313334 }
314335
336+ /**
337+ * Try to obtain the timeout for confKey from Conf.
338+ * If the value is invalid, just print some warn log and return the default value.
339+ * @param conf input Configuration.
340+ * @param confKey input conf key.
341+ * @param defaultValue input default conf value.
342+ * @return a non negative number.
343+ */
344+ private static int getRPCTimeout (Configuration conf , String confKey , long defaultValue ) {
345+ long tmpTimeout = conf .getTimeDuration (confKey , defaultValue , TimeUnit .MILLISECONDS );
346+ if (tmpTimeout < 0 ) {
347+ LOG .warn ("Invalid value {} configured for {} should be greater than or equal to 0. " +
348+ "Using default value of : {}ms instead." , tmpTimeout , conf , defaultValue );
349+ tmpTimeout = defaultValue ;
350+ }
351+ return (int ) tmpTimeout ;
352+ }
315353}
0 commit comments