Skip to content

Commit c5ec727

Browse files
authored
YARN-11230. [Federation] Add getContainer, signalToContainer REST APIs for Router. (#4689)
1 parent 6463f86 commit c5ec727

File tree

4 files changed

+176
-2
lines changed

4 files changed

+176
-2
lines changed

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterServerUtil.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,4 +82,23 @@ public static void logAndThrowIOException(String errMsg, Throwable t)
8282
}
8383
}
8484

85+
/**
86+
* Throws an RunTimeException due to an error.
87+
*
88+
* @param errMsg the error message
89+
* @param t the throwable raised in the called class.
90+
* @throws RuntimeException on failure
91+
*/
92+
@Public
93+
@Unstable
94+
public static void logAndThrowRunTimeException(String errMsg, Throwable t)
95+
throws RuntimeException {
96+
if (t != null) {
97+
LOG.error(errMsg, t);
98+
throw new RuntimeException(errMsg, t);
99+
} else {
100+
LOG.error(errMsg);
101+
throw new RuntimeException(errMsg);
102+
}
103+
}
85104
}

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java

Lines changed: 89 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050
import org.apache.hadoop.util.concurrent.HadoopExecutors;
5151
import org.apache.hadoop.yarn.api.records.ApplicationId;
5252
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
53+
import org.apache.hadoop.yarn.api.records.ContainerId;
5354
import org.apache.hadoop.yarn.api.records.NodeLabel;
5455
import org.apache.hadoop.yarn.conf.YarnConfiguration;
5556
import org.apache.hadoop.yarn.exceptions.YarnException;
@@ -1415,7 +1416,39 @@ public ContainersInfo getContainers(HttpServletRequest req,
14151416
public ContainerInfo getContainer(HttpServletRequest req,
14161417
HttpServletResponse res, String appId, String appAttemptId,
14171418
String containerId) {
1418-
throw new NotImplementedException("Code is not implemented");
1419+
1420+
if (appId == null || appId.isEmpty()) {
1421+
throw new IllegalArgumentException("Parameter error, the appId is empty or null.");
1422+
}
1423+
if (appAttemptId == null || appAttemptId.isEmpty()) {
1424+
throw new IllegalArgumentException("Parameter error, the appAttemptId is empty or null.");
1425+
}
1426+
if (containerId == null || containerId.isEmpty()) {
1427+
throw new IllegalArgumentException("Parameter error, the containerId is empty or null.");
1428+
}
1429+
1430+
try {
1431+
ApplicationId applicationId = ApplicationId.fromString(appId);
1432+
SubClusterInfo subClusterInfo = getHomeSubClusterInfoByAppId(applicationId);
1433+
1434+
if (subClusterInfo == null) {
1435+
RouterServerUtil.logAndThrowRunTimeException("Unable to get subCluster by applicationId = " +
1436+
applicationId, null);
1437+
}
1438+
1439+
DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorForSubCluster(
1440+
subClusterInfo.getSubClusterId(), subClusterInfo.getRMWebServiceAddress());
1441+
return interceptor.getContainer(req, res, appId, appAttemptId, containerId);
1442+
} catch (IllegalArgumentException e) {
1443+
String msg = String.format(
1444+
"Unable to get the AppAttempt appId: %s, appAttemptId: %s, containerId: %s.", appId,
1445+
appAttemptId, containerId);
1446+
RouterServerUtil.logAndThrowRunTimeException(msg, e);
1447+
} catch (YarnException e) {
1448+
RouterServerUtil.logAndThrowRunTimeException("getContainer Failed.", e);
1449+
}
1450+
1451+
return null;
14191452
}
14201453

14211454
@Override
@@ -1442,7 +1475,37 @@ public void setNextInterceptor(RESTRequestInterceptor next) {
14421475
@Override
14431476
public Response signalToContainer(String containerId, String command,
14441477
HttpServletRequest req) {
1445-
throw new NotImplementedException("Code is not implemented");
1478+
1479+
if (containerId == null || containerId.isEmpty()) {
1480+
throw new IllegalArgumentException("Parameter error, the containerId is empty or null.");
1481+
}
1482+
1483+
if (command == null || command.isEmpty()) {
1484+
throw new IllegalArgumentException("Parameter error, the command is empty or null.");
1485+
}
1486+
1487+
try {
1488+
ContainerId containerIdObj = ContainerId.fromString(containerId);
1489+
ApplicationId applicationId = containerIdObj.getApplicationAttemptId().getApplicationId();
1490+
1491+
SubClusterInfo subClusterInfo = getHomeSubClusterInfoByAppId(applicationId);
1492+
1493+
if (subClusterInfo == null) {
1494+
RouterServerUtil.logAndThrowRunTimeException("Unable to get subCluster by applicationId = " +
1495+
applicationId, null);
1496+
}
1497+
1498+
DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorForSubCluster(
1499+
subClusterInfo.getSubClusterId(), subClusterInfo.getRMWebServiceAddress());
1500+
return interceptor.signalToContainer(containerId, command, req);
1501+
1502+
} catch (YarnException e) {
1503+
RouterServerUtil.logAndThrowRunTimeException("signalToContainer Failed.", e);
1504+
} catch (AuthorizationException e) {
1505+
RouterServerUtil.logAndThrowRunTimeException("signalToContainer Author Failed.", e);
1506+
}
1507+
1508+
return null;
14461509
}
14471510

14481511
@Override
@@ -1494,4 +1557,28 @@ private <R> Map<SubClusterInfo, R> invokeConcurrent(Collection<SubClusterInfo> c
14941557
});
14951558
return results;
14961559
}
1560+
1561+
/**
1562+
* get the HomeSubCluster according to ApplicationId.
1563+
*
1564+
* @param applicationId applicationId
1565+
* @return HomeSubCluster
1566+
* @throws YarnException on failure
1567+
*/
1568+
private SubClusterInfo getHomeSubClusterInfoByAppId(ApplicationId applicationId) throws YarnException {
1569+
SubClusterInfo subClusterInfo = null;
1570+
SubClusterId subClusterId = null;
1571+
try {
1572+
subClusterId = federationFacade.getApplicationHomeSubCluster(applicationId);
1573+
if (subClusterId == null) {
1574+
RouterServerUtil.logAndThrowException("Can't get HomeSubCluster by applicationId "
1575+
+ applicationId, null);
1576+
}
1577+
subClusterInfo = federationFacade.getSubCluster(subClusterId);
1578+
} catch (YarnException e) {
1579+
RouterServerUtil.logAndThrowException("Get HomeSubClusterInfo by applicationId "
1580+
+ applicationId + " failed.", e);
1581+
}
1582+
return subClusterInfo;
1583+
}
14971584
}

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/MockDefaultRequestInterceptorREST.java

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import javax.ws.rs.core.Response;
3636
import javax.ws.rs.core.Response.Status;
3737

38+
import org.apache.commons.lang3.EnumUtils;
3839
import org.apache.hadoop.security.authorize.AuthorizationException;
3940
import org.apache.hadoop.util.Sets;
4041
import org.apache.hadoop.util.StringUtils;
@@ -47,6 +48,7 @@
4748
import org.apache.hadoop.yarn.api.records.ContainerState;
4849
import org.apache.hadoop.yarn.api.records.ContainerReport;
4950
import org.apache.hadoop.yarn.api.records.NodeLabel;
51+
import org.apache.hadoop.yarn.api.records.SignalContainerCommand;
5052
import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
5153
import org.apache.hadoop.yarn.exceptions.YarnException;
5254
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
@@ -363,4 +365,51 @@ public NodeLabelsInfo getLabelsOnNode(HttpServletRequest hsr, String nodeId) thr
363365
return null;
364366
}
365367
}
368+
369+
@Override
370+
public ContainerInfo getContainer(HttpServletRequest req, HttpServletResponse res,
371+
String appId, String appAttemptId, String containerId) {
372+
if (!isRunning) {
373+
throw new RuntimeException("RM is stopped");
374+
}
375+
376+
ContainerId newContainerId = ContainerId.newContainerId(
377+
ApplicationAttemptId.fromString(appAttemptId), Integer.valueOf(containerId));
378+
379+
Resource allocatedResource = Resource.newInstance(1024, 2);
380+
381+
int subClusterId = Integer.valueOf(getSubClusterId().getId());
382+
NodeId assignedNode = NodeId.newInstance("Node", subClusterId);
383+
Priority priority = Priority.newInstance(subClusterId);
384+
long creationTime = subClusterId;
385+
long finishTime = subClusterId;
386+
String diagnosticInfo = "Diagnostic " + subClusterId;
387+
String logUrl = "Log " + subClusterId;
388+
int containerExitStatus = subClusterId;
389+
ContainerState containerState = ContainerState.COMPLETE;
390+
String nodeHttpAddress = "HttpAddress " + subClusterId;
391+
392+
ContainerReport containerReport = ContainerReport.newInstance(
393+
newContainerId, allocatedResource, assignedNode, priority,
394+
creationTime, finishTime, diagnosticInfo, logUrl,
395+
containerExitStatus, containerState, nodeHttpAddress);
396+
397+
return new ContainerInfo(containerReport);
398+
}
399+
400+
@Override
401+
public Response signalToContainer(String containerId, String command,
402+
HttpServletRequest req) throws AuthorizationException {
403+
if (!isRunning) {
404+
throw new RuntimeException("RM is stopped");
405+
}
406+
407+
if (!EnumUtils.isValidEnum(SignalContainerCommand.class, command.toUpperCase())) {
408+
String errMsg = "Invalid command: " + command.toUpperCase() + ", valid commands are: "
409+
+ Arrays.asList(SignalContainerCommand.values());
410+
return Response.status(Status.BAD_REQUEST).entity(errMsg).build();
411+
}
412+
413+
return Response.status(Status.OK).build();
414+
}
366415
}

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorREST.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@
5757
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeLabelInfo;
5858
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.LabelsToNodesInfo;
5959
import org.apache.hadoop.yarn.server.resourcemanager.webapp.NodeIDsInfo;
60+
import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo;
6061
import org.apache.hadoop.yarn.server.webapp.dao.ContainersInfo;
6162
import org.apache.hadoop.yarn.util.MonotonicClock;
6263
import org.junit.Assert;
@@ -698,4 +699,22 @@ public void testGetLabelsOnNode() throws Exception {
698699
Assert.assertNotNull(nodeLabelsInfo2);
699700
Assert.assertEquals(0, nodeLabelsInfo2.getNodeLabelsName().size());
700701
}
702+
703+
@Test
704+
public void testGetContainer()
705+
throws IOException, InterruptedException, YarnException {
706+
// Submit application to multiSubCluster
707+
ApplicationId appId = ApplicationId.newInstance(Time.now(), 1);
708+
ApplicationSubmissionContextInfo context = new ApplicationSubmissionContextInfo();
709+
context.setApplicationId(appId.toString());
710+
711+
Assert.assertNotNull(interceptor.submitApplication(context, null));
712+
713+
ApplicationAttemptId appAttemptId =
714+
ApplicationAttemptId.newInstance(appId, 1);
715+
716+
ContainerInfo containerInfo = interceptor.getContainer(null, null,
717+
appId.toString(), appAttemptId.toString(), "0");
718+
Assert.assertNotNull(containerInfo);
719+
}
701720
}

0 commit comments

Comments
 (0)