
Commit ae723bc

HDFS-16521. DFS API to retrieve slow datanodes (#4107)

1 parent f155abc commit ae723bc

18 files changed: +362, -46 lines

hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java

Lines changed: 8 additions & 0 deletions
@@ -3434,4 +3434,12 @@ public void removeLocatedBlocksRefresh(DFSInputStream dfsInputStream) {
   private boolean isLocatedBlocksRefresherEnabled() {
     return clientContext.isLocatedBlocksRefresherEnabled();
   }
+
+  public DatanodeInfo[] slowDatanodeReport() throws IOException {
+    checkOpen();
+    try (TraceScope ignored = tracer.newScope("slowDatanodeReport")) {
+      return namenode.getSlowDatanodeReport();
+    }
+  }
+
 }

hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java

Lines changed: 11 additions & 0 deletions
@@ -3651,4 +3651,15 @@ public MultipartUploaderBuilder createMultipartUploader(final Path basePath)
       throws IOException {
     return new FileSystemMultipartUploaderBuilder(this, basePath);
   }
+
+  /**
+   * Retrieve stats for slow running datanodes.
+   *
+   * @return An array of slow datanode info.
+   * @throws IOException If an I/O error occurs.
+   */
+  public DatanodeInfo[] getSlowDatanodeStats() throws IOException {
+    return dfs.slowDatanodeReport();
+  }
+
 }
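
For context, here is how a client might exercise the new DistributedFileSystem API. A minimal sketch, assuming a reachable cluster (the namenode URI and class name are placeholders) and that slow peer tracking is enabled server-side; per the ClientProtocol javadoc below, the report is only populated when slow peer tracking is on:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;

public class SlowDatanodeReportExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Placeholder address; point this at a real cluster.
    conf.set("fs.defaultFS", "hdfs://namenode:8020");
    try (FileSystem fs = FileSystem.get(conf)) {
      if (fs instanceof DistributedFileSystem) {
        DatanodeInfo[] slow = ((DistributedFileSystem) fs).getSlowDatanodeStats();
        // Empty unless slow peer tracking is enabled on the cluster.
        for (DatanodeInfo dn : slow) {
          System.out.println(dn.getXferAddr());
        }
      }
    }
  }
}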

hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ViewDistributedFileSystem.java

Lines changed: 10 additions & 0 deletions
@@ -2318,4 +2318,14 @@ public long getUsed() throws IOException {
     }
     return this.vfs.getUsed();
   }
+
+  @Override
+  public DatanodeInfo[] getSlowDatanodeStats() throws IOException {
+    if (this.vfs == null) {
+      return super.getSlowDatanodeStats();
+    }
+    checkDefaultDFS(defaultDFS, "getSlowDatanodeStats");
+    return defaultDFS.getSlowDatanodeStats();
+  }
+
 }

hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java

Lines changed: 12 additions & 0 deletions
@@ -1856,4 +1856,16 @@ BatchedEntries<OpenFileEntry> listOpenFiles(long prevId,
    */
   @AtMostOnce
   void satisfyStoragePolicy(String path) throws IOException;
+
+  /**
+   * Get report on all of the slow Datanodes. Slow running datanodes are identified based on
+   * the Outlier detection algorithm, if slow peer tracking is enabled for the DFS cluster.
+   *
+   * @return Datanode report for slow running datanodes.
+   * @throws IOException If an I/O error occurs.
+   */
+  @Idempotent
+  @ReadOnly
+  DatanodeInfo[] getSlowDatanodeReport() throws IOException;
+
 }

hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java

Lines changed: 13 additions & 0 deletions
@@ -144,6 +144,7 @@
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetPreferredBlockSizeRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetQuotaUsageRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetServerDefaultsRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetSlowDatanodeReportRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetSnapshotDiffReportRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetSnapshotDiffReportResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetSnapshotDiffReportListingRequestProto;

@@ -2044,6 +2045,18 @@ public void satisfyStoragePolicy(String src) throws IOException {
     }
   }

+  @Override
+  public DatanodeInfo[] getSlowDatanodeReport() throws IOException {
+    GetSlowDatanodeReportRequestProto req =
+        GetSlowDatanodeReportRequestProto.newBuilder().build();
+    try {
+      return PBHelperClient.convert(
+          rpcProxy.getSlowDatanodeReport(null, req).getDatanodeInfoProtoList());
+    } catch (ServiceException e) {
+      throw ProtobufHelper.getRemoteException(e);
+    }
+  }
+
   @Override
   public HAServiceProtocol.HAServiceState getHAServiceState()

hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientNamenodeProtocol.proto

Lines changed: 9 additions & 0 deletions
@@ -416,6 +416,13 @@ message GetPreferredBlockSizeResponseProto {
   required uint64 bsize = 1;
 }

+message GetSlowDatanodeReportRequestProto {
+}
+
+message GetSlowDatanodeReportResponseProto {
+  repeated DatanodeInfoProto datanodeInfoProto = 1;
+}
+
 enum SafeModeActionProto {
   SAFEMODE_LEAVE = 1;
   SAFEMODE_ENTER = 2;

@@ -1060,4 +1067,6 @@ service ClientNamenodeProtocol {
       returns(SatisfyStoragePolicyResponseProto);
   rpc getHAServiceState(HAServiceStateRequestProto)
       returns(HAServiceStateResponseProto);
+  rpc getSlowDatanodeReport(GetSlowDatanodeReportRequestProto)
+      returns(GetSlowDatanodeReportResponseProto);
 }

hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/hdfs/protocol/TestReadOnly.java

Lines changed: 2 additions & 1 deletion
@@ -75,7 +75,8 @@ public class TestReadOnly {
         "getQuotaUsage",
         "msync",
         "getHAServiceState",
-        "getECTopologyResultForPolicies"
+        "getECTopologyResultForPolicies",
+        "getSlowDatanodeReport"
     )
   );

hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java

Lines changed: 6 additions & 0 deletions
@@ -1791,6 +1791,12 @@ public void satisfyStoragePolicy(String path) throws IOException {
     storagePolicy.satisfyStoragePolicy(path);
   }

+  @Override
+  public DatanodeInfo[] getSlowDatanodeReport() throws IOException {
+    rpcServer.checkOperation(NameNode.OperationCategory.UNCHECKED);
+    return rpcServer.getSlowDatanodeReport(true, 0);
+  }
+
   @Override
   public HAServiceProtocol.HAServiceState getHAServiceState() {
     if (rpcServer.isSafeMode()) {

hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java

Lines changed: 54 additions & 18 deletions
@@ -898,24 +898,7 @@ public DatanodeInfo[] getDatanodeReport(
     Map<FederationNamespaceInfo, DatanodeInfo[]> results =
         rpcClient.invokeConcurrent(nss, method, requireResponse, false,
             timeOutMs, DatanodeInfo[].class);
-    for (Entry<FederationNamespaceInfo, DatanodeInfo[]> entry :
-        results.entrySet()) {
-      FederationNamespaceInfo ns = entry.getKey();
-      DatanodeInfo[] result = entry.getValue();
-      for (DatanodeInfo node : result) {
-        String nodeId = node.getXferAddr();
-        DatanodeInfo dn = datanodesMap.get(nodeId);
-        if (dn == null || node.getLastUpdate() > dn.getLastUpdate()) {
-          // Add the subcluster as a suffix to the network location
-          node.setNetworkLocation(
-              NodeBase.PATH_SEPARATOR_STR + ns.getNameserviceId() +
-                  node.getNetworkLocation());
-          datanodesMap.put(nodeId, node);
-        } else {
-          LOG.debug("{} is in multiple subclusters", nodeId);
-        }
-      }
-    }
+    updateDnMap(results, datanodesMap);
     // Map -> Array
     Collection<DatanodeInfo> datanodes = datanodesMap.values();
     return toArray(datanodes, DatanodeInfo.class);

@@ -1358,6 +1341,11 @@ public void satisfyStoragePolicy(String path) throws IOException {
     clientProto.satisfyStoragePolicy(path);
   }

+  @Override // ClientProtocol
+  public DatanodeInfo[] getSlowDatanodeReport() throws IOException {
+    return clientProto.getSlowDatanodeReport();
+  }
+
   @Override // NamenodeProtocol
   public BlocksWithLocations getBlocks(DatanodeInfo datanode, long size,
       long minBlockSize) throws IOException {

@@ -1757,4 +1745,52 @@ public void refreshSuperUserGroupsConfiguration() throws IOException {
   public String[] getGroupsForUser(String user) throws IOException {
     return routerProto.getGroupsForUser(user);
   }
+
+  /**
+   * Get the slow running datanodes report with a timeout.
+   *
+   * @param requireResponse If we require all the namespaces to report.
+   * @param timeOutMs Time out for the reply in milliseconds.
+   * @return List of datanodes.
+   * @throws IOException If it cannot get the report.
+   */
+  public DatanodeInfo[] getSlowDatanodeReport(boolean requireResponse, long timeOutMs)
+      throws IOException {
+    checkOperation(OperationCategory.UNCHECKED);
+
+    Map<String, DatanodeInfo> datanodesMap = new LinkedHashMap<>();
+    RemoteMethod method = new RemoteMethod("getSlowDatanodeReport");
+
+    Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
+    Map<FederationNamespaceInfo, DatanodeInfo[]> results =
+        rpcClient.invokeConcurrent(nss, method, requireResponse, false,
+            timeOutMs, DatanodeInfo[].class);
+    updateDnMap(results, datanodesMap);
+    // Map -> Array
+    Collection<DatanodeInfo> datanodes = datanodesMap.values();
+    return toArray(datanodes, DatanodeInfo.class);
+  }
+
+  private void updateDnMap(Map<FederationNamespaceInfo, DatanodeInfo[]> results,
+      Map<String, DatanodeInfo> datanodesMap) {
+    for (Entry<FederationNamespaceInfo, DatanodeInfo[]> entry :
+        results.entrySet()) {
+      FederationNamespaceInfo ns = entry.getKey();
+      DatanodeInfo[] result = entry.getValue();
+      for (DatanodeInfo node : result) {
+        String nodeId = node.getXferAddr();
+        DatanodeInfo dn = datanodesMap.get(nodeId);
+        if (dn == null || node.getLastUpdate() > dn.getLastUpdate()) {
+          // Add the subcluster as a suffix to the network location
+          node.setNetworkLocation(
+              NodeBase.PATH_SEPARATOR_STR + ns.getNameserviceId() +
+                  node.getNetworkLocation());
+          datanodesMap.put(nodeId, node);
+        } else {
+          LOG.debug("{} is in multiple subclusters", nodeId);
+        }
+      }
+    }
+  }
+
 }
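
For context: the extracted updateDnMap helper encodes the router's merge rule, shared by getDatanodeReport and the new getSlowDatanodeReport. Datanodes are keyed by transfer address, and when the same address is reported by multiple subclusters the entry with the newer lastUpdate wins. A standalone sketch of that rule with plain types (the Node record and sample addresses are illustrative, not from the patch):

import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class MergeRuleSketch {
  // Minimal stand-in for DatanodeInfo: transfer address plus heartbeat time.
  record Node(String xferAddr, long lastUpdate) {}

  public static void main(String[] args) {
    // Two namespaces report the same datanode with different timestamps.
    List<Node> reports = List.of(
        new Node("10.0.0.1:9866", 100L),
        new Node("10.0.0.2:9866", 120L),
        new Node("10.0.0.1:9866", 150L)); // newer report for the same node

    Map<String, Node> merged = new LinkedHashMap<>();
    for (Node node : reports) {
      Node existing = merged.get(node.xferAddr());
      // Keep the most recently updated view of each datanode.
      if (existing == null || node.lastUpdate() > existing.lastUpdate()) {
        merged.put(node.xferAddr(), node);
      }
    }
    merged.values().forEach(System.out::println); // 10.0.0.1 at 150, 10.0.0.2 at 120
  }
}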

hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java

Lines changed: 1 addition & 0 deletions
@@ -690,6 +690,7 @@ public void testProxyGetDatanodeReport() throws Exception {

     DatanodeInfo[] combinedData =
         routerProtocol.getDatanodeReport(DatanodeReportType.ALL);
+    assertEquals(0, routerProtocol.getSlowDatanodeReport().length);
     final Map<Integer, String> routerDNMap = new TreeMap<>();
     for (DatanodeInfo dn : combinedData) {
       String subcluster = dn.getNetworkLocation().split("/")[1];
