From 2bd997daee548151077cd517f63a1ec992ba6fd3 Mon Sep 17 00:00:00 2001 From: tom lee Date: Mon, 6 Dec 2021 22:41:38 +0800 Subject: [PATCH 1/6] HDFS-16371. Exclude slow disks when choosing volume --- .../org/apache/hadoop/hdfs/DFSConfigKeys.java | 4 + .../fsdataset/impl/FsDatasetImpl.java | 6 +- .../datanode/fsdataset/impl/FsVolumeList.java | 15 +++- .../datanode/metrics/DataNodeDiskMetrics.java | 81 +++++++++++++++++ .../src/main/resources/hdfs-default.xml | 9 ++ .../fsdataset/impl/TestFsVolumeList.java | 89 ++++++++++++++++++- 6 files changed, 199 insertions(+), 5 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index 0526f1e4412dc..102bbaa553a62 100755 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -698,6 +698,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys { "dfs.datanode.max.disks.to.report"; public static final int DFS_DATANODE_MAX_DISKS_TO_REPORT_DEFAULT = 5; + public static final String DFS_DATANODE_MAX_SLOWDISKS_TO_BE_EXCLUDED_KEY = + "dfs.datanode.max.slowdisks.to.be.excluded"; + public static final int DFS_DATANODE_MAX_SLOWDISKS_TO_BE_EXCLUDED_DEFAULT = + 0; public static final String DFS_DATANODE_HOST_NAME_KEY = HdfsClientConfigKeys.DeprecatedKeys.DFS_DATANODE_HOST_NAME_KEY; public static final String DFS_NAMENODE_CHECKPOINT_DIR_KEY = diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java index 8d37bda166091..82877269af9b8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java @@ -374,7 +374,7 @@ public LengthInputStream getMetaDataInputStream(ExtendedBlock b) RoundRobinVolumeChoosingPolicy.class, VolumeChoosingPolicy.class), conf); volumes = new FsVolumeList(volumeFailureInfos, datanode.getBlockScanner(), - blockChooserImpl, conf); + blockChooserImpl, conf, datanode.getDiskMetrics()); asyncDiskService = new FsDatasetAsyncDiskService(datanode, this); asyncLazyPersistService = new RamDiskAsyncLazyPersistService(datanode, conf); deletingBlock = new HashMap>(); @@ -3669,5 +3669,9 @@ void stopAllDataxceiverThreads(FsVolumeImpl volume) { } } } + + public FsVolumeList getVolumes() { + return volumes; + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java index 9400c7c7f4ca1..35a0539501547 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java @@ -33,6 +33,7 @@ import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; +import java.util.stream.Collectors; import org.apache.hadoop.conf.Configuration; import 
org.apache.hadoop.fs.StorageType; @@ -43,6 +44,7 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.VolumeChoosingPolicy; import org.apache.hadoop.hdfs.server.datanode.BlockScanner; import org.apache.hadoop.hdfs.server.datanode.StorageLocation; +import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeDiskMetrics; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.util.AutoCloseableLock; @@ -67,15 +69,17 @@ class FsVolumeList { private final boolean enableSameDiskTiering; private final MountVolumeMap mountVolumeMap; private Map capacityRatioMap; + private final DataNodeDiskMetrics diskMetrics; FsVolumeList(List initialVolumeFailureInfos, BlockScanner blockScanner, VolumeChoosingPolicy blockChooser, - Configuration config) { + Configuration config, DataNodeDiskMetrics dataNodeDiskMetrics) { this.blockChooser = blockChooser; this.blockScanner = blockScanner; this.checkDirsLock = new AutoCloseableLock(); this.checkDirsLockCondition = checkDirsLock.newCondition(); + this.diskMetrics = dataNodeDiskMetrics; for (VolumeFailureInfo volumeFailureInfo: initialVolumeFailureInfos) { volumeFailureInfos.put(volumeFailureInfo.getFailedStorageLocation(), volumeFailureInfo); @@ -100,6 +104,15 @@ List getVolumes() { private FsVolumeReference chooseVolume(List list, long blockSize, String storageId) throws IOException { + + // Exclude slow disks when choosing volume. + if (diskMetrics != null) { + List slowDisksToBeExcluded = diskMetrics.getSlowDisksToBeExcluded(); + list = list.stream() + .filter(volume -> !slowDisksToBeExcluded.contains(volume.getBaseURI().getPath())) + .collect(Collectors.toList()); + } + while (true) { FsVolumeImpl volume = blockChooser.chooseVolume(list, blockSize, storageId); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeDiskMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeDiskMetrics.java index cae464bfae700..3296f483fff1d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeDiskMetrics.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeDiskMetrics.java @@ -29,14 +29,19 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports.DiskOp; +import org.apache.hadoop.thirdparty.com.google.common.primitives.Doubles; import org.apache.hadoop.util.Daemon; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; +import java.util.ArrayList; import java.util.HashMap; import java.util.Iterator; +import java.util.List; import java.util.Map; +import java.util.PriorityQueue; +import java.util.stream.Collectors; /** * This class detects and maintains DataNode disk outliers and their @@ -69,6 +74,14 @@ public class DataNodeDiskMetrics { * Threshold in milliseconds below which a disk is definitely not slow. */ private final long lowThresholdMs; + /** + * The number of slow disks that needs to be excluded. + */ + private int maxSlowDisksToBeExcluded; + /** + * List of slow disks that need to be excluded. 
+ */ + private List slowDisksToBeExcluded = new ArrayList<>(); public DataNodeDiskMetrics(DataNode dn, long diskOutlierDetectionIntervalMs, Configuration conf) { @@ -80,6 +93,9 @@ public DataNodeDiskMetrics(DataNode dn, long diskOutlierDetectionIntervalMs, lowThresholdMs = conf.getLong(DFSConfigKeys.DFS_DATANODE_SLOWDISK_LOW_THRESHOLD_MS_KEY, DFSConfigKeys.DFS_DATANODE_SLOWDISK_LOW_THRESHOLD_MS_DEFAULT); + maxSlowDisksToBeExcluded = + conf.getInt(DFSConfigKeys.DFS_DATANODE_MAX_SLOWDISKS_TO_BE_EXCLUDED_KEY, + DFSConfigKeys.DFS_DATANODE_MAX_SLOWDISKS_TO_BE_EXCLUDED_DEFAULT); slowDiskDetector = new OutlierDetector(minOutlierDetectionDisks, lowThresholdMs); shouldRun = true; @@ -127,6 +143,15 @@ public void run() { detectAndUpdateDiskOutliers(metadataOpStats, readIoStats, writeIoStats); + + // Sort the slow disks by latency. + if (maxSlowDisksToBeExcluded > 0) { + ArrayList diskLatencies = new ArrayList<>(); + for (Map.Entry> diskStats : diskOutliersStats.entrySet()) { + diskLatencies.add(new DiskLatency(diskStats.getKey(), diskStats.getValue())); + } + sortSlowDisks(diskLatencies); + } } try { @@ -171,6 +196,58 @@ private void detectAndUpdateDiskOutliers(Map metadataOpStats, } } + private void sortSlowDisks(ArrayList diskLatencies) { + if (diskOutliersStats.isEmpty()) { + return; + } + + final PriorityQueue topNReports = new PriorityQueue<>( + maxSlowDisksToBeExcluded, + (o1, o2) -> Doubles.compare( + o1.getMaxLatency(), o2.getMaxLatency())); + + for (DiskLatency diskLatency : diskLatencies) { + if (topNReports.size() < dn.getFSDataset().getVolumeInfoMap().size()) { + topNReports.add(diskLatency); + } else if (topNReports.peek().getMaxLatency() < + diskLatency.getMaxLatency()) { + topNReports.poll(); + topNReports.add(diskLatency); + } + } + slowDisksToBeExcluded = + topNReports.stream().map(DiskLatency::getSlowDisk).collect(Collectors.toList()); + } + + /** + * This structure is a wrapper over disk latencies. + */ + public static class DiskLatency { + final private String slowDisk; + final private Map latencyMap; + + public DiskLatency( + String slowDiskID, + Map latencyMap) { + this.slowDisk = slowDiskID; + this.latencyMap = latencyMap; + } + + double getMaxLatency() { + double maxLatency = 0; + for (double latency : latencyMap.values()) { + if (latency > maxLatency) { + maxLatency = latency; + } + } + return maxLatency; + } + + public String getSlowDisk() { + return slowDisk; + } + } + private void addDiskStat(Map> diskStats, String disk, DiskOp diskOp, double latency) { if (!diskStats.containsKey(disk)) { @@ -206,4 +283,8 @@ public void addSlowDiskForTesting(String slowDiskPath, diskOutliersStats.put(slowDiskPath, latencies); } } + + public List getSlowDisksToBeExcluded() { + return slowDisksToBeExcluded; + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml index 7bcbccd81728c..e75a5c83c3d57 100755 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml @@ -2483,6 +2483,15 @@ + + dfs.datanode.max.slowdisks.to.be.excluded + 0 + + The number of slow disks that needs to be excluded. By default, this parameter is set to 0, + which disables excluding slow disk when choosing volume. 
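+ This only takes effect when the DataNode disk outlier collector is enabled (dfs.datanode.fileio.profiling.sampling.percentage set greater than 0), since the excluded disks are reported by DataNodeDiskMetrics.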
+ + + hadoop.user.group.metrics.percentiles.intervals diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsVolumeList.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsVolumeList.java index 15495dfd59fc7..69daff93c5e6f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsVolumeList.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsVolumeList.java @@ -21,6 +21,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.DF; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystemTestHelper; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.StorageType; @@ -30,14 +31,19 @@ import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.MiniDFSNNTopology; +import org.apache.hadoop.hdfs.protocol.DatanodeVolumeInfo; import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory; import org.apache.hadoop.hdfs.server.datanode.BlockScanner; +import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.datanode.StorageLocation; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference; import org.apache.hadoop.hdfs.server.datanode.fsdataset.RoundRobinVolumeChoosingPolicy; import org.apache.hadoop.hdfs.server.datanode.fsdataset.VolumeChoosingPolicy; +import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports; import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableMap; import org.apache.hadoop.util.StringUtils; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.mockito.Mockito; @@ -53,6 +59,7 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeoutException; import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.stream.Collectors; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DU_RESERVED_PERCENTAGE_KEY; import static org.junit.Assert.assertEquals; @@ -73,6 +80,10 @@ public class TestFsVolumeList { private FsDatasetImpl dataset = null; private String baseDir; private BlockScanner blockScanner; + private final static int NUM_DATANODES = 3; + private final static int STORAGES_PER_DATANODE = 2; + private final static int DEFAULT_BLOCK_SIZE = 102400; + private final static int BUFFER_LENGTH = 1024; @Before public void setUp() { @@ -89,7 +100,7 @@ public void setUp() { public void testGetNextVolumeWithClosedVolume() throws IOException { FsVolumeList volumeList = new FsVolumeList( Collections.emptyList(), - blockScanner, blockChooser, conf); + blockScanner, blockChooser, conf, null); final List volumes = new ArrayList<>(); for (int i = 0; i < 3; i++) { File curDir = new File(baseDir, "nextvolume-" + i); @@ -132,7 +143,7 @@ public Boolean get() { @Test(timeout=30000) public void testReleaseVolumeRefIfNoBlockScanner() throws IOException { FsVolumeList volumeList = new FsVolumeList( - Collections.emptyList(), null, blockChooser, conf); + Collections.emptyList(), null, blockChooser, conf, null); File volDir = new File(baseDir, "volume-0"); volDir.mkdirs(); FsVolumeImpl volume = new FsVolumeImplBuilder() @@ -511,7 +522,7 @@ public void testGetVolumeWithSameDiskArchival() throws Exception { .build(); FsVolumeList volumeList = new 
FsVolumeList( Collections.emptyList(), - blockScanner, blockChooser, conf); + blockScanner, blockChooser, conf, null); volumeList.addVolume(archivalVolume.obtainReference()); volumeList.addVolume(diskVolume.obtainReference()); @@ -620,4 +631,76 @@ public void testDfsUsageStatWithSameDiskArchival() throws Exception { mountVolumeMap.removeVolume(spyArchivalVolume); assertEquals(dfCapacity - duReserved, spyDiskVolume.getCapacity()); } + + @Test + public void testExcludeSlowDiskWhenChoosingVolume() throws Exception { + conf = new HdfsConfiguration(); + conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DEFAULT_BLOCK_SIZE); + // Set datanode outliers report interval to 1s. + conf.setStrings(DFSConfigKeys.DFS_DATANODE_OUTLIERS_REPORT_INTERVAL_KEY, "1s"); + // Enable datanode disk metrics collector. + conf.setInt(DFSConfigKeys.DFS_DATANODE_FILEIO_PROFILING_SAMPLING_PERCENTAGE_KEY, 30); + // Enable excluding slow disks when choosing volume. + conf.setInt(DFSConfigKeys.DFS_DATANODE_MAX_SLOWDISKS_TO_BE_EXCLUDED_KEY, 1); + // Ensure that each volume capacity is larger than the DEFAULT_BLOCK_SIZE. + long capacity = 10 * DEFAULT_BLOCK_SIZE; + long[][] capacities = new long[NUM_DATANODES][STORAGES_PER_DATANODE]; + String[] hostnames = new String[NUM_DATANODES]; + for (int i = 0; i < NUM_DATANODES; i++) { + hostnames[i] = i + "." + i + "." + i + "." + i; + for(int j = 0; j < STORAGES_PER_DATANODE; j++){ + capacities[i][j]=capacity; + } + } + + MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf) + .hosts(hostnames) + .numDataNodes(NUM_DATANODES) + .storagesPerDatanode(STORAGES_PER_DATANODE) + .storageCapacities(capacities).build(); + cluster.waitActive(); + FileSystem fs = cluster.getFileSystem(); + + // Create file for each datanode. + ArrayList dataNodes = cluster.getDataNodes(); + DataNode dn0 = dataNodes.get(0); + DataNode dn1 = dataNodes.get(1); + DataNode dn2 = dataNodes.get(2); + + // Mock the first disk of each datanode is a slow disk. + String slowDiskOnDn0 = dn0.getFSDataset().getFsVolumeReferences().getReference(0) + .getVolume().getBaseURI().getPath(); + String slowDiskOnDn1 = dn1.getFSDataset().getFsVolumeReferences().getReference(0) + .getVolume().getBaseURI().getPath(); + String slowDiskOnDn2 = dn2.getFSDataset().getFsVolumeReferences().getReference(0) + .getVolume().getBaseURI().getPath(); + + dn0.getDiskMetrics().addSlowDiskForTesting(slowDiskOnDn0, ImmutableMap.of( + SlowDiskReports.DiskOp.READ, 1.0, SlowDiskReports.DiskOp.WRITE, 1.5, + SlowDiskReports.DiskOp.METADATA, 2.0)); + dn1.getDiskMetrics().addSlowDiskForTesting(slowDiskOnDn1, ImmutableMap.of( + SlowDiskReports.DiskOp.READ, 1.0, SlowDiskReports.DiskOp.WRITE, 1.5, + SlowDiskReports.DiskOp.METADATA, 2.0)); + dn2.getDiskMetrics().addSlowDiskForTesting(slowDiskOnDn2, ImmutableMap.of( + SlowDiskReports.DiskOp.READ, 1.0, SlowDiskReports.DiskOp.WRITE, 1.5, + SlowDiskReports.DiskOp.METADATA, 2.0)); + + // Wait until the data on the slow disk is collected successfully. + Thread.sleep(5000); + + // Create a file with 3 replica. + DFSTestUtil.createFile(fs, new Path("/file0"), false, BUFFER_LENGTH, 1000, + DEFAULT_BLOCK_SIZE, (short) 3, 0, false, null); + + // Asserts that the number of blocks created on a slow disk is 0. 
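+ // The slow disk paths above come from getBaseURI().getPath() and end with a trailing '/', while getVolumeReport() paths do not, hence the '/' appended below.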
+ Assert.assertEquals(0, dn0.getVolumeReport().stream() + .filter(v -> (v.getPath() + "/").equals(slowDiskOnDn0)).collect(Collectors.toList()).get(0) + .getNumBlocks()); + Assert.assertEquals(0, dn1.getVolumeReport().stream() + .filter(v -> (v.getPath() + "/").equals(slowDiskOnDn1)).collect(Collectors.toList()).get(0) + .getNumBlocks()); + Assert.assertEquals(0, dn2.getVolumeReport().stream() + .filter(v -> (v.getPath() + "/").equals(slowDiskOnDn2)).collect(Collectors.toList()).get(0) + .getNumBlocks()); + } } From a4b7f56db8d5aed4c1e0a31fdd823ce948e2a08f Mon Sep 17 00:00:00 2001 From: tom lee Date: Tue, 7 Dec 2021 08:01:10 +0800 Subject: [PATCH 2/6] fix checkstyles --- .../hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java | 4 ---- .../hdfs/server/datanode/metrics/DataNodeDiskMetrics.java | 3 ++- .../hdfs/server/datanode/fsdataset/impl/TestFsVolumeList.java | 1 - 3 files changed, 2 insertions(+), 6 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java index 82877269af9b8..776e28594f52a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java @@ -3669,9 +3669,5 @@ void stopAllDataxceiverThreads(FsVolumeImpl volume) { } } } - - public FsVolumeList getVolumes() { - return volumes; - } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeDiskMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeDiskMetrics.java index 3296f483fff1d..3fa689092ed32 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeDiskMetrics.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeDiskMetrics.java @@ -147,7 +147,8 @@ public void run() { // Sort the slow disks by latency. 
if (maxSlowDisksToBeExcluded > 0) { ArrayList diskLatencies = new ArrayList<>(); - for (Map.Entry> diskStats : diskOutliersStats.entrySet()) { + for (Map.Entry> diskStats : + diskOutliersStats.entrySet()) { diskLatencies.add(new DiskLatency(diskStats.getKey(), diskStats.getValue())); } sortSlowDisks(diskLatencies); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsVolumeList.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsVolumeList.java index 69daff93c5e6f..ed9737400a519 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsVolumeList.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsVolumeList.java @@ -31,7 +31,6 @@ import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.MiniDFSNNTopology; -import org.apache.hadoop.hdfs.protocol.DatanodeVolumeInfo; import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory; import org.apache.hadoop.hdfs.server.datanode.BlockScanner; import org.apache.hadoop.hdfs.server.datanode.DataNode; From 9fbdca6c3799738a51a13c814a9c69ba92356ef7 Mon Sep 17 00:00:00 2001 From: tom lee Date: Thu, 16 Dec 2021 16:46:25 +0800 Subject: [PATCH 3/6] refine unit test --- .../datanode/metrics/DataNodeDiskMetrics.java | 4 +- .../fsdataset/impl/TestFsVolumeList.java | 47 ++++++++++++++----- 2 files changed, 37 insertions(+), 14 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeDiskMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeDiskMetrics.java index 3fa689092ed32..04b547145b80f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeDiskMetrics.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeDiskMetrics.java @@ -203,12 +203,12 @@ private void sortSlowDisks(ArrayList diskLatencies) { } final PriorityQueue topNReports = new PriorityQueue<>( - maxSlowDisksToBeExcluded, + diskLatencies.size(), (o1, o2) -> Doubles.compare( o1.getMaxLatency(), o2.getMaxLatency())); for (DiskLatency diskLatency : diskLatencies) { - if (topNReports.size() < dn.getFSDataset().getVolumeInfoMap().size()) { + if (topNReports.size() < maxSlowDisksToBeExcluded) { topNReports.add(diskLatency); } else if (topNReports.peek().getMaxLatency() < diskLatency.getMaxLatency()) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsVolumeList.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsVolumeList.java index ed9737400a519..a95d33736f735 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsVolumeList.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsVolumeList.java @@ -80,7 +80,7 @@ public class TestFsVolumeList { private String baseDir; private BlockScanner blockScanner; private final static int NUM_DATANODES = 3; - private final static int STORAGES_PER_DATANODE = 2; + private final static int STORAGES_PER_DATANODE = 5; private final static int 
DEFAULT_BLOCK_SIZE = 102400; private final static int BUFFER_LENGTH = 1024; @@ -666,26 +666,49 @@ public void testExcludeSlowDiskWhenChoosingVolume() throws Exception { DataNode dn1 = dataNodes.get(1); DataNode dn2 = dataNodes.get(2); - // Mock the first disk of each datanode is a slow disk. - String slowDiskOnDn0 = dn0.getFSDataset().getFsVolumeReferences().getReference(0) + // Mock the first disk of each datanode is a slowest disk. + String slowDisk0OnDn0 = dn0.getFSDataset().getFsVolumeReferences().getReference(0) .getVolume().getBaseURI().getPath(); - String slowDiskOnDn1 = dn1.getFSDataset().getFsVolumeReferences().getReference(0) + String slowDisk0OnDn1 = dn1.getFSDataset().getFsVolumeReferences().getReference(0) .getVolume().getBaseURI().getPath(); - String slowDiskOnDn2 = dn2.getFSDataset().getFsVolumeReferences().getReference(0) + String slowDisk0OnDn2 = dn2.getFSDataset().getFsVolumeReferences().getReference(0) .getVolume().getBaseURI().getPath(); - dn0.getDiskMetrics().addSlowDiskForTesting(slowDiskOnDn0, ImmutableMap.of( + String slowDisk1OnDn0 = dn0.getFSDataset().getFsVolumeReferences().getReference(1) + .getVolume().getBaseURI().getPath(); + String slowDisk1OnDn1 = dn1.getFSDataset().getFsVolumeReferences().getReference(1) + .getVolume().getBaseURI().getPath(); + String slowDisk1OnDn2 = dn2.getFSDataset().getFsVolumeReferences().getReference(1) + .getVolume().getBaseURI().getPath(); + + dn0.getDiskMetrics().addSlowDiskForTesting(slowDisk0OnDn0, ImmutableMap.of( SlowDiskReports.DiskOp.READ, 1.0, SlowDiskReports.DiskOp.WRITE, 1.5, SlowDiskReports.DiskOp.METADATA, 2.0)); - dn1.getDiskMetrics().addSlowDiskForTesting(slowDiskOnDn1, ImmutableMap.of( + dn1.getDiskMetrics().addSlowDiskForTesting(slowDisk0OnDn1, ImmutableMap.of( SlowDiskReports.DiskOp.READ, 1.0, SlowDiskReports.DiskOp.WRITE, 1.5, SlowDiskReports.DiskOp.METADATA, 2.0)); - dn2.getDiskMetrics().addSlowDiskForTesting(slowDiskOnDn2, ImmutableMap.of( + dn2.getDiskMetrics().addSlowDiskForTesting(slowDisk0OnDn2, ImmutableMap.of( SlowDiskReports.DiskOp.READ, 1.0, SlowDiskReports.DiskOp.WRITE, 1.5, SlowDiskReports.DiskOp.METADATA, 2.0)); + dn0.getDiskMetrics().addSlowDiskForTesting(slowDisk1OnDn0, ImmutableMap.of( + SlowDiskReports.DiskOp.READ, 1.0, SlowDiskReports.DiskOp.WRITE, 1.0, + SlowDiskReports.DiskOp.METADATA, 1.0)); + dn1.getDiskMetrics().addSlowDiskForTesting(slowDisk1OnDn1, ImmutableMap.of( + SlowDiskReports.DiskOp.READ, 1.0, SlowDiskReports.DiskOp.WRITE, 1.0, + SlowDiskReports.DiskOp.METADATA, 1.0)); + dn2.getDiskMetrics().addSlowDiskForTesting(slowDisk1OnDn2, ImmutableMap.of( + SlowDiskReports.DiskOp.READ, 1.0, SlowDiskReports.DiskOp.WRITE, 1.0, + SlowDiskReports.DiskOp.METADATA, 1.0)); + // Wait until the data on the slow disk is collected successfully. - Thread.sleep(5000); + GenericTestUtils.waitFor(new Supplier() { + @Override public Boolean get() { + return dn0.getDiskMetrics().getSlowDisksToBeExcluded().size() == 1 && + dn1.getDiskMetrics().getSlowDisksToBeExcluded().size() == 1 && + dn2.getDiskMetrics().getSlowDisksToBeExcluded().size() == 1; + } + }, 1000, 5000); // Create a file with 3 replica. DFSTestUtil.createFile(fs, new Path("/file0"), false, BUFFER_LENGTH, 1000, @@ -693,13 +716,13 @@ public void testExcludeSlowDiskWhenChoosingVolume() throws Exception { // Asserts that the number of blocks created on a slow disk is 0. 
Assert.assertEquals(0, dn0.getVolumeReport().stream() - .filter(v -> (v.getPath() + "/").equals(slowDiskOnDn0)).collect(Collectors.toList()).get(0) + .filter(v -> (v.getPath() + "/").equals(slowDisk0OnDn0)).collect(Collectors.toList()).get(0) .getNumBlocks()); Assert.assertEquals(0, dn1.getVolumeReport().stream() - .filter(v -> (v.getPath() + "/").equals(slowDiskOnDn1)).collect(Collectors.toList()).get(0) + .filter(v -> (v.getPath() + "/").equals(slowDisk0OnDn1)).collect(Collectors.toList()).get(0) .getNumBlocks()); Assert.assertEquals(0, dn2.getVolumeReport().stream() - .filter(v -> (v.getPath() + "/").equals(slowDiskOnDn2)).collect(Collectors.toList()).get(0) + .filter(v -> (v.getPath() + "/").equals(slowDisk0OnDn2)).collect(Collectors.toList()).get(0) .getNumBlocks()); } } From 7d8920477499673e68561b13c91aec67e61f90fe Mon Sep 17 00:00:00 2001 From: tom lee Date: Thu, 23 Dec 2021 11:36:04 +0800 Subject: [PATCH 4/6] change config name and simplify the code --- .../org/apache/hadoop/hdfs/DFSConfigKeys.java | 6 +-- .../datanode/fsdataset/impl/FsVolumeList.java | 4 +- .../datanode/metrics/DataNodeDiskMetrics.java | 49 ++++++------------- .../src/main/resources/hdfs-default.xml | 2 +- .../fsdataset/impl/TestFsVolumeList.java | 10 ++-- 5 files changed, 27 insertions(+), 44 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index 102bbaa553a62..d48210cd8bb14 100755 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -698,9 +698,9 @@ public class DFSConfigKeys extends CommonConfigurationKeys { "dfs.datanode.max.disks.to.report"; public static final int DFS_DATANODE_MAX_DISKS_TO_REPORT_DEFAULT = 5; - public static final String DFS_DATANODE_MAX_SLOWDISKS_TO_BE_EXCLUDED_KEY = - "dfs.datanode.max.slowdisks.to.be.excluded"; - public static final int DFS_DATANODE_MAX_SLOWDISKS_TO_BE_EXCLUDED_DEFAULT = + public static final String DFS_DATANODE_MAX_SLOWDISKS_TO_EXCLUDE_KEY = + "dfs.datanode.max.slowdisks.to.exclude"; + public static final int DFS_DATANODE_MAX_SLOWDISKS_TO_EXCLUDE_DEFAULT = 0; public static final String DFS_DATANODE_HOST_NAME_KEY = HdfsClientConfigKeys.DeprecatedKeys.DFS_DATANODE_HOST_NAME_KEY; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java index 35a0539501547..95470bb8ffe26 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java @@ -107,9 +107,9 @@ private FsVolumeReference chooseVolume(List list, // Exclude slow disks when choosing volume. 
if (diskMetrics != null) { - List slowDisksToBeExcluded = diskMetrics.getSlowDisksToBeExcluded(); + List slowDisksToExclude = diskMetrics.getSlowDisksToExclude(); list = list.stream() - .filter(volume -> !slowDisksToBeExcluded.contains(volume.getBaseURI().getPath())) + .filter(volume -> !slowDisksToExclude.contains(volume.getBaseURI().getPath())) .collect(Collectors.toList()); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeDiskMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeDiskMetrics.java index 04b547145b80f..080aa39d93ece 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeDiskMetrics.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeDiskMetrics.java @@ -36,6 +36,7 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.Iterator; import java.util.List; @@ -77,11 +78,11 @@ public class DataNodeDiskMetrics { /** * The number of slow disks that needs to be excluded. */ - private int maxSlowDisksToBeExcluded; + private int maxSlowDisksToExclude; /** * List of slow disks that need to be excluded. */ - private List slowDisksToBeExcluded = new ArrayList<>(); + private List slowDisksToExclude = new ArrayList<>(); public DataNodeDiskMetrics(DataNode dn, long diskOutlierDetectionIntervalMs, Configuration conf) { @@ -93,9 +94,9 @@ public DataNodeDiskMetrics(DataNode dn, long diskOutlierDetectionIntervalMs, lowThresholdMs = conf.getLong(DFSConfigKeys.DFS_DATANODE_SLOWDISK_LOW_THRESHOLD_MS_KEY, DFSConfigKeys.DFS_DATANODE_SLOWDISK_LOW_THRESHOLD_MS_DEFAULT); - maxSlowDisksToBeExcluded = - conf.getInt(DFSConfigKeys.DFS_DATANODE_MAX_SLOWDISKS_TO_BE_EXCLUDED_KEY, - DFSConfigKeys.DFS_DATANODE_MAX_SLOWDISKS_TO_BE_EXCLUDED_DEFAULT); + maxSlowDisksToExclude = + conf.getInt(DFSConfigKeys.DFS_DATANODE_MAX_SLOWDISKS_TO_EXCLUDE_KEY, + DFSConfigKeys.DFS_DATANODE_MAX_SLOWDISKS_TO_EXCLUDE_DEFAULT); slowDiskDetector = new OutlierDetector(minOutlierDetectionDisks, lowThresholdMs); shouldRun = true; @@ -144,14 +145,19 @@ public void run() { detectAndUpdateDiskOutliers(metadataOpStats, readIoStats, writeIoStats); - // Sort the slow disks by latency. - if (maxSlowDisksToBeExcluded > 0) { + // Sort the slow disks by latency and . 
+ if (maxSlowDisksToExclude > 0) { ArrayList diskLatencies = new ArrayList<>(); for (Map.Entry> diskStats : diskOutliersStats.entrySet()) { diskLatencies.add(new DiskLatency(diskStats.getKey(), diskStats.getValue())); } - sortSlowDisks(diskLatencies); + + Collections.sort(diskLatencies, (o1, o2) + -> Double.compare(o2.getMaxLatency(), o1.getMaxLatency())); + + slowDisksToExclude = diskLatencies.stream().limit(maxSlowDisksToExclude) + .map(DiskLatency::getSlowDisk).collect(Collectors.toList()); } } @@ -197,29 +203,6 @@ private void detectAndUpdateDiskOutliers(Map metadataOpStats, } } - private void sortSlowDisks(ArrayList diskLatencies) { - if (diskOutliersStats.isEmpty()) { - return; - } - - final PriorityQueue topNReports = new PriorityQueue<>( - diskLatencies.size(), - (o1, o2) -> Doubles.compare( - o1.getMaxLatency(), o2.getMaxLatency())); - - for (DiskLatency diskLatency : diskLatencies) { - if (topNReports.size() < maxSlowDisksToBeExcluded) { - topNReports.add(diskLatency); - } else if (topNReports.peek().getMaxLatency() < - diskLatency.getMaxLatency()) { - topNReports.poll(); - topNReports.add(diskLatency); - } - } - slowDisksToBeExcluded = - topNReports.stream().map(DiskLatency::getSlowDisk).collect(Collectors.toList()); - } - /** * This structure is a wrapper over disk latencies. */ @@ -285,7 +268,7 @@ public void addSlowDiskForTesting(String slowDiskPath, } } - public List getSlowDisksToBeExcluded() { - return slowDisksToBeExcluded; + public List getSlowDisksToExclude() { + return slowDisksToExclude; } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml index e75a5c83c3d57..34a11606f183c 100755 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml @@ -2484,7 +2484,7 @@ - dfs.datanode.max.slowdisks.to.be.excluded + dfs.datanode.max.slowdisks.to.exclude 0 The number of slow disks that needs to be excluded. By default, this parameter is set to 0, diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsVolumeList.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsVolumeList.java index a95d33736f735..4d8e0c99980d2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsVolumeList.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsVolumeList.java @@ -80,7 +80,7 @@ public class TestFsVolumeList { private String baseDir; private BlockScanner blockScanner; private final static int NUM_DATANODES = 3; - private final static int STORAGES_PER_DATANODE = 5; + private final static int STORAGES_PER_DATANODE = 3; private final static int DEFAULT_BLOCK_SIZE = 102400; private final static int BUFFER_LENGTH = 1024; @@ -640,7 +640,7 @@ public void testExcludeSlowDiskWhenChoosingVolume() throws Exception { // Enable datanode disk metrics collector. conf.setInt(DFSConfigKeys.DFS_DATANODE_FILEIO_PROFILING_SAMPLING_PERCENTAGE_KEY, 30); // Enable excluding slow disks when choosing volume. - conf.setInt(DFSConfigKeys.DFS_DATANODE_MAX_SLOWDISKS_TO_BE_EXCLUDED_KEY, 1); + conf.setInt(DFSConfigKeys.DFS_DATANODE_MAX_SLOWDISKS_TO_EXCLUDE_KEY, 1); // Ensure that each volume capacity is larger than the DEFAULT_BLOCK_SIZE. 
long capacity = 10 * DEFAULT_BLOCK_SIZE; long[][] capacities = new long[NUM_DATANODES][STORAGES_PER_DATANODE]; @@ -704,9 +704,9 @@ public void testExcludeSlowDiskWhenChoosingVolume() throws Exception { // Wait until the data on the slow disk is collected successfully. GenericTestUtils.waitFor(new Supplier() { @Override public Boolean get() { - return dn0.getDiskMetrics().getSlowDisksToBeExcluded().size() == 1 && - dn1.getDiskMetrics().getSlowDisksToBeExcluded().size() == 1 && - dn2.getDiskMetrics().getSlowDisksToBeExcluded().size() == 1; + return dn0.getDiskMetrics().getSlowDisksToExclude().size() == 1 && + dn1.getDiskMetrics().getSlowDisksToExclude().size() == 1 && + dn2.getDiskMetrics().getSlowDisksToExclude().size() == 1; } }, 1000, 5000); From 88caaaebd668b6d327cb85262773164e2f3815be Mon Sep 17 00:00:00 2001 From: tom lee Date: Fri, 24 Dec 2021 00:58:25 +0800 Subject: [PATCH 5/6] fix checkstyles --- .../hdfs/server/datanode/metrics/DataNodeDiskMetrics.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeDiskMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeDiskMetrics.java index 080aa39d93ece..437f4a6b8676d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeDiskMetrics.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeDiskMetrics.java @@ -29,7 +29,6 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports.DiskOp; -import org.apache.hadoop.thirdparty.com.google.common.primitives.Doubles; import org.apache.hadoop.util.Daemon; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -41,7 +40,6 @@ import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.PriorityQueue; import java.util.stream.Collectors; /** From eb505749011297893ed44edbcd691387e0acf5da Mon Sep 17 00:00:00 2001 From: tom lee Date: Wed, 5 Jan 2022 23:55:52 +0800 Subject: [PATCH 6/6] fix incomplete comment --- .../hdfs/server/datanode/metrics/DataNodeDiskMetrics.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeDiskMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeDiskMetrics.java index 437f4a6b8676d..9c2151c775be5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeDiskMetrics.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeDiskMetrics.java @@ -143,7 +143,7 @@ public void run() { detectAndUpdateDiskOutliers(metadataOpStats, readIoStats, writeIoStats); - // Sort the slow disks by latency and . + // Sort the slow disks by latency and extract the top n by maxSlowDisksToExclude. if (maxSlowDisksToExclude > 0) { ArrayList diskLatencies = new ArrayList<>(); for (Map.Entry> diskStats :
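
For reference, a minimal sketch of the DataNode configuration under which the exclusion takes effect, mirroring the unit test above; the class name is hypothetical and the interval/percentage values are illustrative. Only dfs.datanode.max.slowdisks.to.exclude is introduced by this series; the other two keys already exist and simply enable the disk outlier collector that feeds the exclusion list.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;

public class SlowDiskExclusionConfSketch {
  public static Configuration build() {
    Configuration conf = new HdfsConfiguration();
    // Enable the DataNode disk metrics collector; FsVolumeList#chooseVolume only
    // filters candidate volumes when datanode.getDiskMetrics() is non-null.
    conf.setInt(
        DFSConfigKeys.DFS_DATANODE_FILEIO_PROFILING_SAMPLING_PERCENTAGE_KEY, 30);
    // Report disk outliers periodically so slowDisksToExclude gets refreshed.
    conf.setStrings(DFSConfigKeys.DFS_DATANODE_OUTLIERS_REPORT_INTERVAL_KEY, "1s");
    // New key from this series: exclude at most one reported slow disk per DataNode
    // when choosing a volume for a new block (0, the default, disables the feature).
    conf.setInt(DFSConfigKeys.DFS_DATANODE_MAX_SLOWDISKS_TO_EXCLUDE_KEY, 1);
    return conf;
  }
}

With the default of 0 (DFS_DATANODE_MAX_SLOWDISKS_TO_EXCLUDE_DEFAULT), the filter in chooseVolume is effectively a no-op, so existing deployments are unaffected unless they opt in.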