|
46 | 46 | import org.apache.hadoop.classification.VisibleForTesting; |
47 | 47 | import org.apache.hadoop.classification.InterfaceAudience; |
48 | 48 | import org.apache.hadoop.fs.StorageType; |
| 49 | +import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; |
49 | 50 | import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.BlockWrite; |
50 | 51 | import org.apache.hadoop.hdfs.client.impl.DfsClientConf; |
51 | 52 | import org.apache.hadoop.hdfs.protocol.DatanodeInfo; |
@@ -528,9 +529,8 @@ boolean doWaitForRestart() { |
528 | 529 | // are congested |
529 | 530 | private final List<DatanodeInfo> congestedNodes = new ArrayList<>(); |
530 | 531 | private final Map<DatanodeInfo, Integer> slowNodeMap = new HashMap<>(); |
531 | | - private static final int CONGESTION_BACKOFF_MEAN_TIME_IN_MS = 5000; |
532 | | - private static final int CONGESTION_BACK_OFF_MAX_TIME_IN_MS = |
533 | | - CONGESTION_BACKOFF_MEAN_TIME_IN_MS * 10; |
| 532 | + private int congestionBackOffMeanTimeInMs; |
| 533 | + private int congestionBackOffMaxTimeInMs; |
534 | 534 | private int lastCongestionBackoffTime; |
535 | 535 | private int maxPipelineRecoveryRetries; |
536 | 536 | private int markSlowNodeAsBadNodeThreshold; |
@@ -564,6 +564,35 @@ private DataStreamer(HdfsFileStatus stat, ExtendedBlock block, |
564 | 564 | this.addBlockFlags = flags; |
565 | 565 | this.maxPipelineRecoveryRetries = conf.getMaxPipelineRecoveryRetries(); |
566 | 566 | this.markSlowNodeAsBadNodeThreshold = conf.getMarkSlowNodeAsBadNodeThreshold(); |
| 567 | + congestionBackOffMeanTimeInMs = dfsClient.getConfiguration().getInt( |
| 568 | + HdfsClientConfigKeys.DFS_CLIENT_CONGESTION_BACKOFF_MEAN_TIME, |
| 569 | + HdfsClientConfigKeys.DFS_CLIENT_CONGESTION_BACKOFF_MEAN_TIME_DEFAULT); |
| 570 | + congestionBackOffMaxTimeInMs = dfsClient.getConfiguration().getInt( |
| 571 | + HdfsClientConfigKeys.DFS_CLIENT_CONGESTION_BACKOFF_MAX_TIME, |
| 572 | + HdfsClientConfigKeys.DFS_CLIENT_CONGESTION_BACKOFF_MAX_TIME_DEFAULT); |
| 573 | + if (congestionBackOffMeanTimeInMs <= 0) { |
| 574 | + LOG.warn("Configuration: {} is not appropriate, using default value: {}", |
| 575 | + HdfsClientConfigKeys.DFS_CLIENT_CONGESTION_BACKOFF_MEAN_TIME, |
| 576 | + HdfsClientConfigKeys.DFS_CLIENT_CONGESTION_BACKOFF_MEAN_TIME_DEFAULT); |
| 577 | + } |
| 578 | + if (congestionBackOffMaxTimeInMs <= 0) { |
| 579 | + LOG.warn("Configuration: {} is not appropriate, using default value: {}", |
| 580 | + HdfsClientConfigKeys.DFS_CLIENT_CONGESTION_BACKOFF_MAX_TIME, |
| 581 | + HdfsClientConfigKeys.DFS_CLIENT_CONGESTION_BACKOFF_MAX_TIME_DEFAULT); |
| 582 | + } |
| 583 | + if (congestionBackOffMaxTimeInMs < congestionBackOffMeanTimeInMs) { |
| 584 | + LOG.warn("Configuration: {} can not less than {}, using their default values.", |
| 585 | + HdfsClientConfigKeys.DFS_CLIENT_CONGESTION_BACKOFF_MAX_TIME, |
| 586 | + HdfsClientConfigKeys.DFS_CLIENT_CONGESTION_BACKOFF_MEAN_TIME); |
| 587 | + } |
| 588 | + if (congestionBackOffMeanTimeInMs <= 0 || congestionBackOffMaxTimeInMs <= 0 || |
| 589 | + congestionBackOffMaxTimeInMs < congestionBackOffMeanTimeInMs) { |
| 590 | + congestionBackOffMeanTimeInMs = |
| 591 | + HdfsClientConfigKeys.DFS_CLIENT_CONGESTION_BACKOFF_MEAN_TIME_DEFAULT; |
| 592 | + congestionBackOffMaxTimeInMs = |
| 593 | + HdfsClientConfigKeys.DFS_CLIENT_CONGESTION_BACKOFF_MAX_TIME_DEFAULT; |
| 594 | + } |
| 595 | + |
567 | 596 | } |
568 | 597 |
|
569 | 598 | /** |
@@ -1998,10 +2027,10 @@ private void backOffIfNecessary() throws InterruptedException { |
1998 | 2027 | sb.append(' ').append(i); |
1999 | 2028 | } |
2000 | 2029 | int range = Math.abs(lastCongestionBackoffTime * 3 - |
2001 | | - CONGESTION_BACKOFF_MEAN_TIME_IN_MS); |
| 2030 | + congestionBackOffMeanTimeInMs); |
2002 | 2031 | int base = Math.min(lastCongestionBackoffTime * 3, |
2003 | | - CONGESTION_BACKOFF_MEAN_TIME_IN_MS); |
2004 | | - t = Math.min(CONGESTION_BACK_OFF_MAX_TIME_IN_MS, |
| 2032 | + congestionBackOffMeanTimeInMs); |
| 2033 | + t = Math.min(congestionBackOffMaxTimeInMs, |
2005 | 2034 | (int)(base + Math.random() * range)); |
2006 | 2035 | lastCongestionBackoffTime = t; |
2007 | 2036 | sb.append(" are congested. Backing off for ").append(t).append(" ms"); |
|
0 commit comments