@@ -18,6 +18,8 @@
 package org.apache.hadoop.hdfs.server.datanode;
 
 
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_ADDRESS_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_ADDRESS_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_ALLOW_SAME_DISK_TIERING;
@@ -299,7 +301,8 @@ public class DataNode extends ReconfigurableBase
       Collections.unmodifiableList(
           Arrays.asList(
               DFS_DATANODE_DATA_DIR_KEY,
-              DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY));
+              DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY,
+              DFS_BLOCKREPORT_INTERVAL_MSEC_KEY));
 
   public static final Log METRICS_LOG = LogFactory.getLog("DataNodeMetricsLog");
 
@@ -533,78 +536,118 @@ protected Configuration getNewConf() {
   public String reconfigurePropertyImpl(String property, String newVal)
       throws ReconfigurationException {
     switch (property) {
-    case DFS_DATANODE_DATA_DIR_KEY: {
-      IOException rootException = null;
+      case DFS_DATANODE_DATA_DIR_KEY: {
+        IOException rootException = null;
+        try {
+          LOG.info("Reconfiguring {} to {}", property, newVal);
+          this.refreshVolumes(newVal);
+          return getConf().get(DFS_DATANODE_DATA_DIR_KEY);
+        } catch (IOException e) {
+          rootException = e;
+        } finally {
+          // Send a full block report to let NN acknowledge the volume changes.
           try {
-        LOG.info("Reconfiguring {} to {}", property, newVal);
-        this.refreshVolumes(newVal);
-        return getConf().get(DFS_DATANODE_DATA_DIR_KEY);
+            triggerBlockReport(
+                new BlockReportOptions.Factory().setIncremental(false).build());
           } catch (IOException e) {
-        rootException = e;
+            LOG.warn("Exception while sending the block report after refreshing"
+                + " volumes {} to {}", property, newVal, e);
+            if (rootException == null) {
+              rootException = e;
+            }
           } finally {
-        // Send a full block report to let NN acknowledge the volume changes.
-        try {
-          triggerBlockReport(
-              new BlockReportOptions.Factory().setIncremental(false).build());
-        } catch (IOException e) {
-          LOG.warn("Exception while sending the block report after refreshing"
-              + " volumes {} to {}", property, newVal, e);
-          if (rootException == null) {
-            rootException = e;
-          }
-        } finally {
-          if (rootException != null) {
-            throw new ReconfigurationException(property, newVal,
-                getConf().get(property), rootException);
-          }
+            if (rootException != null) {
+              throw new ReconfigurationException(property, newVal,
+                  getConf().get(property), rootException);
             }
           }
-      break;
         }
-    case DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY: {
-      ReconfigurationException rootException = null;
-      try {
-        LOG.info("Reconfiguring {} to {}", property, newVal);
-        int movers;
-        if (newVal == null) {
-          // set to default
-          movers = DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT;
-        } else {
-          movers = Integer.parseInt(newVal);
-          if (movers <= 0) {
-            rootException = new ReconfigurationException(
-                property,
-                newVal,
-                getConf().get(property),
-                new IllegalArgumentException(
-                    "balancer max concurrent movers must be larger than 0"));
-          }
-        }
-        boolean success = xserver.updateBalancerMaxConcurrentMovers(movers);
-        if (!success) {
+        break;
+      }
+      case DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY: {
+        ReconfigurationException rootException = null;
+        try {
+          LOG.info("Reconfiguring {} to {}", property, newVal);
+          int movers;
+          if (newVal == null) {
+            // set to default
+            movers = DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT;
+          } else {
+            movers = Integer.parseInt(newVal);
+            if (movers <= 0) {
               rootException = new ReconfigurationException(
                   property,
                   newVal,
                   getConf().get(property),
                   new IllegalArgumentException(
-                  "Could not modify concurrent moves thread count"));
+                      "balancer max concurrent movers must be larger than 0"));
             }
-        return Integer.toString(movers);
-      } catch (NumberFormatException nfe) {
+          }
+          boolean success = xserver.updateBalancerMaxConcurrentMovers(movers);
+          if (!success) {
             rootException = new ReconfigurationException(
-            property, newVal, getConf().get(property), nfe);
-      } finally {
-        if (rootException != null) {
-          LOG.warn(String.format(
-              "Exception in updating balancer max concurrent movers %s to %s",
-              property, newVal), rootException);
-          throw rootException;
+                property,
+                newVal,
+                getConf().get(property),
+                new IllegalArgumentException(
+                    "Could not modify concurrent moves thread count"));
+          }
+          return Integer.toString(movers);
+        } catch (NumberFormatException nfe) {
+          rootException = new ReconfigurationException(
+              property, newVal, getConf().get(property), nfe);
+        } finally {
+          if (rootException != null) {
+            LOG.warn(String.format(
+                "Exception in updating balancer max concurrent movers %s to %s",
+                property, newVal), rootException);
+            throw rootException;
+          }
+        }
+        break;
+      }
+      case DFS_BLOCKREPORT_INTERVAL_MSEC_KEY: {
+        ReconfigurationException rootException = null;
+        try {
+          LOG.info("Reconfiguring {} to {}", property, newVal);
+          long intervalMs;
+          if (newVal == null) {
+            // Set to default.
+            intervalMs = DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT;
+          } else {
+            intervalMs = Long.parseLong(newVal);
+            if (intervalMs < 0) {
+              rootException = new ReconfigurationException(
+                  property,
+                  newVal,
+                  getConf().get(property),
+                  new IllegalArgumentException(
+                      "block report interval must be larger than or equal to 0"));
+            }
+          }
+          for (BPOfferService bpos : blockPoolManager.getAllNamenodeThreads()) {
+            if (bpos != null) {
+              for (BPServiceActor actor : bpos.getBPServiceActors()) {
+                actor.getScheduler().setBlockReportIntervalMs(intervalMs);
+              }
            }
          }
-      break;
+          return Long.toString(intervalMs);
+        } catch (NumberFormatException nfe) {
+          rootException = new ReconfigurationException(
+              property, newVal, getConf().get(property), nfe);
+        } finally {
+          if (rootException != null) {
+            LOG.warn(String.format(
+                "Exception in updating block report interval %s to %s",
+                property, newVal), rootException);
+            throw rootException;
+          }
        }
-    default:
-      break;
+        break;
+      }
+      default:
+        break;
     }
     throw new ReconfigurationException(
         property, newVal, getConf().get(property));
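
For context, here is a minimal, hypothetical sketch (not part of this commit) of how the new reconfigurable property could be exercised end to end. It assumes a test classpath that provides MiniDFSCluster from the hadoop-hdfs test artifacts, and the class name BlockReportIntervalReconfigSketch is made up; it goes through the public reconfigureProperty(String, String) entry point inherited from ReconfigurableBase, which dispatches to the reconfigurePropertyImpl switch shown above.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.server.datanode.DataNode;

public class BlockReportIntervalReconfigSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new HdfsConfiguration();
    // Hypothetical harness: spin up a single-DataNode mini cluster
    // (requires the hadoop-hdfs test jar on the classpath).
    MiniDFSCluster cluster =
        new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
    try {
      cluster.waitActive();
      DataNode dn = cluster.getDataNodes().get(0);
      // Reconfigure dfs.blockreport.intervalMsec at runtime. With this patch,
      // the DFS_BLOCKREPORT_INTERVAL_MSEC_KEY case above validates the value
      // and pushes it to every BPServiceActor scheduler.
      dn.reconfigureProperty(
          DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, "600000");
    } finally {
      cluster.shutdown();
    }
  }
}

Operationally, the same code path should also be reachable through the existing runtime-reconfiguration flow, i.e. updating dfs.blockreport.intervalMsec in hdfs-site.xml on the DataNode host and then running hdfs dfsadmin -reconfig datanode <host:ipc_port> start.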
|