24 | 24 | import java.io.FileInputStream; |
25 | 25 | import java.io.FileNotFoundException; |
26 | 26 | import java.io.IOException; |
| 27 | +import java.net.InetSocketAddress; |
27 | 28 | import java.net.URI; |
28 | 29 | import java.net.URISyntaxException; |
29 | 30 | import java.nio.ByteBuffer; |
30 | 31 | import java.nio.channels.FileChannel; |
31 | 32 | import java.nio.file.Files; |
| 33 | +import java.util.ArrayList; |
32 | 34 | import java.util.LinkedList; |
33 | 35 | import java.util.List; |
| 36 | +import java.util.concurrent.CompletionService; |
| 37 | +import java.util.concurrent.ExecutorCompletionService; |
| 38 | +import java.util.concurrent.Future; |
| 39 | +import java.util.concurrent.LinkedBlockingQueue; |
34 | 40 | import java.util.concurrent.TimeUnit; |
35 | 41 |
| 42 | +import org.apache.hadoop.fs.FileStatus; |
| 43 | +import org.apache.hadoop.hdfs.BlockReader; |
| 44 | +import org.apache.hadoop.hdfs.DFSClient; |
| 45 | +import org.apache.hadoop.hdfs.DFSConfigKeys; |
| 46 | +import org.apache.hadoop.hdfs.client.impl.BlockReaderRemote; |
| 47 | +import org.apache.hadoop.hdfs.net.Peer; |
| 48 | +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; |
| 49 | +import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy; |
| 50 | +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; |
| 51 | +import org.apache.hadoop.hdfs.protocol.LocatedBlock; |
| 52 | +import org.apache.hadoop.hdfs.protocol.LocatedBlocks; |
| 53 | +import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock; |
| 54 | +import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; |
| 55 | +import org.apache.hadoop.hdfs.server.datanode.CachingStrategy; |
| 56 | +import org.apache.hadoop.hdfs.util.StripedBlockUtil; |
| 57 | +import org.apache.hadoop.io.erasurecode.CodecUtil; |
| 58 | +import org.apache.hadoop.io.erasurecode.ErasureCoderOptions; |
| 59 | +import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder; |
| 60 | +import org.apache.hadoop.net.NetUtils; |
| 61 | +import org.apache.hadoop.security.token.Token; |
36 | 62 | import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.Uninterruptibles; |
37 | 63 | import org.apache.hadoop.HadoopIllegalArgumentException; |
38 | 64 | import org.apache.hadoop.classification.InterfaceAudience; |
@@ -69,6 +95,7 @@ public class DebugAdmin extends Configured implements Tool { |
69 | 95 | new VerifyMetaCommand(), |
70 | 96 | new ComputeMetaCommand(), |
71 | 97 | new RecoverLeaseCommand(), |
| 98 | + new VerifyECCommand(), |
72 | 99 | new HelpCommand() |
73 | 100 | }; |
74 | 101 |
@@ -387,6 +414,209 @@ int run(List<String> args) throws IOException { |
387 | 414 | } |
388 | 415 | } |
389 | 416 |
| 417 | + /** |
| 418 | + * The command for verifying the correctness of erasure coding on an erasure-coded file. |
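| | + * <p>Usage: {@code hdfs debug verifyEC -file <file>} |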
| 419 | + */ |
| 420 | + private class VerifyECCommand extends DebugCommand { |
| 421 | + private DFSClient client; |
| 422 | + private int dataBlkNum; |
| 423 | + private int parityBlkNum; |
| 424 | + private int cellSize; |
| 425 | + private boolean useDNHostname; |
| 426 | + private CachingStrategy cachingStrategy; |
| 427 | + private int stripedReadBufferSize; |
| 428 | + private CompletionService<Integer> readService; |
| 429 | + private RawErasureEncoder encoder; |
| 430 | + private BlockReader[] blockReaders; |
| 431 | + |
| 432 | + |
| 433 | + VerifyECCommand() { |
| 434 | + super("verifyEC", |
| 435 | + "verifyEC -file <file>", |
| 436 | + " Verify HDFS erasure coding on all block groups of the file."); |
| 437 | + } |
| 438 | + |
| 439 | + int run(List<String> args) throws IOException { |
| 440 | + if (args.size() < 2) { |
| 441 | + System.out.println(usageText); |
| 442 | + System.out.println(helpText + System.lineSeparator()); |
| 443 | + return 1; |
| 444 | + } |
| 445 | + String file = StringUtils.popOptionWithArgument("-file", args); |
| | + if (file == null) { |
| | + System.err.println("Please specify a file via the -file option."); |
| | + return 1; |
| | + } |
| 446 | + Path path = new Path(file); |
| 447 | + DistributedFileSystem dfs = AdminHelper.getDFS(getConf()); |
| 448 | + this.client = dfs.getClient(); |
| 449 | + |
| 450 | + FileStatus fileStatus; |
| 451 | + try { |
| 452 | + fileStatus = dfs.getFileStatus(path); |
| 453 | + } catch (FileNotFoundException e) { |
| 454 | + System.err.println("File " + file + " does not exist."); |
| 455 | + return 1; |
| 456 | + } |
| 457 | + |
| 458 | + if (!fileStatus.isFile()) { |
| 459 | + System.err.println("File " + file + " is not a regular file."); |
| 460 | + return 1; |
| 461 | + } |
| 462 | + if (!dfs.isFileClosed(path)) { |
| 463 | + System.err.println("File " + file + " is not closed."); |
| 464 | + return 1; |
| 465 | + } |
| 466 | + this.useDNHostname = getConf().getBoolean(DFSConfigKeys.DFS_DATANODE_USE_DN_HOSTNAME, |
| 467 | + DFSConfigKeys.DFS_DATANODE_USE_DN_HOSTNAME_DEFAULT); |
| 468 | + this.cachingStrategy = CachingStrategy.newDefaultStrategy(); |
| 469 | + this.stripedReadBufferSize = getConf().getInt( |
| 470 | + DFSConfigKeys.DFS_DN_EC_RECONSTRUCTION_STRIPED_READ_BUFFER_SIZE_KEY, |
| 471 | + DFSConfigKeys.DFS_DN_EC_RECONSTRUCTION_STRIPED_READ_BUFFER_SIZE_DEFAULT); |
| 472 | + |
| 473 | + LocatedBlocks locatedBlocks = client.getLocatedBlocks(file, 0, fileStatus.getLen()); |
| 474 | + if (locatedBlocks.getErasureCodingPolicy() == null) { |
| 475 | + System.err.println("File " + file + " is not erasure coded."); |
| 476 | + return 1; |
| 477 | + } |
| 478 | + ErasureCodingPolicy ecPolicy = locatedBlocks.getErasureCodingPolicy(); |
| 479 | + this.dataBlkNum = ecPolicy.getNumDataUnits(); |
| 480 | + this.parityBlkNum = ecPolicy.getNumParityUnits(); |
| 481 | + this.cellSize = ecPolicy.getCellSize(); |
| 482 | + this.encoder = CodecUtil.createRawEncoder(getConf(), ecPolicy.getCodecName(), |
| 483 | + new ErasureCoderOptions( |
| 484 | + ecPolicy.getNumDataUnits(), ecPolicy.getNumParityUnits())); |
| 485 | + int blockNum = dataBlkNum + parityBlkNum; |
| 486 | + this.readService = new ExecutorCompletionService<>( |
| 487 | + DFSUtilClient.getThreadPoolExecutor(blockNum, blockNum, 60, |
| 488 | + new LinkedBlockingQueue<>(), "read-", false)); |
| 489 | + this.blockReaders = new BlockReader[dataBlkNum + parityBlkNum]; |
| 490 | + |
| 491 | + for (LocatedBlock locatedBlock : locatedBlocks.getLocatedBlocks()) { |
| 492 | + System.out.println("Checking EC block group: blk_" + locatedBlock.getBlock().getBlockId()); |
| 493 | + LocatedStripedBlock blockGroup = (LocatedStripedBlock) locatedBlock; |
| 494 | + |
| 495 | + try { |
| 496 | + verifyBlockGroup(blockGroup); |
| 497 | + System.out.println("Status: OK"); |
| 498 | + } catch (Exception e) { |
| 499 | + System.err.println("Status: ERROR, message: " + e.getMessage()); |
| 500 | + return 1; |
| 501 | + } finally { |
| 502 | + closeBlockReaders(); |
| 503 | + } |
| 504 | + } |
| 505 | + System.out.println("\nAll EC block group status: OK"); |
| 506 | + return 0; |
| 507 | + } |
| 508 | + |
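| | + /** |
| | + * Reads every internal block of the group in parallel, re-encodes the |
| | + * data blocks, and checks the computed parity against the parity read |
| | + * from the DataNodes. |
| | + */ |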
| 509 | + private void verifyBlockGroup(LocatedStripedBlock blockGroup) throws Exception { |
| 510 | + final LocatedBlock[] indexedBlocks = StripedBlockUtil.parseStripedBlockGroup(blockGroup, |
| 511 | + cellSize, dataBlkNum, parityBlkNum); |
| 512 | + |
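| | + // A complete group must expose every parity block plus one data block |
| | + // per cellSize chunk of data, capped at dataBlkNum. |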
| 513 | + int blockNumExpected = Math.min(dataBlkNum, |
| 514 | + (int) ((blockGroup.getBlockSize() - 1) / cellSize + 1)) + parityBlkNum; |
| 515 | + if (blockGroup.getBlockIndices().length < blockNumExpected) { |
| 516 | + throw new Exception("Block group is under-erasure-coded."); |
| 517 | + } |
| 518 | + |
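| | + // Open a remote reader for each available internal block and track the |
| | + // longest internal block length, which bounds the verification loop. |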
| 519 | + long maxBlockLen = 0L; |
| 520 | + DataChecksum checksum = null; |
| 521 | + for (int i = 0; i < dataBlkNum + parityBlkNum; i++) { |
| 522 | + LocatedBlock block = indexedBlocks[i]; |
| 523 | + if (block == null) { |
| 524 | + blockReaders[i] = null; |
| 525 | + continue; |
| 526 | + } |
| 527 | + if (block.getBlockSize() > maxBlockLen) { |
| 528 | + maxBlockLen = block.getBlockSize(); |
| 529 | + } |
| 530 | + BlockReader blockReader = createBlockReader(block.getBlock(), |
| 531 | + block.getLocations()[0], block.getBlockToken()); |
| 532 | + if (checksum == null) { |
| 533 | + checksum = blockReader.getDataChecksum(); |
| 534 | + } else { |
| 535 | + assert checksum.equals(blockReader.getDataChecksum()); |
| 536 | + } |
| 537 | + blockReaders[i] = blockReader; |
| 538 | + } |
| 539 | + assert checksum != null; |
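| | + // Round the configured striped read buffer size down to a multiple of |
| | + // bytesPerChecksum so every read ends on a checksum chunk boundary. |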
| 540 | + int bytesPerChecksum = checksum.getBytesPerChecksum(); |
| 541 | + int bufferSize = stripedReadBufferSize < bytesPerChecksum ? bytesPerChecksum : |
| 542 | + stripedReadBufferSize - stripedReadBufferSize % bytesPerChecksum; |
| 543 | + final ByteBuffer[] buffers = new ByteBuffer[dataBlkNum + parityBlkNum]; |
| 544 | + final ByteBuffer[] outputs = new ByteBuffer[parityBlkNum]; |
| 545 | + for (int i = 0; i < dataBlkNum + parityBlkNum; i++) { |
| 546 | + buffers[i] = ByteBuffer.allocate(bufferSize); |
| 547 | + } |
| 548 | + for (int i = 0; i < parityBlkNum; i++) { |
| 549 | + outputs[i] = ByteBuffer.allocate(bufferSize); |
| 550 | + } |
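| | + // Verify the group one buffer-sized slice at a time, up to the length |
| | + // of the longest internal block. |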
| 551 | + long positionInBlock = 0L; |
| 552 | + while (positionInBlock < maxBlockLen) { |
| 553 | + final int toVerifyLen = (int) Math.min(bufferSize, maxBlockLen - positionInBlock); |
| 554 | + List<Future<Integer>> futures = new ArrayList<>(dataBlkNum + parityBlkNum); |
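| | + // Read the same slice of every internal block in parallel; blocks that |
| | + // are missing or shorter than the slice are zero-padded. |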
| 555 | + for (int i = 0; i < dataBlkNum + parityBlkNum; i++) { |
| 556 | + final int fi = i; |
| 557 | + futures.add(this.readService.submit(() -> { |
| 558 | + BlockReader blockReader = blockReaders[fi]; |
| 559 | + ByteBuffer buffer = buffers[fi]; |
| 560 | + buffer.clear(); |
| 561 | + buffer.limit(toVerifyLen); |
| 562 | + int readLen = 0; |
| 563 | + if (blockReader != null) { |
| 564 | + int toRead = buffer.remaining(); |
| 565 | + while (readLen < toRead) { |
| 566 | + int nread = blockReader.read(buffer); |
| 567 | + if (nread <= 0) { |
| 568 | + break; |
| 569 | + } |
| 570 | + readLen += nread; |
| 571 | + } |
| 572 | + } |
| 573 | + while (buffer.hasRemaining()) { |
| 574 | + buffer.put((byte) 0); |
| 575 | + } |
| 576 | + buffer.flip(); |
| 577 | + return readLen; |
| 578 | + })); |
| 579 | + } |
| 580 | + for (int i = 0; i < dataBlkNum + parityBlkNum; i++) { |
| 581 | + futures.get(i).get(1, TimeUnit.MINUTES); |
| 582 | + } |
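| | + // Re-encode the data slices and compare the computed parity with the |
| | + // parity slices read from the DataNodes. |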
| 583 | + ByteBuffer[] inputs = new ByteBuffer[dataBlkNum]; |
| 584 | + System.arraycopy(buffers, 0, inputs, 0, dataBlkNum); |
| 585 | + for (int i = 0; i < parityBlkNum; i++) { |
| 586 | + outputs[i].clear(); |
| 587 | + outputs[i].limit(toVerifyLen); |
| 588 | + } |
| 589 | + this.encoder.encode(inputs, outputs); |
| 590 | + for (int i = 0; i < parityBlkNum; i++) { |
| 591 | + if (!buffers[dataBlkNum + i].equals(outputs[i])) { |
| 592 | + throw new Exception("EC compute result does not match."); |
| 593 | + } |
| 594 | + } |
| 595 | + positionInBlock += toVerifyLen; |
| 596 | + } |
| 597 | + } |
| 598 | + |
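| | + /** |
| | + * Connects to the single DataNode holding the given internal block and |
| | + * returns a remote reader over the full block length. |
| | + */ |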
| 599 | + private BlockReader createBlockReader(ExtendedBlock block, DatanodeInfo dnInfo, |
| 600 | + Token<BlockTokenIdentifier> token) throws IOException { |
| 601 | + InetSocketAddress dnAddress = NetUtils.createSocketAddr(dnInfo.getXferAddr(useDNHostname)); |
| 602 | + Peer peer = client.newConnectedPeer(dnAddress, token, dnInfo); |
| 603 | + return BlockReaderRemote.newBlockReader( |
| 604 | + "dummy", block, token, 0, |
| 605 | + block.getNumBytes(), true, "", peer, dnInfo, |
| 606 | + null, cachingStrategy, -1, getConf()); |
| 607 | + } |
| 608 | + |
| 609 | + private void closeBlockReaders() { |
| 610 | + for (int i = 0; i < blockReaders.length; i++) { |
| 611 | + if (blockReaders[i] != null) { |
| 612 | + IOUtils.closeStream(blockReaders[i]); |
| 613 | + blockReaders[i] = null; |
| 614 | + } |
| 615 | + } |
| 616 | + } |
| 617 | + |
| 618 | + } |
| 619 | + |
390 | 620 | /** |
391 | 621 | * The command for getting help about other commands. |
392 | 622 | */ |