Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,23 @@ public interface Node {
* exclude itself. In another words, its parent's full network location */
String getNetworkLocation();

/**
* Set this node's network location.
* @param location it's network location
*/
void setNetworkLocation(String location);

/** @return this node's self name in network topology. This should be node's
* IP or hostname.
* */
String getNetworkName();

/**
* Set this node's name, can be hostname or Ipaddress.
* @param name it's network name
*/
void setNetworkName(String name);

/** @return this node's full path in network topology. It's the concatenation
* of location and name.
* */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,11 @@
*/
public class NodeImpl implements Node {
// host:port#
private final String name;
private String name;
// string representation of this node's location, such as /dc1/rack1
private final String location;
private String location;
// location + "/" + name
private final String path;
private String path;
// which level of the tree the node resides, start from 1 for root
private int level;
// node's parent
Expand All @@ -53,10 +53,7 @@ public NodeImpl(String name, String location, int cost) {
}
this.name = (name == null) ? ROOT : name;
this.location = NetUtils.normalize(location);
this.path = this.location.equals(PATH_SEPARATOR_STR) ?
this.location + this.name :
this.location + PATH_SEPARATOR_STR + this.name;

this.path = getPath();
this.cost = cost;
}

Expand Down Expand Up @@ -84,13 +81,32 @@ public String getNetworkName() {
return name;
}

/**
* Set this node's name, can be hostname or Ipaddress.
* @param networkName it's network name
*/
public void setNetworkName(String networkName) {
this.name = networkName;
this.path = getPath();
}

/**
* @return this node's network location
*/
public String getNetworkLocation() {
return location;
}

/**
* Set this node's network location.
* @param networkLocation it's network location
*/
@Override
public void setNetworkLocation(String networkLocation) {
this.location = networkLocation;
this.path = getPath();
}

/**
* @return this node's full path in network topology. It's the concatenation
* of location and name.
Expand Down Expand Up @@ -197,4 +213,10 @@ public int hashCode() {
public String toString() {
return getNetworkFullPath();
}

private String getPath() {
return this.location.equals(PATH_SEPARATOR_STR) ?
this.location + this.name :
this.location + PATH_SEPARATOR_STR + this.name;
}
}
3 changes: 3 additions & 0 deletions hadoop-hdds/server-scm/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,9 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd">
<testResource>
<directory>${basedir}/../../hadoop-hdds/common/src/main/resources</directory>
</testResource>
<testResource>
<directory>${basedir}/src/test/resources</directory>
</testResource>
</testResources>
</build>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -171,4 +171,13 @@ void processNodeReport(DatanodeDetails datanodeDetails,
*/
// TODO: We can give better name to this method!
List<SCMCommand> getCommandQueue(UUID dnID);

/**
* Given datanode host address, returns the DatanodeDetails for the
* node.
*
* @param address node host address
* @return the given datanode, or null if not found
*/
DatanodeDetails getNode(String address);
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.net.NetConstants;
import org.apache.hadoop.hdds.scm.net.NetworkTopology;
import org.apache.hadoop.hdds.scm.net.Node;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
import org.apache.hadoop.hdds.scm.node.states.NodeAlreadyExistsException;
Expand All @@ -44,14 +47,19 @@
.StorageContainerDatanodeProtocolProtos.StorageReportProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.metrics2.util.MBeans;
import org.apache.hadoop.net.CachedDNSToSwitchMapping;
import org.apache.hadoop.net.DNSToSwitchMapping;
import org.apache.hadoop.net.TableMapping;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.protocol.VersionResponse;
import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
import org.apache.hadoop.ozone.protocol.commands.RegisteredCommand;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;

import org.apache.hadoop.util.ReflectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -93,6 +101,9 @@ public class SCMNodeManager implements NodeManager {
// Node manager MXBean
private ObjectName nmInfoBean;
private final StorageContainerManager scmManager;
private final NetworkTopology clusterMap;
private final DNSToSwitchMapping dnsToSwitchMapping;
private final boolean useHostname;

/**
* Constructs SCM machine Manager.
Expand All @@ -108,6 +119,18 @@ public SCMNodeManager(OzoneConfiguration conf, String clusterID,
LOG.info("Entering startup safe mode.");
registerMXBean();
this.metrics = SCMNodeMetrics.create(this);
this.clusterMap = scmManager.getClusterMap();
Class<? extends DNSToSwitchMapping> dnsToSwitchMappingClass =
conf.getClass(DFSConfigKeys.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
TableMapping.class, DNSToSwitchMapping.class);
DNSToSwitchMapping newInstance = ReflectionUtils.newInstance(
dnsToSwitchMappingClass, conf);
this.dnsToSwitchMapping =
((newInstance instanceof CachedDNSToSwitchMapping) ? newInstance
: new CachedDNSToSwitchMapping(newInstance));
this.useHostname = conf.getBoolean(
DFSConfigKeys.DFS_DATANODE_USE_DN_HOSTNAME,
DFSConfigKeys.DFS_DATANODE_USE_DN_HOSTNAME_DEFAULT);
}

private void registerMXBean() {
Expand Down Expand Up @@ -228,14 +251,27 @@ public RegisteredCommand register(
datanodeDetails.setIpAddress(dnAddress.getHostAddress());
}
try {
String location;
if (useHostname) {
datanodeDetails.setNetworkName(datanodeDetails.getHostName());
location = nodeResolve(datanodeDetails.getHostName());
} else {
datanodeDetails.setNetworkName(datanodeDetails.getIpAddress());
location = nodeResolve(datanodeDetails.getIpAddress());
}
if (location != null) {
datanodeDetails.setNetworkLocation(location);
}
nodeStateManager.addNode(datanodeDetails);
clusterMap.add(datanodeDetails);
// Updating Node Report, as registration is successful
processNodeReport(datanodeDetails, nodeReport);
LOG.info("Registered Data node : {}", datanodeDetails);
} catch (NodeAlreadyExistsException e) {
LOG.trace("Datanode is already registered. Datanode: {}",
datanodeDetails.toString());
}

return RegisteredCommand.newBuilder().setErrorCode(ErrorCode.success)
.setDatanodeUUID(datanodeDetails.getUuidString())
.setClusterID(this.clusterID)
Expand Down Expand Up @@ -515,5 +551,36 @@ public List<SCMCommand> getCommandQueue(UUID dnID) {
return commandQueue.getCommand(dnID);
}

/**
* Given datanode address or host name, returns the DatanodeDetails for the
* node.
*
* @param address node host address
* @return the given datanode, or null if not found
*/
@Override
public DatanodeDetails getNode(String address) {
Node node = null;
String location = nodeResolve(address);
if (location != null) {
node = clusterMap.getNode(location + NetConstants.PATH_SEPARATOR_STR +
address);
}
return node == null ? null : (DatanodeDetails)node;
}

private String nodeResolve(String hostname) {
List<String> hosts = new ArrayList<>(1);
hosts.add(hostname);
List<String> resolvedHosts = dnsToSwitchMapping.resolve(hosts);
if (resolvedHosts != null && !resolvedHosts.isEmpty()) {
String location = resolvedHosts.get(0);
LOG.debug("Resolve datanode {} return location {}", hostname, location);
return location;
} else {
LOG.error("Node {} Resolution failed. Please make sure that DNS table " +
"mapping or configured mapping is functional.", hostname);
return null;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -372,14 +372,15 @@ public StorageContainerManager(OzoneConfiguration conf,
private void initializeSystemManagers(OzoneConfiguration conf,
SCMConfigurator configurator)
throws IOException {
clusterMap = new NetworkTopologyImpl(conf);

if(configurator.getScmNodeManager() != null) {
scmNodeManager = configurator.getScmNodeManager();
} else {
scmNodeManager = new SCMNodeManager(
conf, scmStorageConfig.getClusterID(), this, eventQueue);
}

clusterMap = new NetworkTopologyImpl(conf);
ContainerPlacementPolicy containerPlacementPolicy =
ContainerPlacementPolicyFactory.getPolicy(conf, scmNodeManager,
clusterMap, true);
Expand Down Expand Up @@ -1067,4 +1068,12 @@ public Map<String, Integer> getContainerStateCount() {
public SCMMetadataStore getScmMetadataStore() {
return scmMetadataStore;
}

/**
* Returns the SCM network topology cluster.
* @return NetworkTopology
*/
public NetworkTopology getClusterMap() {
return this.clusterMap;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ public static DatanodeDetails getDatanodeDetails(
*
* @return DatanodeDetails
*/
private static DatanodeDetails createDatanodeDetails(String uuid,
public static DatanodeDetails createDatanodeDetails(String uuid,
String hostname, String ipAddress, String networkLocation) {
DatanodeDetails.Port containerPort = DatanodeDetails.newPort(
DatanodeDetails.Port.Name.STANDALONE, 0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -451,6 +451,11 @@ public List<SCMCommand> getCommandQueue(UUID dnID) {
return null;
}

@Override
public DatanodeDetails getNode(String address) {
return null;
}

/**
* A class to declare some values for the nodes so that our tests
* won't fail.
Expand Down
Loading