diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CachingGetSpaceUsed.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CachingGetSpaceUsed.java
index 58dc82d2efb2d..85a3392da4fb0 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CachingGetSpaceUsed.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CachingGetSpaceUsed.java
@@ -19,6 +19,7 @@
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.classification.VisibleForTesting;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -154,10 +155,20 @@ boolean running() {
   /**
    * How long in between runs of the background refresh.
    */
-  long getRefreshInterval() {
+  @VisibleForTesting
+  public long getRefreshInterval() {
     return refreshInterval;
   }
 
+  /**
+   * Randomize the refresh interval timing by this amount, the actual interval will be chosen
+   * uniformly between {@code interval-jitter} and {@code interval+jitter}.
+   */
+  @VisibleForTesting
+  public long getJitter() {
+    return jitter;
+  }
+
   /**
    * Reset the current used data amount. This should be called
    * when the cached value is re-computed.
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index d197e070666e9..27a6e19f7330d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -20,6 +20,10 @@
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INITIAL_DELAY_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INITIAL_DELAY_KEY;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DU_INTERVAL_DEFAULT;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DU_INTERVAL_KEY;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_GETSPACEUSED_JITTER_DEFAULT;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_GETSPACEUSED_JITTER_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CACHEREPORT_INTERVAL_MSEC_DEFAULT;
@@ -144,6 +148,8 @@
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.server.datanode.checker.DatasetVolumeChecker;
 import org.apache.hadoop.hdfs.server.datanode.checker.StorageLocationChecker;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.BlockPoolSlice;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl;
 import org.apache.hadoop.hdfs.util.DataTransferThrottler;
 import org.apache.hadoop.util.AutoCloseableLock;
 import org.apache.hadoop.hdfs.client.BlockReportOptions;
@@ -344,7 +350,9 @@ public class DataNode extends ReconfigurableBase
           DFS_DATANODE_FILEIO_PROFILING_SAMPLING_PERCENTAGE_KEY,
           DFS_DATANODE_OUTLIERS_REPORT_INTERVAL_KEY,
           DFS_DATANODE_MIN_OUTLIER_DETECTION_DISKS_KEY,
-          DFS_DATANODE_SLOWDISK_LOW_THRESHOLD_MS_KEY));
+          DFS_DATANODE_SLOWDISK_LOW_THRESHOLD_MS_KEY,
+          FS_DU_INTERVAL_KEY,
+          FS_GETSPACEUSED_JITTER_KEY));
 
   public static final Log METRICS_LOG = LogFactory.getLog("DataNodeMetricsLog");
@@ -668,6 +676,9 @@ public String reconfigurePropertyImpl(String property, String newVal)
     case DFS_DATANODE_MIN_OUTLIER_DETECTION_DISKS_KEY:
     case DFS_DATANODE_SLOWDISK_LOW_THRESHOLD_MS_KEY:
       return reconfSlowDiskParameters(property, newVal);
+    case FS_DU_INTERVAL_KEY:
+    case FS_GETSPACEUSED_JITTER_KEY:
+      return reconfDfsUsageParameters(property, newVal);
     default:
       break;
     }
@@ -849,6 +860,43 @@ private String reconfSlowDiskParameters(String property, String newVal)
     }
   }
 
+  private String reconfDfsUsageParameters(String property, String newVal)
+      throws ReconfigurationException {
+    String result = null;
+    try {
+      LOG.info("Reconfiguring {} to {}", property, newVal);
+      if (property.equals(FS_DU_INTERVAL_KEY)) {
+        Preconditions.checkNotNull(data, "FsDatasetSpi has not been initialized.");
+        long interval = (newVal == null ? FS_DU_INTERVAL_DEFAULT :
+            Long.parseLong(newVal));
+        result = Long.toString(interval);
+        List<FsVolumeImpl> volumeList = data.getVolumeList();
+        for (FsVolumeImpl fsVolume : volumeList) {
+          Map<String, BlockPoolSlice> blockPoolSlices = fsVolume.getBlockPoolSlices();
+          for (BlockPoolSlice value : blockPoolSlices.values()) {
+            value.updateDfsUsageConfig(interval, null);
+          }
+        }
+      } else if (property.equals(FS_GETSPACEUSED_JITTER_KEY)) {
+        Preconditions.checkNotNull(data, "FsDatasetSpi has not been initialized.");
+        long jitter = (newVal == null ? FS_GETSPACEUSED_JITTER_DEFAULT :
+            Long.parseLong(newVal));
+        result = Long.toString(jitter);
+        List<FsVolumeImpl> volumeList = data.getVolumeList();
+        for (FsVolumeImpl fsVolume : volumeList) {
+          Map<String, BlockPoolSlice> blockPoolSlices = fsVolume.getBlockPoolSlices();
+          for (BlockPoolSlice value : blockPoolSlices.values()) {
+            value.updateDfsUsageConfig(null, jitter);
+          }
+        }
+      }
+      LOG.info("RECONFIGURE* changed {} to {}", property, newVal);
+      return result;
+    } catch (IllegalArgumentException | IOException e) {
+      throw new ReconfigurationException(property, newVal, getConf().get(property), e);
+    }
+  }
+
   /**
    * Get a list of the keys of the re-configurable properties in configuration.
    */
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
index 177c62e017411..4bfb0cb870cda 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
@@ -36,6 +36,7 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.util.AutoCloseableLock;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
@@ -679,4 +680,9 @@ ReplicaInfo moveBlockAcrossVolumes(final ExtendedBlock block,
    * @throws IOException
    */
   Set<? extends Replica> deepCopyReplica(String bpid) throws IOException;
+
+  /**
+   * Get the volume list.
+   */
+  List<FsVolumeImpl> getVolumeList();
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
index 4dc67072f8b10..2a0823735ad21 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
@@ -47,6 +47,7 @@
 import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.hdfs.server.datanode.FSCachingGetSpaceUsed;
+import org.apache.hadoop.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -78,6 +79,9 @@
 import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
 
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DU_INTERVAL_KEY;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_GETSPACEUSED_JITTER_KEY;
+
 /**
  * A block pool slice represents a portion of a block pool stored on a volume.
  * Taken together, all BlockPoolSlices sharing a block pool ID across a
@@ -85,7 +89,7 @@
  *
  * This class is synchronized by {@link FsVolumeImpl}.
  */
-class BlockPoolSlice {
+public class BlockPoolSlice {
   static final Logger LOG = LoggerFactory.getLogger(BlockPoolSlice.class);
 
   private final String bpid;
@@ -111,6 +115,8 @@ class BlockPoolSlice {
   private final Timer timer;
   private final int maxDataLength;
   private final FileIoProvider fileIoProvider;
+  private final Configuration config;
+  private final File bpDir;
 
   private static ForkJoinPool addReplicaThreadPool = null;
   private static final int VOLUMES_REPLICA_ADD_THREADPOOL_SIZE = Runtime
@@ -124,7 +130,7 @@ public int compare(File f1, File f2) {
   };
 
   // TODO:FEDERATION scalability issue - a thread per DU is needed
-  private final GetSpaceUsed dfsUsage;
+  private volatile GetSpaceUsed dfsUsage;
 
   /**
    * Create a blook pool slice
@@ -137,6 +143,8 @@ public int compare(File f1, File f2) {
    */
   BlockPoolSlice(String bpid, FsVolumeImpl volume, File bpDir,
       Configuration conf, Timer timer) throws IOException {
+    this.config = conf;
+    this.bpDir = bpDir;
     this.bpid = bpid;
     this.volume = volume;
     this.fileIoProvider = volume.getFileIoProvider();
@@ -228,6 +236,35 @@ public void run() {
       SHUTDOWN_HOOK_PRIORITY);
   }
 
+  public void updateDfsUsageConfig(Long interval, Long jitter) throws IOException {
+    // Close the old dfsUsage if it is CachingGetSpaceUsed.
+    if (dfsUsage instanceof CachingGetSpaceUsed) {
+      ((CachingGetSpaceUsed) dfsUsage).close();
+    }
+    if (interval != null) {
+      Preconditions.checkArgument(interval > 0,
+          FS_DU_INTERVAL_KEY + " should be larger than 0");
+      config.setLong(FS_DU_INTERVAL_KEY, interval);
+    }
+    if (jitter != null) {
+      Preconditions.checkArgument(jitter >= 0,
+          FS_GETSPACEUSED_JITTER_KEY + " should be larger than or equal to 0");
+      config.setLong(FS_GETSPACEUSED_JITTER_KEY, jitter);
+    }
+    // Start new dfsUsage.
+    this.dfsUsage = new FSCachingGetSpaceUsed.Builder().setBpid(bpid)
+        .setVolume(volume)
+        .setPath(bpDir)
+        .setConf(config)
+        .setInitialUsed(loadDfsUsed())
+        .build();
+  }
+
+  @VisibleForTesting
+  public GetSpaceUsed getDfsUsage() {
+    return dfsUsage;
+  }
+
   private synchronized static void initializeAddReplicaPool(Configuration conf,
       FsDatasetImpl dataset) {
     if (addReplicaThreadPool == null) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index d8db707829fa0..a54d4ef1b9002 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -3431,5 +3431,10 @@ void stopAllDataxceiverThreads(FsVolumeImpl volume) {
       }
     }
   }
+
+  @Override
+  public List<FsVolumeImpl> getVolumeList() {
+    return volumes.getVolumes();
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
index 15048efc72948..c59a77184a974 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
@@ -493,6 +493,10 @@ long getRecentReserved() {
     return recentReserved;
   }
 
+  public Map<String, BlockPoolSlice> getBlockPoolSlices() {
+    return bpSlices;
+  }
+
   long getReserved(){
     return reserved != null ? reserved.getReserved() : 0;
   }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
index 417ad3ce74c62..6740343e83d33 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
@@ -40,6 +40,7 @@
 import javax.management.ObjectName;
 import javax.management.StandardMBean;
 
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl;
 import org.apache.hadoop.thirdparty.com.google.common.math.LongMath;
 import org.apache.commons.lang3.ArrayUtils;
 import org.apache.hadoop.conf.Configuration;
@@ -1595,5 +1596,10 @@ public Set<BInfo> deepCopyReplica(String bpid)
     }
     return Collections.unmodifiableSet(replicas);
   }
+
+  @Override
+  public List<FsVolumeImpl> getVolumeList() {
+    return null;
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeReconfiguration.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeReconfiguration.java
index 1a9d6024acd0f..172a44557c1ce 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeReconfiguration.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeReconfiguration.java
@@ -19,6 +19,10 @@
 package org.apache.hadoop.hdfs.server.datanode;
 
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INITIAL_DELAY_KEY;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DU_INTERVAL_DEFAULT;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DU_INTERVAL_KEY;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_GETSPACEUSED_JITTER_DEFAULT;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_GETSPACEUSED_JITTER_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_SPLIT_THRESHOLD_KEY;
@@ -49,15 +53,21 @@
 import java.io.File;
 import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.util.List;
+import java.util.Map;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.ReconfigurationException;
+import org.apache.hadoop.fs.CachingGetSpaceUsed;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.GetSpaceUsed;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.MiniDFSNNTopology;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.BlockPoolSlice;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl;
 import org.apache.hadoop.test.LambdaTestUtils;
 import org.junit.After;
 import org.junit.Assert;
@@ -673,4 +683,77 @@ public void testSlowDiskParameters() throws ReconfigurationException, IOException
           dn.getDiskMetrics().getSlowDiskDetector().getLowThresholdMs());
     }
   }
+
+  @Test
+  public void testDfsUsageParameters() throws ReconfigurationException {
+    String[] dfsUsageParameters = {
+        FS_DU_INTERVAL_KEY,
+        FS_GETSPACEUSED_JITTER_KEY};
+
+    for (int i = 0; i < NUM_DATA_NODE; i++) {
+      DataNode dn = cluster.getDataNodes().get(i);
+
+      // Try invalid values.
+      for (String parameter : dfsUsageParameters) {
+        try {
+          dn.reconfigureProperty(parameter, "text");
+          fail("ReconfigurationException expected");
+        } catch (ReconfigurationException expected) {
+          assertTrue("expecting NumberFormatException",
+              expected.getCause() instanceof NumberFormatException);
+        }
+
+        try {
+          dn.reconfigureProperty(parameter, String.valueOf(-1));
+          fail("ReconfigurationException expected");
+        } catch (ReconfigurationException expected) {
+          assertTrue("expecting IllegalArgumentException",
+              expected.getCause() instanceof IllegalArgumentException);
+        }
+      }
+
+      // Change and verify properties.
+      for (String parameter : dfsUsageParameters) {
+        dn.reconfigureProperty(parameter, "99");
+      }
+      List<FsVolumeImpl> volumeList = dn.data.getVolumeList();
+      for (FsVolumeImpl fsVolume : volumeList) {
+        Map<String, BlockPoolSlice> blockPoolSlices = fsVolume.getBlockPoolSlices();
+        for (Map.Entry<String, BlockPoolSlice> entry : blockPoolSlices.entrySet()) {
+          GetSpaceUsed dfsUsage = entry.getValue().getDfsUsage();
+          if (dfsUsage instanceof CachingGetSpaceUsed) {
+            assertEquals(99,
+                ((CachingGetSpaceUsed) entry.getValue().getDfsUsage()).getRefreshInterval());
+            assertEquals(99,
+                ((CachingGetSpaceUsed) entry.getValue().getDfsUsage()).getJitter());
+          }
+        }
+      }
+
+      // Revert to default and verify.
+      for (String parameter : dfsUsageParameters) {
+        dn.reconfigureProperty(parameter, null);
+      }
+      for (FsVolumeImpl fsVolume : volumeList) {
+        Map<String, BlockPoolSlice> blockPoolSlices = fsVolume.getBlockPoolSlices();
+        for (Map.Entry<String, BlockPoolSlice> entry : blockPoolSlices.entrySet()) {
+          GetSpaceUsed dfsUsage = entry.getValue().getDfsUsage();
+          if (dfsUsage instanceof CachingGetSpaceUsed) {
+            assertEquals(String.format("expect %s is not configured",
+                FS_DU_INTERVAL_KEY), FS_DU_INTERVAL_DEFAULT,
+                ((CachingGetSpaceUsed) entry.getValue().getDfsUsage()).getRefreshInterval());
+            assertEquals(String.format("expect %s is not configured",
+                FS_GETSPACEUSED_JITTER_KEY), FS_GETSPACEUSED_JITTER_DEFAULT,
+                ((CachingGetSpaceUsed) entry.getValue().getDfsUsage()).getJitter());
+          }
+          assertEquals(String.format("expect %s is not configured",
+              FS_DU_INTERVAL_KEY), null,
+              dn.getConf().get(FS_DU_INTERVAL_KEY));
+          assertEquals(String.format("expect %s is not configured",
+              FS_GETSPACEUSED_JITTER_KEY), null,
+              dn.getConf().get(FS_GETSPACEUSED_JITTER_KEY));
+        }
+      }
+    }
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
index caaa89c25e06c..cf91867b4f3b8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
@@ -24,6 +24,7 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.util.AutoCloseableLock;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
@@ -465,4 +466,9 @@ public Set<? extends Replica> deepCopyReplica(String bpid)
       throws IOException {
     return Collections.EMPTY_SET;
   }
+
+  @Override
+  public List<FsVolumeImpl> getVolumeList() {
+    return null;
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java
index ba0b1b0824899..21e0e1a8c172a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java
@@ -331,7 +331,7 @@ public void testDataNodeGetReconfigurableProperties() throws IOException {
     final List<String> outs = Lists.newArrayList();
     final List<String> errs = Lists.newArrayList();
     getReconfigurableProperties("datanode", address, outs, errs);
-    assertEquals(16, outs.size());
+    assertEquals(18, outs.size());
     assertEquals(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, outs.get(1));
   }
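
Usage sketch (not part of the patch): how the two new reconfigurable keys are expected to be driven at runtime, mirroring testDfsUsageParameters above. The running DataNode handle `dn` and the static imports from TestDataNodeReconfiguration are assumed.

    // Apply new values to a live DataNode; reconfigurePropertyImpl routes these keys
    // to reconfDfsUsageParameters, which rebuilds each BlockPoolSlice's dfsUsage
    // refresher without a DataNode restart.
    dn.reconfigureProperty(FS_DU_INTERVAL_KEY, "60000");        // refresh du every 60 s
    dn.reconfigureProperty(FS_GETSPACEUSED_JITTER_KEY, "5000"); // +/- 5 s of jitter

    // Passing null reverts a key to its compiled-in default
    // (FS_DU_INTERVAL_DEFAULT / FS_GETSPACEUSED_JITTER_DEFAULT) and clears it from the conf.
    dn.reconfigureProperty(FS_DU_INTERVAL_KEY, null);
    dn.reconfigureProperty(FS_GETSPACEUSED_JITTER_KEY, null);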