36
36
import java .util .HashSet ;
37
37
import java .util .List ;
38
38
import java .util .Map ;
39
+ import java .util .Set ;
39
40
import java .util .concurrent .ExecutionException ;
40
41
41
42
import javax .servlet .ServletContext ;
@@ -278,6 +279,9 @@ protected void queueExternalCall(ExternalCall call)
278
279
namenode .queueExternalCall (call );
279
280
}
280
281
282
+ /**
283
+ * Chooses a Datanode to redirect a request to.
284
+ */
281
285
@ VisibleForTesting
282
286
static DatanodeInfo chooseDatanode (final NameNode namenode ,
283
287
final String path , final HttpOpParam .Op op , final long openOffset ,
@@ -288,18 +292,18 @@ static DatanodeInfo chooseDatanode(final NameNode namenode,
288
292
throw new IOException ("Namesystem has not been initialized yet." );
289
293
}
290
294
final BlockManager bm = fsn .getBlockManager ();
291
-
292
- HashSet <Node > excludes = new HashSet <Node >();
295
+
296
+ Set <Node > excludes = new HashSet <>();
293
297
if (excludeDatanodes != null ) {
294
298
for (String host : StringUtils
295
299
.getTrimmedStringCollection (excludeDatanodes )) {
296
- int idx = host .indexOf (":" );
300
+ int idx = host .indexOf (':' );
297
301
Node excludeNode = null ;
298
- if (idx != -1 ) {
299
- excludeNode = bm .getDatanodeManager ().getDatanodeByXferAddr (
300
- host .substring (0 , idx ), Integer .parseInt (host .substring (idx + 1 )));
301
- } else {
302
+ if (idx == -1 ) {
302
303
excludeNode = bm .getDatanodeManager ().getDatanodeByHost (host );
304
+ } else {
305
+ excludeNode = bm .getDatanodeManager ().getDatanodeByXferAddr (
306
+ host .substring (0 , idx ), Integer .parseInt (host .substring (idx + 1 )));
303
307
}
304
308
305
309
if (excludeNode != null ) {
@@ -311,25 +315,15 @@ static DatanodeInfo chooseDatanode(final NameNode namenode,
311
315
}
312
316
}
313
317
314
- if (op == PutOpParam .Op .CREATE ) {
315
- //choose a datanode near to client
316
- final DatanodeDescriptor clientNode = bm .getDatanodeManager (
317
- ).getDatanodeByHost (remoteAddr );
318
- if (clientNode != null ) {
319
- final DatanodeStorageInfo [] storages = bm .chooseTarget4WebHDFS (
320
- path , clientNode , excludes , blocksize );
321
- if (storages .length > 0 ) {
322
- return storages [0 ].getDatanodeDescriptor ();
323
- }
324
- }
325
- } else if (op == GetOpParam .Op .OPEN
318
+ // For these operations choose a datanode containing a replica
319
+ if (op == GetOpParam .Op .OPEN
326
320
|| op == GetOpParam .Op .GETFILECHECKSUM
327
321
|| op == PostOpParam .Op .APPEND ) {
328
- //choose a datanode containing a replica
329
322
final NamenodeProtocols np = getRPCServer (namenode );
330
323
if (status == null ) {
331
324
throw new FileNotFoundException ("File " + path + " not found." );
332
325
}
326
+
333
327
final long len = status .getLen ();
334
328
if (op == GetOpParam .Op .OPEN ) {
335
329
if (openOffset < 0L || (openOffset >= len && len > 0 )) {
@@ -344,10 +338,22 @@ static DatanodeInfo chooseDatanode(final NameNode namenode,
344
338
final int count = locations .locatedBlockCount ();
345
339
if (count > 0 ) {
346
340
return bestNode (locations .get (0 ).getLocations (), excludes );
341
+ } else {
342
+ throw new IOException ("Block could not be located. Path=" + path + ", offset=" + offset );
347
343
}
348
344
}
349
345
}
350
346
347
+ // All other operations don't affect a specific node so let the BlockManager pick a target
348
+ DatanodeDescriptor clientNode = bm .getDatanodeManager (
349
+ ).getDatanodeByHost (remoteAddr );
350
+
351
+ DatanodeStorageInfo [] storages = bm .chooseTarget4WebHDFS (
352
+ path , clientNode , excludes , blocksize );
353
+ if (storages .length > 0 ) {
354
+ return storages [0 ].getDatanodeDescriptor ();
355
+ }
356
+
351
357
return (DatanodeDescriptor )bm .getDatanodeManager ().getNetworkTopology (
352
358
).chooseRandom (NodeBase .ROOT , excludes );
353
359
}
@@ -358,13 +364,13 @@ static DatanodeInfo chooseDatanode(final NameNode namenode,
358
364
* to return the first element of the node here.
359
365
*/
360
366
protected static DatanodeInfo bestNode (DatanodeInfo [] nodes ,
361
- HashSet <Node > excludes ) throws IOException {
367
+ Set <Node > excludes ) throws IOException {
362
368
for (DatanodeInfo dn : nodes ) {
363
- if (false == dn .isDecommissioned () && false == excludes .contains (dn )) {
369
+ if (! dn .isDecommissioned () && ! excludes .contains (dn )) {
364
370
return dn ;
365
371
}
366
372
}
367
- throw new IOException ("No active nodes contain this block" );
373
+ throw new IOException ("No active and not excluded nodes contain this block" );
368
374
}
369
375
370
376
public long renewDelegationToken (Token <DelegationTokenIdentifier > token )
0 commit comments