Skip to content

Commit df1db00

Browse files
author
slfan1989
committed
YARN-11161. Support getAttributesToNodes, getClusterNodeAttributes, getNodesToAttributes API's for Federation.
1 parent fc85c27 commit df1db00

File tree

3 files changed

+58
-27
lines changed

3 files changed

+58
-27
lines changed

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -195,7 +195,8 @@ public void init(String userName) {
195195
}
196196

197197
numSubmitRetries =
198-
conf.getInt(YarnConfiguration.ROUTER_CLIENTRM_SUBMIT_RETRY,
198+
conf.getInt(
199+
YarnConfiguration.ROUTER_CLIENTRM_SUBMIT_RETRY,
199200
YarnConfiguration.DEFAULT_ROUTER_CLIENTRM_SUBMIT_RETRY);
200201

201202
clientRMProxies = new ConcurrentHashMap<>();

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/RouterYarnClientUtils.java

Lines changed: 3 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -483,16 +483,13 @@ public static GetResourceProfileResponse mergeClusterResourceProfileResponse(
483483
*/
484484
public static GetAttributesToNodesResponse mergeAttributesToNodesResponse(
485485
Collection<GetAttributesToNodesResponse> responses) {
486-
GetAttributesToNodesResponse attributesToNodesResponse =
487-
GetAttributesToNodesResponse.newInstance(new HashMap<>());
488486
Map<NodeAttributeKey, List<NodeToAttributeValue>> nodeAttributeMap = new HashMap<>();
489487
for (GetAttributesToNodesResponse response : responses) {
490488
if (response != null && response.getAttributesToNodes() != null) {
491489
nodeAttributeMap.putAll(response.getAttributesToNodes());
492490
}
493491
}
494-
attributesToNodesResponse.setAttributeToNodes(nodeAttributeMap);
495-
return attributesToNodesResponse;
492+
return GetAttributesToNodesResponse.newInstance(nodeAttributeMap);
496493
}
497494

498495
/**
@@ -503,16 +500,13 @@ public static GetAttributesToNodesResponse mergeAttributesToNodesResponse(
503500
*/
504501
public static GetClusterNodeAttributesResponse mergeClusterNodeAttributesResponse(
505502
Collection<GetClusterNodeAttributesResponse> responses) {
506-
GetClusterNodeAttributesResponse attributesResponse =
507-
GetClusterNodeAttributesResponse.newInstance(new HashSet<>());
508503
Set<NodeAttributeInfo> nodeAttributeInfo = new HashSet<>();
509504
for (GetClusterNodeAttributesResponse response : responses) {
510505
if (response != null && response.getNodeAttributes() != null) {
511506
nodeAttributeInfo.addAll(response.getNodeAttributes());
512507
}
513508
}
514-
attributesResponse.setNodeAttributes(nodeAttributeInfo);
515-
return attributesResponse;
509+
return GetClusterNodeAttributesResponse.newInstance(nodeAttributeInfo);
516510
}
517511

518512
/**
@@ -523,16 +517,13 @@ public static GetClusterNodeAttributesResponse mergeClusterNodeAttributesRespons
523517
*/
524518
public static GetNodesToAttributesResponse mergeNodesToAttributesResponse(
525519
Collection<GetNodesToAttributesResponse> responses) {
526-
GetNodesToAttributesResponse attributesResponse =
527-
GetNodesToAttributesResponse.newInstance(new HashMap<>());
528520
Map<String, Set<NodeAttribute>> attributesMap = new HashMap<>();
529521
for (GetNodesToAttributesResponse response : responses) {
530522
if (response != null && response.getNodeToAttributes() != null) {
531523
attributesMap.putAll(response.getNodeToAttributes());
532524
}
533525
}
534-
attributesResponse.setNodeToAttributes(attributesMap);
535-
return attributesResponse;
526+
return GetNodesToAttributesResponse.newInstance(attributesMap);
536527
}
537528
}
538529

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestRouterYarnClientUtils.java

Lines changed: 53 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import org.apache.hadoop.yarn.api.protocolrecords.GetAllResourceProfilesResponse;
4040
import org.apache.hadoop.yarn.api.protocolrecords.GetAttributesToNodesResponse;
4141
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeAttributesResponse;
42+
import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToAttributesResponse;
4243
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
4344
import org.apache.hadoop.yarn.api.records.ApplicationId;
4445
import org.apache.hadoop.yarn.api.records.ApplicationReport;
@@ -659,8 +660,12 @@ public void testMergeAttributesToNodesResponse() {
659660
Assert.assertEquals(2, response.getAttributesToNodes().size());
660661

661662
Map<NodeAttributeKey, List<NodeToAttributeValue>> attrs = response.getAttributesToNodes();
662-
Assert.assertTrue(findHostnameAndValInMapping("node2", "docker0",
663-
attrs.get(docker.getAttributeKey())));
663+
664+
NodeAttributeKey gpuKey = gpu.getAttributeKey();
665+
Assert.assertEquals(attributeValue1.toString(), attrs.get(gpuKey).get(0).toString());
666+
667+
NodeAttributeKey dockerKey = docker.getAttributeKey();
668+
Assert.assertEquals(attributeValue2.toString(), attrs.get(dockerKey).get(0).toString());
664669
}
665670

666671
@Test
@@ -698,19 +703,53 @@ public void testMergeClusterNodeAttributesResponse() {
698703

699704
Set<NodeAttributeInfo> nodeAttributeInfos = response.getNodeAttributes();
700705
Assert.assertEquals(2, nodeAttributeInfos.size());
701-
702-
Object[] objectArr = nodeAttributeInfos.toArray();
703-
Assert.assertEquals("rm.yarn.io/GPU(STRING)", objectArr[0].toString());
704-
Assert.assertEquals("rm.yarn.io/CPU(STRING)", objectArr[1].toString());
706+
Assert.assertTrue(nodeAttributeInfos.contains(nodeAttributeInfo1));
707+
Assert.assertTrue(nodeAttributeInfos.contains(nodeAttributeInfo2));
705708
}
706709

707-
private boolean findHostnameAndValInMapping(String hostname, String attrVal,
708-
List<NodeToAttributeValue> mappingVals) {
709-
for (NodeToAttributeValue value : mappingVals) {
710-
if (value.getHostname().equals(hostname)) {
711-
return attrVal.equals(value.getAttributeValue());
712-
}
713-
}
714-
return false;
710+
@Test
711+
public void testMergeNodesToAttributesResponse() {
712+
// normal response1
713+
NodeAttribute gpu = NodeAttribute.newInstance(NodeAttribute.PREFIX_CENTRALIZED, "GPU",
714+
NodeAttributeType.STRING, "nvida");
715+
NodeAttribute os = NodeAttribute.newInstance(NodeAttribute.PREFIX_CENTRALIZED, "OS",
716+
NodeAttributeType.STRING, "windows64");
717+
NodeAttribute dist = NodeAttribute.newInstance(NodeAttribute.PREFIX_DISTRIBUTED, "VERSION",
718+
NodeAttributeType.STRING, "3_0_2");
719+
Map<String, Set<NodeAttribute>> node1Map = new HashMap<>();
720+
node1Map.put("node1", ImmutableSet.of(gpu, os, dist));
721+
GetNodesToAttributesResponse response1 = GetNodesToAttributesResponse.newInstance(node1Map);
722+
723+
// normal response2
724+
NodeAttribute docker = NodeAttribute.newInstance(NodeAttribute.PREFIX_DISTRIBUTED, "DOCKER",
725+
NodeAttributeType.STRING, "docker0");
726+
Map<String, Set<NodeAttribute>> node2Map = new HashMap<>();
727+
node2Map.put("node2", ImmutableSet.of(docker));
728+
GetNodesToAttributesResponse response2 = GetNodesToAttributesResponse.newInstance(node2Map);
729+
730+
// empty response3
731+
GetNodesToAttributesResponse response3 = GetNodesToAttributesResponse.newInstance(new HashMap<>());
732+
733+
// null response4
734+
GetNodesToAttributesResponse response4 = null;
735+
736+
List<GetNodesToAttributesResponse> responses = new ArrayList<>();
737+
responses.add(response1);
738+
responses.add(response2);
739+
responses.add(response3);
740+
responses.add(response4);
741+
742+
GetNodesToAttributesResponse response =
743+
RouterYarnClientUtils.mergeNodesToAttributesResponse(responses);
744+
745+
Assert.assertNotNull(response);
746+
747+
Map<String, Set<NodeAttribute>> hostToAttrs = response.getNodeToAttributes();
748+
Assert.assertNotNull(hostToAttrs);
749+
Assert.assertEquals(2, hostToAttrs.size());
750+
Assert.assertTrue(hostToAttrs.get("node1").contains(dist));
751+
Assert.assertTrue(hostToAttrs.get("node1").contains(gpu));
752+
Assert.assertTrue(hostToAttrs.get("node1").contains(os));
753+
Assert.assertTrue(hostToAttrs.get("node2").contains(docker));
715754
}
716755
}

0 commit comments

Comments
 (0)