@@ -22,6 +22,7 @@
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -82,4 +83,24 @@ public static void logAndThrowIOException(String errMsg, Throwable t)
}
}

/**
* Throws a RuntimeException due to an error.
*
* @param errMsg the error message
* @param t the throwable raised in the called class.
* @throws RuntimeException on failure
*/
@Public
@Unstable
public static void logAndThrowRunTimeException(String errMsg, Throwable t)
throws RuntimeException {
if (t != null) {
LOG.error(errMsg, t);
throw new RuntimeException(errMsg, t);
} else {
LOG.error(errMsg);
throw new RuntimeException(errMsg);
}
}

}
@@ -50,6 +50,7 @@
import org.apache.hadoop.util.concurrent.HadoopExecutors;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeLabel;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
@@ -1415,7 +1416,39 @@ public ContainersInfo getContainers(HttpServletRequest req,
public ContainerInfo getContainer(HttpServletRequest req,
HttpServletResponse res, String appId, String appAttemptId,
String containerId) {
-    throw new NotImplementedException("Code is not implemented");

if (appId == null || appId.isEmpty()) {
throw new IllegalArgumentException("Parameter error, the appId is empty or null.");
}
if (appAttemptId == null || appAttemptId.isEmpty()) {
throw new IllegalArgumentException("Parameter error, the appAttemptId is empty or null.");
}
if (containerId == null || containerId.isEmpty()) {
throw new IllegalArgumentException("Parameter error, the containerId is empty or null.");
}

try {
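// Resolve the application's home SubCluster, then forward the request to that
// SubCluster's RM web service through its interceptor.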
ApplicationId applicationId = ApplicationId.fromString(appId);
SubClusterInfo subClusterInfo = getHomeSubClusterInfoByAppId(applicationId);

if (subClusterInfo == null) {
RouterServerUtil.logAndThrowRunTimeException("Unable to get subCluster by applicationId = " +
applicationId, null);
}

DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorForSubCluster(
subClusterInfo.getSubClusterId(), subClusterInfo.getRMWebServiceAddress());
return interceptor.getContainer(req, res, appId, appAttemptId, containerId);
} catch (IllegalArgumentException e) {
String msg = String.format(
"Unable to get the AppAttempt appId: %s, appAttemptId: %s, containerId: %s.", appId,
appAttemptId, containerId);
RouterServerUtil.logAndThrowRunTimeException(msg, e);
} catch (YarnException e) {
RouterServerUtil.logAndThrowRunTimeException("getContainer Failed.", e);
}

return null;
}

@Override
@@ -1442,7 +1475,37 @@ public void setNextInterceptor(RESTRequestInterceptor next) {
@Override
public Response signalToContainer(String containerId, String command,
HttpServletRequest req) {
-    throw new NotImplementedException("Code is not implemented");

if (containerId == null || containerId.isEmpty()) {
throw new IllegalArgumentException("Parameter error, the containerId is empty or null.");
}

if (command == null || command.isEmpty()) {
throw new IllegalArgumentException("Parameter error, the command is empty or null.");
}

try {
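// Recover the ApplicationId from the ContainerId, look up the application's home
// SubCluster, and route the signal command to that SubCluster's RM.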
ContainerId containerIdObj = ContainerId.fromString(containerId);
ApplicationId applicationId = containerIdObj.getApplicationAttemptId().getApplicationId();

SubClusterInfo subClusterInfo = getHomeSubClusterInfoByAppId(applicationId);

if (subClusterInfo == null) {
RouterServerUtil.logAndThrowRunTimeException("Unable to get subCluster by applicationId = " +
applicationId, null);
}

DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorForSubCluster(
subClusterInfo.getSubClusterId(), subClusterInfo.getRMWebServiceAddress());
return interceptor.signalToContainer(containerId, command, req);

} catch (YarnException e) {
RouterServerUtil.logAndThrowRunTimeException("signalToContainer Failed.", e);
} catch (AuthorizationException e) {
RouterServerUtil.logAndThrowRunTimeException("signalToContainer Author Failed.", e);
}

return null;
}

@Override
@@ -1494,4 +1557,28 @@ private <R> Map<SubClusterInfo, R> invokeConcurrent(Collection<SubClusterInfo> c
});
return results;
}

/**
* Get the home SubCluster of the given application.
*
* @param applicationId the application to look up
* @return the SubClusterInfo of the application's home SubCluster
* @throws YarnException on failure
*/
private SubClusterInfo getHomeSubClusterInfoByAppId(ApplicationId applicationId) throws YarnException {
SubClusterInfo subClusterInfo = null;
SubClusterId subClusterId = null;
try {
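// Two-step lookup: ApplicationId -> home SubClusterId, then SubClusterId -> SubClusterInfo.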
subClusterId = federationFacade.getApplicationHomeSubCluster(applicationId);
if (subClusterId == null) {
RouterServerUtil.logAndThrowException("Can't get HomeSubCluster by applicationId "
+ applicationId, null);
}
subClusterInfo = federationFacade.getSubCluster(subClusterId);
} catch (YarnException e) {
RouterServerUtil.logAndThrowException("Get HomeSubClusterInfo by applicationId "
+ applicationId + " failed.", e);
}
return subClusterInfo;
}
}
@@ -35,6 +35,7 @@
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;

import org.apache.commons.lang3.EnumUtils;
import org.apache.hadoop.security.authorize.AuthorizationException;
import org.apache.hadoop.util.Sets;
import org.apache.hadoop.util.StringUtils;
@@ -47,6 +48,7 @@
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerReport;
import org.apache.hadoop.yarn.api.records.NodeLabel;
import org.apache.hadoop.yarn.api.records.SignalContainerCommand;
import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
@@ -363,4 +365,51 @@ public NodeLabelsInfo getLabelsOnNode(HttpServletRequest hsr, String nodeId) thr
return null;
}
}

@Override
public ContainerInfo getContainer(HttpServletRequest req, HttpServletResponse res,
String appId, String appAttemptId, String containerId) {
if (!isRunning) {
throw new RuntimeException("RM is stopped");
}
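// Fabricate a ContainerReport whose fields are derived from this mock's SubCluster id,
// so tests can tell which SubCluster produced the response.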

ContainerId newContainerId = ContainerId.newContainerId(
ApplicationAttemptId.fromString(appAttemptId), Integer.valueOf(containerId));

Resource allocatedResource = Resource.newInstance(1024, 2);

int subClusterId = Integer.valueOf(getSubClusterId().getId());
NodeId assignedNode = NodeId.newInstance("Node", subClusterId);
Priority priority = Priority.newInstance(subClusterId);
long creationTime = subClusterId;
long finishTime = subClusterId;
String diagnosticInfo = "Diagnostic " + subClusterId;
String logUrl = "Log " + subClusterId;
int containerExitStatus = subClusterId;
ContainerState containerState = ContainerState.COMPLETE;
String nodeHttpAddress = "HttpAddress " + subClusterId;

ContainerReport containerReport = ContainerReport.newInstance(
newContainerId, allocatedResource, assignedNode, priority,
creationTime, finishTime, diagnosticInfo, logUrl,
containerExitStatus, containerState, nodeHttpAddress);

return new ContainerInfo(containerReport);
}

@Override
public Response signalToContainer(String containerId, String command,
HttpServletRequest req) throws AuthorizationException {
if (!isRunning) {
throw new RuntimeException("RM is stopped");
}
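// Only accept commands that map to a SignalContainerCommand enum value.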

if (!EnumUtils.isValidEnum(SignalContainerCommand.class, command.toUpperCase())) {
String errMsg = "Invalid command: " + command.toUpperCase() + ", valid commands are: "
+ Arrays.asList(SignalContainerCommand.values());
return Response.status(Status.BAD_REQUEST).entity(errMsg).build();
}

return Response.status(Status.OK).build();
}
}
@@ -57,6 +57,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeLabelInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.LabelsToNodesInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.NodeIDsInfo;
import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo;
import org.apache.hadoop.yarn.server.webapp.dao.ContainersInfo;
import org.apache.hadoop.yarn.util.MonotonicClock;
import org.junit.Assert;
@@ -698,4 +699,21 @@ public void testGetLabelsOnNode() throws Exception {
Assert.assertNotNull(nodeLabelsInfo2);
Assert.assertEquals(0, nodeLabelsInfo2.getNodeLabelsName().size());
}

@Test
public void testGetContainer()
throws IOException, InterruptedException, YarnException {
// Submit an application so the Router records its home SubCluster
ApplicationId appId = ApplicationId.newInstance(Time.now(), 1);
ApplicationSubmissionContextInfo context = new ApplicationSubmissionContextInfo();
context.setApplicationId(appId.toString());

Assert.assertNotNull(interceptor.submitApplication(context, null));

ApplicationAttemptId appAttemptId =
ApplicationAttemptId.newInstance(appId, 1);
// The mock interceptor builds the ContainerId from an integer, so pass a numeric id.
ContainerInfo containerInfo = interceptor.getContainer(null, null,
appId.toString(), appAttemptId.toString(), "0");
Assert.assertNotNull(containerInfo);
}
}