Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,6 @@
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -105,8 +104,6 @@
import java.util.HashSet;
import java.util.List;
import java.util.Random;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
* WebHDFS Router implementation. This is an extension of
Expand Down Expand Up @@ -456,33 +453,21 @@ private DatanodeInfo chooseDatanode(final Router router,
final String path, final HttpOpParam.Op op, final long openOffset,
final String excludeDatanodes) throws IOException {
final RouterRpcServer rpcServer = getRPCServer(router);
DatanodeInfo[] dns = {};
String resolvedNs = "";
DatanodeInfo[] dns = null;
try {
dns = rpcServer.getCachedDatanodeReport(DatanodeReportType.LIVE);
} catch (IOException e) {
LOG.error("Cannot get the datanodes from the RPC server", e);
}

if (op == PutOpParam.Op.CREATE) {
try {
resolvedNs = rpcServer.getCreateLocation(path).getNameserviceId();
} catch (IOException e) {
LOG.error("Cannot get the name service " +
"to create file for path {} ", path, e);
}
}

HashSet<Node> excludes = new HashSet<Node>();
Collection<String> collection =
getTrimmedStringCollection(excludeDatanodes);
for (DatanodeInfo dn : dns) {
String ns = getNsFromDataNodeNetworkLocation(dn.getNetworkLocation());
if (collection.contains(dn.getName())) {
excludes.add(dn);
} else if (op == PutOpParam.Op.CREATE && !ns.equals(resolvedNs)) {
// for CREATE, the dest dn should be in the resolved ns
excludes.add(dn);
if (excludeDatanodes != null) {
Collection<String> collection =
getTrimmedStringCollection(excludeDatanodes);
for (DatanodeInfo dn : dns) {
if (collection.contains(dn.getName())) {
excludes.add(dn);
}
}
}

Expand Down Expand Up @@ -517,22 +502,6 @@ private DatanodeInfo chooseDatanode(final Router router,
return getRandomDatanode(dns, excludes);
}

/**
* Get the nameservice info from datanode network location.
* @param location network location with format `/ns0/rack1`
* @return nameservice this datanode is in
*/
@VisibleForTesting
public static String getNsFromDataNodeNetworkLocation(String location) {
// network location should be in the format of /ns/rack
Pattern pattern = Pattern.compile("^/([^/]*)/");
Matcher matcher = pattern.matcher(location);
if (matcher.find()) {
return matcher.group(1);
}
return "";
}

/**
* Get a random Datanode from a subcluster.
* @param dns Nodes to be chosen from.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,6 @@ public static void createCluster(Configuration conf) throws IOException {
conf.addResource(CONTRACT_WEBHDFS_XML);

cluster = new MiniRouterDFSCluster(true, 2, conf);
cluster.setIndependentDNs();
cluster.setNumDatanodesPerNameservice(3);

// Start NNs and DNs and wait until ready
cluster.startCluster(conf);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -774,23 +774,14 @@ public void startCluster(Configuration overrideConf) {
}
topology.setFederation(true);

// Generate conf for namenodes and datanodes
String ns0 = nameservices.get(0);
Configuration nnConf = generateNamenodeConfiguration(ns0);
if (overrideConf != null) {
nnConf.addResource(overrideConf);
// Router also uses this configurations as initial values.
routerConf = new Configuration(overrideConf);
}

// Set independent DNs across subclusters
int numDNs = nameservices.size() * numDatanodesPerNameservice;
Configuration[] dnConfs = null;
if (!sharedDNs) {
dnConfs = new Configuration[numDNs];
int dnId = 0;
for (String nsId : nameservices) {
Configuration subclusterConf = new Configuration(nnConf);
Configuration subclusterConf = new Configuration();
subclusterConf.set(DFS_INTERNAL_NAMESERVICES_KEY, nsId);
for (int i = 0; i < numDatanodesPerNameservice; i++) {
dnConfs[dnId] = subclusterConf;
Expand All @@ -800,6 +791,14 @@ public void startCluster(Configuration overrideConf) {
}

// Start mini DFS cluster
String ns0 = nameservices.get(0);
Configuration nnConf = generateNamenodeConfiguration(ns0);
if (overrideConf != null) {
nnConf.addResource(overrideConf);
// Router also uses this configurations as initial values.
routerConf = new Configuration(overrideConf);
}

cluster = new MiniDFSCluster.Builder(nnConf)
.numDataNodes(numDNs)
.nnTopology(topology)
Expand Down

This file was deleted.