Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.classification.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -154,10 +155,20 @@ boolean running() {
/**
* How long in between runs of the background refresh.
*/
long getRefreshInterval() {
@VisibleForTesting
public long getRefreshInterval() {
return refreshInterval;
}

/**
* Randomize the refresh interval timing by this amount, the actual interval will be chosen
* uniformly between {@code interval-jitter} and {@code interval+jitter}.
*/
@VisibleForTesting
public long getJitter() {
return jitter;
}

/**
* Reset the current used data amount. This should be called
* when the cached value is re-computed.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@

import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INITIAL_DELAY_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INITIAL_DELAY_KEY;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DU_INTERVAL_DEFAULT;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DU_INTERVAL_KEY;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_GETSPACEUSED_JITTER_DEFAULT;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_GETSPACEUSED_JITTER_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CACHEREPORT_INTERVAL_MSEC_DEFAULT;
Expand Down Expand Up @@ -144,6 +148,8 @@
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.server.datanode.checker.DatasetVolumeChecker;
import org.apache.hadoop.hdfs.server.datanode.checker.StorageLocationChecker;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.BlockPoolSlice;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl;
import org.apache.hadoop.hdfs.util.DataTransferThrottler;
import org.apache.hadoop.util.AutoCloseableLock;
import org.apache.hadoop.hdfs.client.BlockReportOptions;
Expand Down Expand Up @@ -344,7 +350,9 @@ public class DataNode extends ReconfigurableBase
DFS_DATANODE_FILEIO_PROFILING_SAMPLING_PERCENTAGE_KEY,
DFS_DATANODE_OUTLIERS_REPORT_INTERVAL_KEY,
DFS_DATANODE_MIN_OUTLIER_DETECTION_DISKS_KEY,
DFS_DATANODE_SLOWDISK_LOW_THRESHOLD_MS_KEY));
DFS_DATANODE_SLOWDISK_LOW_THRESHOLD_MS_KEY,
FS_DU_INTERVAL_KEY,
FS_GETSPACEUSED_JITTER_KEY));

public static final Log METRICS_LOG = LogFactory.getLog("DataNodeMetricsLog");

Expand Down Expand Up @@ -668,6 +676,9 @@ public String reconfigurePropertyImpl(String property, String newVal)
case DFS_DATANODE_MIN_OUTLIER_DETECTION_DISKS_KEY:
case DFS_DATANODE_SLOWDISK_LOW_THRESHOLD_MS_KEY:
return reconfSlowDiskParameters(property, newVal);
case FS_DU_INTERVAL_KEY:
case FS_GETSPACEUSED_JITTER_KEY:
return reconfDfsUsageParameters(property, newVal);
default:
break;
}
Expand Down Expand Up @@ -849,6 +860,43 @@ private String reconfSlowDiskParameters(String property, String newVal)
}
}

private String reconfDfsUsageParameters(String property, String newVal)
throws ReconfigurationException {
String result = null;
try {
LOG.info("Reconfiguring {} to {}", property, newVal);
if (property.equals(FS_DU_INTERVAL_KEY)) {
Preconditions.checkNotNull(data, "FsDatasetSpi has not been initialized.");
long interval = (newVal == null ? FS_DU_INTERVAL_DEFAULT :
Long.parseLong(newVal));
result = Long.toString(interval);
List<FsVolumeImpl> volumeList = data.getVolumeList();
for (FsVolumeImpl fsVolume : volumeList) {
Map<String, BlockPoolSlice> blockPoolSlices = fsVolume.getBlockPoolSlices();
for (BlockPoolSlice value : blockPoolSlices.values()) {
value.updateDfsUsageConfig(interval, null);
}
}
} else if (property.equals(FS_GETSPACEUSED_JITTER_KEY)) {
Preconditions.checkNotNull(data, "FsDatasetSpi has not been initialized.");
long jitter = (newVal == null ? FS_GETSPACEUSED_JITTER_DEFAULT :
Long.parseLong(newVal));
result = Long.toString(jitter);
List<FsVolumeImpl> volumeList = data.getVolumeList();
for (FsVolumeImpl fsVolume : volumeList) {
Map<String, BlockPoolSlice> blockPoolSlices = fsVolume.getBlockPoolSlices();
for (BlockPoolSlice value : blockPoolSlices.values()) {
value.updateDfsUsageConfig(null, jitter);
}
}
}
LOG.info("RECONFIGURE* changed {} to {}", property, newVal);
return result;
} catch (IllegalArgumentException | IOException e) {
throw new ReconfigurationException(property, newVal, getConf().get(property), e);
}
}

/**
* Get a list of the keys of the re-configurable properties in configuration.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.util.AutoCloseableLock;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
Expand Down Expand Up @@ -679,4 +680,9 @@ ReplicaInfo moveBlockAcrossVolumes(final ExtendedBlock block,
* @throws IOException
*/
Set<? extends Replica> deepCopyReplica(String bpid) throws IOException;

/**
* Get the volume list.
*/
List<FsVolumeImpl> getVolumeList();
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.hdfs.server.datanode.FSCachingGetSpaceUsed;
import org.apache.hadoop.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
Expand Down Expand Up @@ -78,14 +79,17 @@

import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;

import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DU_INTERVAL_KEY;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_GETSPACEUSED_JITTER_KEY;

/**
* A block pool slice represents a portion of a block pool stored on a volume.
* Taken together, all BlockPoolSlices sharing a block pool ID across a
* cluster represent a single block pool.
*
* This class is synchronized by {@link FsVolumeImpl}.
*/
class BlockPoolSlice {
public class BlockPoolSlice {
static final Logger LOG = LoggerFactory.getLogger(BlockPoolSlice.class);

private final String bpid;
Expand All @@ -111,6 +115,8 @@ class BlockPoolSlice {
private final Timer timer;
private final int maxDataLength;
private final FileIoProvider fileIoProvider;
private final Configuration config;
private final File bpDir;

private static ForkJoinPool addReplicaThreadPool = null;
private static final int VOLUMES_REPLICA_ADD_THREADPOOL_SIZE = Runtime
Expand All @@ -124,7 +130,7 @@ public int compare(File f1, File f2) {
};

// TODO:FEDERATION scalability issue - a thread per DU is needed
private final GetSpaceUsed dfsUsage;
private volatile GetSpaceUsed dfsUsage;

/**
* Create a blook pool slice
Expand All @@ -137,6 +143,8 @@ public int compare(File f1, File f2) {
*/
BlockPoolSlice(String bpid, FsVolumeImpl volume, File bpDir,
Configuration conf, Timer timer) throws IOException {
this.config = conf;
this.bpDir = bpDir;
this.bpid = bpid;
this.volume = volume;
this.fileIoProvider = volume.getFileIoProvider();
Expand Down Expand Up @@ -228,6 +236,35 @@ public void run() {
SHUTDOWN_HOOK_PRIORITY);
}

public void updateDfsUsageConfig(Long interval, Long jitter) throws IOException {
// Close the old dfsUsage if it is CachingGetSpaceUsed.
if (dfsUsage instanceof CachingGetSpaceUsed) {
((CachingGetSpaceUsed) dfsUsage).close();
}
if (interval != null) {
Preconditions.checkArgument(interval > 0,
FS_DU_INTERVAL_KEY + " should be larger than 0");
config.setLong(FS_DU_INTERVAL_KEY, interval);
}
if (jitter != null) {
Preconditions.checkArgument(jitter >= 0,
FS_GETSPACEUSED_JITTER_KEY + " should be larger than or equal to 0");
config.setLong(FS_GETSPACEUSED_JITTER_KEY, jitter);
}
// Start new dfsUsage.
this.dfsUsage = new FSCachingGetSpaceUsed.Builder().setBpid(bpid)
.setVolume(volume)
.setPath(bpDir)
.setConf(config)
.setInitialUsed(loadDfsUsed())
.build();
}

@VisibleForTesting
public GetSpaceUsed getDfsUsage() {
return dfsUsage;
}

private synchronized static void initializeAddReplicaPool(Configuration conf,
FsDatasetImpl dataset) {
if (addReplicaThreadPool == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3431,5 +3431,10 @@ void stopAllDataxceiverThreads(FsVolumeImpl volume) {
}
}
}

@Override
public List<FsVolumeImpl> getVolumeList() {
return volumes.getVolumes();
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -493,6 +493,10 @@ long getRecentReserved() {
return recentReserved;
}

public Map<String, BlockPoolSlice> getBlockPoolSlices() {
return bpSlices;
}

long getReserved(){
return reserved != null ? reserved.getReserved() : 0;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import javax.management.ObjectName;
import javax.management.StandardMBean;

import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl;
import org.apache.hadoop.thirdparty.com.google.common.math.LongMath;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.hadoop.conf.Configuration;
Expand Down Expand Up @@ -1595,5 +1596,10 @@ public Set<? extends Replica> deepCopyReplica(String bpid)
}
return Collections.unmodifiableSet(replicas);
}

@Override
public List<FsVolumeImpl> getVolumeList() {
return null;
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@
package org.apache.hadoop.hdfs.server.datanode;

import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INITIAL_DELAY_KEY;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DU_INTERVAL_DEFAULT;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DU_INTERVAL_KEY;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_GETSPACEUSED_JITTER_DEFAULT;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_GETSPACEUSED_JITTER_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_SPLIT_THRESHOLD_KEY;
Expand Down Expand Up @@ -49,15 +53,21 @@
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.ReconfigurationException;
import org.apache.hadoop.fs.CachingGetSpaceUsed;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.GetSpaceUsed;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.BlockPoolSlice;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl;
import org.apache.hadoop.test.LambdaTestUtils;
import org.junit.After;
import org.junit.Assert;
Expand Down Expand Up @@ -673,4 +683,77 @@ public void testSlowDiskParameters() throws ReconfigurationException, IOExceptio
dn.getDiskMetrics().getSlowDiskDetector().getLowThresholdMs());
}
}

@Test
public void testDfsUsageParameters() throws ReconfigurationException {
String[] dfsUsageParameters = {
FS_DU_INTERVAL_KEY,
FS_GETSPACEUSED_JITTER_KEY};

for (int i = 0; i < NUM_DATA_NODE; i++) {
DataNode dn = cluster.getDataNodes().get(i);

// Try invalid values.
for (String parameter : dfsUsageParameters) {
try {
dn.reconfigureProperty(parameter, "text");
fail("ReconfigurationException expected");
} catch (ReconfigurationException expected) {
assertTrue("expecting NumberFormatException",
expected.getCause() instanceof NumberFormatException);
}

try {
dn.reconfigureProperty(parameter, String.valueOf(-1));
fail("ReconfigurationException expected");
} catch (ReconfigurationException expected) {
assertTrue("expecting IllegalArgumentException",
expected.getCause() instanceof IllegalArgumentException);
}
}

// Change and verify properties.
for (String parameter : dfsUsageParameters) {
dn.reconfigureProperty(parameter, "99");
}
List<FsVolumeImpl> volumeList = dn.data.getVolumeList();
for (FsVolumeImpl fsVolume : volumeList) {
Map<String, BlockPoolSlice> blockPoolSlices = fsVolume.getBlockPoolSlices();
for (Map.Entry<String, BlockPoolSlice> entry : blockPoolSlices.entrySet()) {
GetSpaceUsed dfsUsage = entry.getValue().getDfsUsage();
if (dfsUsage instanceof CachingGetSpaceUsed) {
assertEquals(99,
((CachingGetSpaceUsed) entry.getValue().getDfsUsage()).getRefreshInterval());
assertEquals(99,
((CachingGetSpaceUsed) entry.getValue().getDfsUsage()).getJitter());
}
}
}

// Revert to default and verify.
for (String parameter : dfsUsageParameters) {
dn.reconfigureProperty(parameter, null);
}
for (FsVolumeImpl fsVolume : volumeList) {
Map<String, BlockPoolSlice> blockPoolSlices = fsVolume.getBlockPoolSlices();
for (Map.Entry<String, BlockPoolSlice> entry : blockPoolSlices.entrySet()) {
GetSpaceUsed dfsUsage = entry.getValue().getDfsUsage();
if (dfsUsage instanceof CachingGetSpaceUsed) {
assertEquals(String.format("expect %s is not configured",
FS_DU_INTERVAL_KEY), FS_DU_INTERVAL_DEFAULT,
((CachingGetSpaceUsed) entry.getValue().getDfsUsage()).getRefreshInterval());
assertEquals(String.format("expect %s is not configured",
FS_GETSPACEUSED_JITTER_KEY), FS_GETSPACEUSED_JITTER_DEFAULT,
((CachingGetSpaceUsed) entry.getValue().getDfsUsage()).getJitter());
}
assertEquals(String.format("expect %s is not configured",
FS_DU_INTERVAL_KEY), null,
dn.getConf().get(FS_DU_INTERVAL_KEY));
assertEquals(String.format("expect %s is not configured",
FS_GETSPACEUSED_JITTER_KEY), null,
dn.getConf().get(FS_GETSPACEUSED_JITTER_KEY));
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.util.AutoCloseableLock;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
Expand Down Expand Up @@ -465,4 +466,9 @@ public Set<? extends Replica> deepCopyReplica(String bpid)
throws IOException {
return Collections.EMPTY_SET;
}

@Override
public List<FsVolumeImpl> getVolumeList() {
return null;
}
}
Loading