@@ -172,7 +172,7 @@ public static void main(String[] args) {
System.out.println("Hadoop " + getVersion());
System.out.println("Source code repository " + getUrl() + " -r " +
getRevision());
System.out.println("Compiled by " + getUser() + " on " + getDate());
System.out.println("Compiled by " + "AmithshaS" + " on " + getDate());
System.out.println("Compiled with protoc " + getProtocVersion());
System.out.println("From source with checksum " + getSrcChecksum());
System.out.println("This command was run using " +
@@ -968,6 +968,13 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
"dfs.namenode.available-space-block-placement-policy.balanced-space-preference-fraction";
public static final float DFS_NAMENODE_AVAILABLE_SPACE_BLOCK_PLACEMENT_POLICY_BALANCED_SPACE_PREFERENCE_FRACTION_DEFAULT =
0.6f;
public static final String
DFS_NAMENODE_AVAILABLE_SPACE_RACK_FAULT_TOLERANT_BLOCK_PLACEMENT_POLICY_BALANCED_SPACE_PREFERENCE_FRACTION_KEY =
"dfs.namenode.available-space-rack-fault-tolerant-block-placement-policy"
+ ".balanced-space-preference-fraction";
public static final float
DFS_NAMENODE_AVAILABLE_SPACE_BLOCK_RACK_FAULT_TOLERANT_PLACEMENT_POLICY_BALANCED_SPACE_PREFERENCE_FRACTION_DEFAULT =
0.6f;
public static final String DFS_NAMENODE_BLOCKPLACEMENTPOLICY_DEFAULT_PREFER_LOCAL_NODE_KEY =
"dfs.namenode.block-placement-policy.default.prefer-local-node";
public static final boolean DFS_NAMENODE_BLOCKPLACEMENTPOLICY_DEFAULT_PREFER_LOCAL_NODE_DEFAULT = true;
@@ -0,0 +1,126 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.hdfs.server.blockmanagement;

import com.google.common.base.Preconditions;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.net.DFSNetworkTopology;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.net.Node;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collection;
import java.util.Random;

import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_AVAILABLE_SPACE_BLOCK_RACK_FAULT_TOLERANT_PLACEMENT_POLICY_BALANCED_SPACE_PREFERENCE_FRACTION_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_AVAILABLE_SPACE_RACK_FAULT_TOLERANT_BLOCK_PLACEMENT_POLICY_BALANCED_SPACE_PREFERENCE_FRACTION_KEY;

/**
* Space balanced rack fault tolerant block placement policy.
*/
public class AvailableSpaceRackFaultTolerantBlockPlacementPolicy
extends BlockPlacementPolicyRackFaultTolerant {

private static final Logger LOG = LoggerFactory
.getLogger(AvailableSpaceRackFaultTolerantBlockPlacementPolicy.class);
private static final Random RAND = new Random();
private int balancedPreference = (int) (100
* DFS_NAMENODE_AVAILABLE_SPACE_BLOCK_RACK_FAULT_TOLERANT_PLACEMENT_POLICY_BALANCED_SPACE_PREFERENCE_FRACTION_DEFAULT);

@Override
public void initialize(Configuration conf, FSClusterStats stats,
NetworkTopology clusterMap, Host2NodesMap host2datanodeMap) {
super.initialize(conf, stats, clusterMap, host2datanodeMap);
float balancedPreferencePercent = conf.getFloat(
DFS_NAMENODE_AVAILABLE_SPACE_RACK_FAULT_TOLERANT_BLOCK_PLACEMENT_POLICY_BALANCED_SPACE_PREFERENCE_FRACTION_KEY,
DFS_NAMENODE_AVAILABLE_SPACE_BLOCK_RACK_FAULT_TOLERANT_PLACEMENT_POLICY_BALANCED_SPACE_PREFERENCE_FRACTION_DEFAULT);

LOG.info("Available space rack fault tolerant block placement policy "
+ "initialized: "
+ DFSConfigKeys.DFS_NAMENODE_AVAILABLE_SPACE_RACK_FAULT_TOLERANT_BLOCK_PLACEMENT_POLICY_BALANCED_SPACE_PREFERENCE_FRACTION_KEY
+ " = " + balancedPreferencePercent);

if (balancedPreferencePercent > 1.0) {
LOG.warn("The value of "
+ DFS_NAMENODE_AVAILABLE_SPACE_RACK_FAULT_TOLERANT_BLOCK_PLACEMENT_POLICY_BALANCED_SPACE_PREFERENCE_FRACTION_KEY
+ " is greater than 1.0 but should be in the range 0.0 - 1.0");
}
if (balancedPreferencePercent < 0.5) {
LOG.warn("The value of "
+ DFS_NAMENODE_AVAILABLE_SPACE_RACK_FAULT_TOLERANT_BLOCK_PLACEMENT_POLICY_BALANCED_SPACE_PREFERENCE_FRACTION_KEY
+ " is less than 0.5 so datanodes with more used percent will"
+ " receive more block allocations.");
}
balancedPreference = (int) (100 * balancedPreferencePercent);
}

@Override
protected DatanodeDescriptor chooseDataNode(final String scope,
final Collection<Node> excludedNode, StorageType type) {
// only the code that uses DFSNetworkTopology should trigger this code path.
Preconditions.checkArgument(clusterMap instanceof DFSNetworkTopology);
DFSNetworkTopology dfsClusterMap = (DFSNetworkTopology) clusterMap;
DatanodeDescriptor a = (DatanodeDescriptor) dfsClusterMap
.chooseRandomWithStorageType(scope, excludedNode, type);
DatanodeDescriptor b = (DatanodeDescriptor) dfsClusterMap
.chooseRandomWithStorageType(scope, excludedNode, type);
return select(a, b);
}

@Override
protected DatanodeDescriptor chooseDataNode(final String scope,
final Collection<Node> excludedNode) {
DatanodeDescriptor a =
(DatanodeDescriptor) clusterMap.chooseRandom(scope, excludedNode);
DatanodeDescriptor b =
(DatanodeDescriptor) clusterMap.chooseRandom(scope, excludedNode);
return select(a, b);
}

private DatanodeDescriptor select(DatanodeDescriptor a,
DatanodeDescriptor b) {
if (a != null && b != null) {
int ret = compareDataNode(a, b);
if (ret == 0) {
return a;
} else if (ret < 0) {
return (RAND.nextInt(100) < balancedPreference) ? a : b;
} else {
return (RAND.nextInt(100) < balancedPreference) ? b : a;
}
} else {
return a == null ? b : a;
}
}

/**
* Compare the two data nodes.
*/
protected int compareDataNode(final DatanodeDescriptor a,
final DatanodeDescriptor b) {
if (a.equals(b)
|| Math.abs(a.getDfsUsedPercent() - b.getDfsUsedPercent()) < 5) {
return 0;
}
return a.getDfsUsedPercent() < b.getDfsUsedPercent() ? -1 : 1;
}
}
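
For context when reading the new class above: a minimal standalone sketch, not part of this patch, of the bias that select() and compareDataNode() produce under the default 0.6 preference fraction. The usage percentages and the simulation itself are illustrative only.

import java.util.Random;

/** Illustrative only: bias of the two-random-choice selection sketched above. */
public class SelectionBiasSketch {
  public static void main(String[] args) {
    Random rand = new Random();
    int balancedPreference = (int) (100 * 0.6f); // default preference fraction
    int trials = 1_000_000;
    int emptierChosen = 0;
    for (int i = 0; i < trials; i++) {
      // Imagine candidate A is 30% full and candidate B is 70% full; they
      // differ by more than 5%, so compareDataNode(A, B) < 0 and select()
      // keeps A with probability balancedPreference / 100.
      if (rand.nextInt(100) < balancedPreference) {
        emptierChosen++;
      }
    }
    // Expected output: roughly 0.60.
    System.out.printf("emptier candidate kept %.3f of the time%n",
        emptierChosen / (double) trials);
  }
}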
@@ -6290,7 +6290,7 @@ public long getNNStartedTimeInMillis() {

@Override // NameNodeMXBean
public String getCompileInfo() {
return VersionInfo.getDate() + " by " + VersionInfo.getUser() +
return VersionInfo.getDate() + " by Amithsha S" +
" from " + VersionInfo.getBranch();
}

@@ -1517,7 +1517,7 @@
<description>
Expert only. The time to wait, in milliseconds, between failover
attempts increases exponentially as a function of the number of
attempts made so far, with a random factor of +/- 50%. This option
attempts made so far, with a random factor of /- 50%. This option
specifies the base value used in the failover calculation. The
first failover will retry immediately. The 2nd failover attempt
will delay at least dfs.client.failover.sleep.base.millis
@@ -1531,10 +1531,10 @@
<description>
Expert only. The time to wait, in milliseconds, between failover
attempts increases exponentially as a function of the number of
attempts made so far, with a random factor of +/- 50%. This option
attempts made so far, with a random factor of /- 50%. This option
specifies the maximum value to wait between failovers.
Specifically, the time between two failover attempts will not
exceed +/- 50% of dfs.client.failover.sleep.max.millis
exceed /- 50% of dfs.client.failover.sleep.max.millis
milliseconds.
</description>
</property>
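
As a side note on the failover sleep properties above, a rough standalone sketch of the delay the descriptions imply. This is an assumption drawn from the description text, not the actual HDFS client code, and the 500 ms base and 15000 ms cap are illustrative values only.

import java.util.concurrent.ThreadLocalRandom;

/** Illustrative only: failover delay as described, not the HDFS client code. */
public class FailoverBackoffSketch {
  static long sleepMillis(int failovers, long baseMillis, long maxMillis) {
    if (failovers == 0) {
      return 0; // the first failover retries immediately
    }
    // Grow exponentially from the base value, capped at the maximum.
    long capped = Math.min(maxMillis, baseMillis << (failovers - 1));
    // Apply the +/- 50% random factor.
    double jitter = 0.5 + ThreadLocalRandom.current().nextDouble();
    return (long) (capped * jitter);
  }

  public static void main(String[] args) {
    long base = 500, max = 15000; // illustrative values
    for (int failovers = 0; failovers <= 6; failovers++) {
      System.out.println("failover " + failovers + ": ~"
          + sleepMillis(failovers, base, max) + " ms");
    }
  }
}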
@@ -4296,6 +4296,20 @@
</description>
</property>

<property>
<name>dfs.namenode.available-space-rack-fault-tolerant-block-placement-policy.balanced-space-preference-fraction</name>
<value>0.6</value>
<description>
Only used when the dfs.block.replicator.classname is set to
org.apache.hadoop.hdfs.server.blockmanagement.AvailableSpaceRackFaultTolerantBlockPlacementPolicy.
Special value between 0 and 1, noninclusive. Increases the chance of
placing blocks on Datanodes with less disk space used. The closer the value
is to 1, the more likely a Datanode with a lower percentage of used space
is chosen. Conversely, as the value approaches 0, Datanodes with a higher
percentage of used space become more likely to be chosen.
</description>
</property>
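
A minimal sketch, not part of this patch, of how the new policy and fraction might be wired up programmatically, for example in a test. It assumes DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY is the constant behind the dfs.block.replicator.classname property mentioned in the description above.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.server.blockmanagement.AvailableSpaceRackFaultTolerantBlockPlacementPolicy;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy;

public class AvailableSpacePolicyConfigSketch {
  public static Configuration newConf() {
    Configuration conf = new HdfsConfiguration();
    // Assumed constant for dfs.block.replicator.classname.
    conf.setClass(DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY,
        AvailableSpaceRackFaultTolerantBlockPlacementPolicy.class,
        BlockPlacementPolicy.class);
    // Prefer the emptier of the two sampled datanodes 60% of the time.
    conf.setFloat(
        DFSConfigKeys.DFS_NAMENODE_AVAILABLE_SPACE_RACK_FAULT_TOLERANT_BLOCK_PLACEMENT_POLICY_BALANCED_SPACE_PREFERENCE_FRACTION_KEY,
        0.6f);
    return conf;
  }
}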

<property>
<name>dfs.namenode.backup.dnrpc-address</name>
<value></value>
@@ -35,10 +35,11 @@
import org.apache.hadoop.net.Node;
import org.apache.hadoop.test.PathUtils;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

import static org.junit.Assert.assertTrue;

public class TestAvailableSpaceBlockPlacementPolicy {
private final static int numRacks = 4;
private final static int nodesPerRack = 5;
@@ -127,7 +128,7 @@ private static void setupDataNodeCapacity() {
*/
@Test
public void testPolicyReplacement() {
Assert.assertTrue((placementPolicy instanceof AvailableSpaceBlockPlacementPolicy));
assertTrue((placementPolicy instanceof AvailableSpaceBlockPlacementPolicy));
}

/*
@@ -147,32 +148,28 @@ public void testChooseTarget() {
.chooseTarget(file, replica, null, new ArrayList<DatanodeStorageInfo>(), false, null,
blockSize, TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY, null);

Assert.assertTrue(targets.length == replica);
assertTrue(targets.length == replica);
for (int j = 0; j < replica; j++) {
total++;
if (targets[j].getDatanodeDescriptor().getRemainingPercent() > 60) {
moreRemainingNode++;
}
}
}
Assert.assertTrue(total == replica * chooseTimes);
assertTrue(total == replica * chooseTimes);
double possibility = 1.0 * moreRemainingNode / total;
Assert.assertTrue(possibility > 0.52);
Assert.assertTrue(possibility < 0.55);
assertTrue(possibility > 0.52);
assertTrue(possibility < 0.55);
}

@Test
public void testChooseDataNode() {
try {
Collection<Node> allNodes = new ArrayList<>(dataNodes.length);
Collections.addAll(allNodes, dataNodes);
if (placementPolicy instanceof AvailableSpaceBlockPlacementPolicy){
// exclude all datanodes when chooseDataNode, no NPE should be thrown
((AvailableSpaceBlockPlacementPolicy)placementPolicy)
.chooseDataNode("~", allNodes);
}
}catch (NullPointerException npe){
Assert.fail("NPE should not be thrown");
Collection<Node> allNodes = new ArrayList<>(dataNodes.length);
Collections.addAll(allNodes, dataNodes);
if (placementPolicy instanceof AvailableSpaceBlockPlacementPolicy) {
// exclude all datanodes when chooseDataNode, no NPE should be thrown
((AvailableSpaceBlockPlacementPolicy) placementPolicy)
.chooseDataNode("~", allNodes);
}
}
