|
18 | 18 | package org.apache.hadoop.hdfs.server.datanode; |
19 | 19 |
|
20 | 20 |
|
| 21 | +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY; |
| 22 | +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT; |
21 | 23 | import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_ADDRESS_DEFAULT; |
22 | 24 | import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_ADDRESS_KEY; |
23 | 25 | import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY; |
@@ -304,7 +306,8 @@ public class DataNode extends ReconfigurableBase |
304 | 306 | Collections.unmodifiableList( |
305 | 307 | Arrays.asList( |
306 | 308 | DFS_DATANODE_DATA_DIR_KEY, |
307 | | - DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY)); |
| 309 | + DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY, |
| 310 | + DFS_BLOCKREPORT_INTERVAL_MSEC_KEY)); |
308 | 311 |
|
309 | 312 | public static final Log METRICS_LOG = LogFactory.getLog("DataNodeMetricsLog"); |
310 | 313 |
|
@@ -540,78 +543,111 @@ protected Configuration getNewConf() { |
540 | 543 | public String reconfigurePropertyImpl(String property, String newVal) |
541 | 544 | throws ReconfigurationException { |
542 | 545 | switch (property) { |
543 | | - case DFS_DATANODE_DATA_DIR_KEY: { |
544 | | - IOException rootException = null; |
| 546 | + case DFS_DATANODE_DATA_DIR_KEY: { |
| 547 | + IOException rootException = null; |
| 548 | + try { |
| 549 | + LOG.info("Reconfiguring {} to {}", property, newVal); |
| 550 | + this.refreshVolumes(newVal); |
| 551 | + return getConf().get(DFS_DATANODE_DATA_DIR_KEY); |
| 552 | + } catch (IOException e) { |
| 553 | + rootException = e; |
| 554 | + } finally { |
| 555 | + // Send a full block report to let NN acknowledge the volume changes. |
545 | 556 | try { |
546 | | - LOG.info("Reconfiguring {} to {}", property, newVal); |
547 | | - this.refreshVolumes(newVal); |
548 | | - return getConf().get(DFS_DATANODE_DATA_DIR_KEY); |
| 557 | + triggerBlockReport( |
| 558 | + new BlockReportOptions.Factory().setIncremental(false).build()); |
549 | 559 | } catch (IOException e) { |
550 | | - rootException = e; |
| 560 | + LOG.warn("Exception while sending the block report after refreshing" |
| 561 | + + " volumes {} to {}", property, newVal, e); |
| 562 | + if (rootException == null) { |
| 563 | + rootException = e; |
| 564 | + } |
551 | 565 | } finally { |
552 | | - // Send a full block report to let NN acknowledge the volume changes. |
553 | | - try { |
554 | | - triggerBlockReport( |
555 | | - new BlockReportOptions.Factory().setIncremental(false).build()); |
556 | | - } catch (IOException e) { |
557 | | - LOG.warn("Exception while sending the block report after refreshing" |
558 | | - + " volumes {} to {}", property, newVal, e); |
559 | | - if (rootException == null) { |
560 | | - rootException = e; |
561 | | - } |
562 | | - } finally { |
563 | | - if (rootException != null) { |
564 | | - throw new ReconfigurationException(property, newVal, |
565 | | - getConf().get(property), rootException); |
566 | | - } |
| 566 | + if (rootException != null) { |
| 567 | + throw new ReconfigurationException(property, newVal, |
| 568 | + getConf().get(property), rootException); |
567 | 569 | } |
568 | 570 | } |
569 | | - break; |
570 | 571 | } |
571 | | - case DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY: { |
572 | | - ReconfigurationException rootException = null; |
573 | | - try { |
574 | | - LOG.info("Reconfiguring {} to {}", property, newVal); |
575 | | - int movers; |
576 | | - if (newVal == null) { |
577 | | - // set to default |
578 | | - movers = DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT; |
579 | | - } else { |
580 | | - movers = Integer.parseInt(newVal); |
581 | | - if (movers <= 0) { |
582 | | - rootException = new ReconfigurationException( |
583 | | - property, |
584 | | - newVal, |
585 | | - getConf().get(property), |
586 | | - new IllegalArgumentException( |
587 | | - "balancer max concurrent movers must be larger than 0")); |
588 | | - } |
589 | | - } |
590 | | - boolean success = xserver.updateBalancerMaxConcurrentMovers(movers); |
591 | | - if (!success) { |
| 572 | + break; |
| 573 | + } |
| 574 | + case DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY: { |
| 575 | + ReconfigurationException rootException = null; |
| 576 | + try { |
| 577 | + LOG.info("Reconfiguring {} to {}", property, newVal); |
| 578 | + int movers; |
| 579 | + if (newVal == null) { |
| 580 | + // set to default |
| 581 | + movers = DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT; |
| 582 | + } else { |
| 583 | + movers = Integer.parseInt(newVal); |
| 584 | + if (movers <= 0) { |
592 | 585 | rootException = new ReconfigurationException( |
593 | 586 | property, |
594 | 587 | newVal, |
595 | 588 | getConf().get(property), |
596 | 589 | new IllegalArgumentException( |
597 | | - "Could not modify concurrent moves thread count")); |
| 590 | + "balancer max concurrent movers must be larger than 0")); |
598 | 591 | } |
599 | | - return Integer.toString(movers); |
600 | | - } catch (NumberFormatException nfe) { |
| 592 | + } |
| 593 | + boolean success = xserver.updateBalancerMaxConcurrentMovers(movers); |
| 594 | + if (!success) { |
601 | 595 | rootException = new ReconfigurationException( |
602 | | - property, newVal, getConf().get(property), nfe); |
603 | | - } finally { |
604 | | - if (rootException != null) { |
605 | | - LOG.warn(String.format( |
606 | | - "Exception in updating balancer max concurrent movers %s to %s", |
607 | | - property, newVal), rootException); |
608 | | - throw rootException; |
| 596 | + property, |
| 597 | + newVal, |
| 598 | + getConf().get(property), |
| 599 | + new IllegalArgumentException( |
| 600 | + "Could not modify concurrent moves thread count")); |
| 601 | + } |
| 602 | + return Integer.toString(movers); |
| 603 | + } catch (NumberFormatException nfe) { |
| 604 | + rootException = new ReconfigurationException( |
| 605 | + property, newVal, getConf().get(property), nfe); |
| 606 | + } finally { |
| 607 | + if (rootException != null) { |
| 608 | + LOG.warn(String.format( |
| 609 | + "Exception in updating balancer max concurrent movers %s to %s", |
| 610 | + property, newVal), rootException); |
| 611 | + throw rootException; |
| 612 | + } |
| 613 | + } |
| 614 | + break; |
| 615 | + } |
| 616 | + case DFS_BLOCKREPORT_INTERVAL_MSEC_KEY: { |
| 617 | + ReconfigurationException rootException = null; |
| 618 | + try { |
| 619 | + LOG.info("Reconfiguring {} to {}", property, newVal); |
| 620 | + long intervalMs; |
| 621 | + if (newVal == null) { |
| 622 | + // Set to default. |
| 623 | + intervalMs = DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT; |
| 624 | + } else { |
| 625 | + intervalMs = Long.parseLong(newVal); |
| 626 | + } |
| 627 | + dnConf.setBlockReportInterval(intervalMs); |
| 628 | + for (BPOfferService bpos : blockPoolManager.getAllNamenodeThreads()) { |
| 629 | + if (bpos != null) { |
| 630 | + for (BPServiceActor actor : bpos.getBPServiceActors()) { |
| 631 | + actor.getScheduler().setBlockReportIntervalMs(intervalMs); |
| 632 | + } |
609 | 633 | } |
610 | 634 | } |
611 | | - break; |
| 635 | + return Long.toString(intervalMs); |
| 636 | + } catch (IllegalArgumentException e) { |
| 637 | + rootException = new ReconfigurationException( |
| 638 | + property, newVal, getConf().get(property), e); |
| 639 | + } finally { |
| 640 | + if (rootException != null) { |
| 641 | + LOG.warn(String.format( |
| 642 | + "Exception in updating block report interval %s to %s", |
| 643 | + property, newVal), rootException); |
| 644 | + throw rootException; |
| 645 | + } |
612 | 646 | } |
613 | | - default: |
614 | | - break; |
| 647 | + break; |
| 648 | + } |
| 649 | + default: |
| 650 | + break; |
615 | 651 | } |
616 | 652 | throw new ReconfigurationException( |
617 | 653 | property, newVal, getConf().get(property)); |
|
0 commit comments