Skip to content

Commit b18870e

Browse files
slfan1989 authored and jiajunmao committed
YARN-11577. Improve FederationInterceptorREST Method Result. (apache#6190) Contributed by Shilun Fan.
Reviewed-by: Inigo Goiri <[email protected]> Signed-off-by: Shilun Fan <[email protected]>
1 parent 3343b4f commit b18870e

File tree

7 files changed

+1275
-23
lines changed

7 files changed

+1275
-23
lines changed

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterServerUtil.java

Lines changed: 21 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -105,8 +105,9 @@ public static void logAndThrowException(Throwable t, String errMsgFormat, Object
105105
throws YarnException {
106106
String msg = String.format(errMsgFormat, args);
107107
if (t != null) {
108-
LOG.error(msg, t);
109-
throw new YarnException(msg, t);
108+
String newErrMsg = getErrorMsg(msg, t);
109+
LOG.error(newErrMsg, t);
110+
throw new YarnException(newErrMsg, t);
110111
} else {
111112
LOG.error(msg);
112113
throw new YarnException(msg);
@@ -234,8 +235,9 @@ private static List<String> getInterceptorClassNames(Configuration conf,
234235
public static void logAndThrowIOException(String errMsg, Throwable t)
235236
throws IOException {
236237
if (t != null) {
237-
LOG.error(errMsg, t);
238-
throw new IOException(errMsg, t);
238+
String newErrMsg = getErrorMsg(errMsg, t);
239+
LOG.error(newErrMsg, t);
240+
throw new IOException(newErrMsg, t);
239241
} else {
240242
LOG.error(errMsg);
241243
throw new IOException(errMsg);
@@ -256,8 +258,9 @@ public static void logAndThrowIOException(Throwable t, String errMsgFormat, Obje
256258
throws IOException {
257259
String msg = String.format(errMsgFormat, args);
258260
if (t != null) {
259-
LOG.error(msg, t);
260-
throw new IOException(msg, t);
261+
String newErrMsg = getErrorMsg(msg, t);
262+
LOG.error(newErrMsg, t);
263+
throw new IOException(newErrMsg, t);
261264
} else {
262265
LOG.error(msg);
263266
throw new IOException(msg);
@@ -276,8 +279,9 @@ public static void logAndThrowIOException(Throwable t, String errMsgFormat, Obje
276279
public static void logAndThrowRunTimeException(String errMsg, Throwable t)
277280
throws RuntimeException {
278281
if (t != null) {
279-
LOG.error(errMsg, t);
280-
throw new RuntimeException(errMsg, t);
282+
String newErrMsg = getErrorMsg(errMsg, t);
283+
LOG.error(newErrMsg, t);
284+
throw new RuntimeException(newErrMsg, t);
281285
} else {
282286
LOG.error(errMsg);
283287
throw new RuntimeException(errMsg);
@@ -298,8 +302,9 @@ public static void logAndThrowRunTimeException(Throwable t, String errMsgFormat,
298302
throws RuntimeException {
299303
String msg = String.format(errMsgFormat, args);
300304
if (t != null) {
301-
LOG.error(msg, t);
302-
throw new RuntimeException(msg, t);
305+
String newErrMsg = getErrorMsg(msg, t);
306+
LOG.error(newErrMsg, t);
307+
throw new RuntimeException(newErrMsg, t);
303308
} else {
304309
LOG.error(msg);
305310
throw new RuntimeException(msg);
@@ -320,8 +325,9 @@ public static RuntimeException logAndReturnRunTimeException(
320325
Throwable t, String errMsgFormat, Object... args) {
321326
String msg = String.format(errMsgFormat, args);
322327
if (t != null) {
323-
LOG.error(msg, t);
324-
return new RuntimeException(msg, t);
328+
String newErrMsg = getErrorMsg(msg, t);
329+
LOG.error(newErrMsg, t);
330+
return new RuntimeException(newErrMsg, t);
325331
} else {
326332
LOG.error(msg);
327333
return new RuntimeException(msg);
@@ -356,8 +362,9 @@ public static YarnRuntimeException logAndReturnYarnRunTimeException(
356362
Throwable t, String errMsgFormat, Object... args) {
357363
String msg = String.format(errMsgFormat, args);
358364
if (t != null) {
359-
LOG.error(msg, t);
360-
return new YarnRuntimeException(msg, t);
365+
String newErrMsg = getErrorMsg(msg, t);
366+
LOG.error(newErrMsg, t);
367+
return new YarnRuntimeException(newErrMsg, t);
361368
} else {
362369
LOG.error(msg);
363370
return new YarnRuntimeException(msg);

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -341,6 +341,7 @@ protected DefaultRequestInterceptorREST getOrCreateInterceptorByAppId(String app
341341

342342
// Get homeSubCluster By appId
343343
SubClusterInfo subClusterInfo = getHomeSubClusterInfoByAppId(appId);
344+
LOG.info("appId = {} : subClusterInfo = {}.", appId, subClusterInfo.getSubClusterId());
344345
return getOrCreateInterceptorForSubCluster(subClusterInfo);
345346
}
346347

@@ -827,7 +828,7 @@ public AppsInfo getApps(HttpServletRequest hsr, String stateQuery,
827828
});
828829

829830
if (apps.getApps().isEmpty()) {
830-
return null;
831+
return new AppsInfo();
831832
}
832833

833834
// Merge all the application reports got from all the available YARN RMs
@@ -1135,7 +1136,7 @@ public AppState getAppState(HttpServletRequest hsr, String appId)
11351136
} catch (YarnException | IllegalArgumentException e) {
11361137
LOG.error("getHomeSubClusterInfoByAppId error, applicationId = {}.", appId, e);
11371138
}
1138-
return null;
1139+
return new AppState();
11391140
}
11401141

11411142
@Override
@@ -3371,17 +3372,19 @@ private <R> Map<SubClusterInfo, R> invokeConcurrent(Collection<SubClusterInfo> c
33713372
}
33723373

33733374
Exception exception = result.getException();
3374-
3375-
// If allowPartialResult=false, it means that if an exception occurs in a subCluster,
3376-
// an exception will be thrown directly.
3377-
if (!allowPartialResult && exception != null) {
3375+
if (exception != null) {
33783376
throw exception;
33793377
}
33803378
} catch (Throwable e) {
33813379
String subClusterId = subClusterInfo != null ?
33823380
subClusterInfo.getSubClusterId().getId() : "UNKNOWN";
33833381
LOG.error("SubCluster {} failed to {} report.", subClusterId, request.getMethodName(), e);
3384-
throw new YarnRuntimeException(e.getCause().getMessage(), e);
3382+
// If allowPartialResult=false, it means that if an exception occurs in a subCluster,
3383+
// an exception will be thrown directly.
3384+
if (!allowPartialResult) {
3385+
throw new YarnException("SubCluster " + subClusterId +
3386+
" failed to " + request.getMethodName() + " report.", e);
3387+
}
33853388
}
33863389
}
33873390

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/RouterWebServices.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@
106106
* main difference with AMRMProxyService is the protocol they implement.
107107
**/
108108
@Singleton
109-
@Path("/ws/v1/cluster")
109+
@Path(RMWSConsts.RM_WEB_SERVICE_PATH)
110110
public class RouterWebServices implements RMWebServiceProtocol {
111111

112112
private static final Logger LOG =
@@ -424,7 +424,7 @@ public BulkActivitiesInfo getBulkActivities(
424424
MediaType.APPLICATION_XML + "; " + JettyUtils.UTF_8 })
425425
@Override
426426
public AppActivitiesInfo getAppActivities(@Context HttpServletRequest hsr,
427-
@QueryParam(RMWSConsts.APP_ID) String appId,
427+
@PathParam(RMWSConsts.APPID) String appId,
428428
@QueryParam(RMWSConsts.MAX_TIME) String time,
429429
@QueryParam(RMWSConsts.REQUEST_PRIORITIES) Set<String> requestPriorities,
430430
@QueryParam(RMWSConsts.ALLOCATION_REQUEST_IDS)

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/subcluster/TestFederationSubCluster.java

Lines changed: 135 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,13 +32,21 @@
3232
import org.apache.hadoop.security.UserGroupInformation;
3333
import org.apache.hadoop.test.GenericTestUtils;
3434
import org.apache.hadoop.util.Time;
35+
import org.apache.hadoop.yarn.api.records.NodeLabel;
3536
import org.apache.hadoop.yarn.conf.YarnConfiguration;
3637
import org.apache.hadoop.yarn.exceptions.YarnException;
3738
import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
3839
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoRequest;
3940
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoResponse;
4041
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
4142
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
43+
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ApplicationSubmissionContextInfo;
44+
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NewApplication;
45+
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeInfo;
46+
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodesInfo;
47+
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NewReservation;
48+
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeLabelsInfo;
49+
import org.apache.hadoop.yarn.server.router.webapp.HTTPMethods;
4250
import org.apache.hadoop.yarn.server.router.webapp.JavaProcess;
4351
import org.slf4j.Logger;
4452
import org.slf4j.LoggerFactory;
@@ -48,11 +56,20 @@
4856
import java.security.PrivilegedExceptionAction;
4957
import java.util.LinkedList;
5058
import java.util.List;
59+
import java.util.ArrayList;
5160
import java.util.concurrent.TimeoutException;
61+
import java.util.regex.Matcher;
62+
import java.util.regex.Pattern;
5263

5364
import static javax.servlet.http.HttpServletResponse.SC_OK;
65+
import static javax.ws.rs.core.MediaType.APPLICATION_JSON;
5466
import static javax.ws.rs.core.MediaType.APPLICATION_XML;
5567
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.RM_WEB_SERVICE_PATH;
68+
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.NODES;
69+
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.APPS_NEW_APPLICATION;
70+
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.APPS;
71+
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.RESERVATION_NEW;
72+
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.ADD_NODE_LABELS;
5673
import static org.apache.hadoop.yarn.server.router.webapp.TestRouterWebServicesREST.waitWebAppRunning;
5774
import static org.junit.Assert.assertEquals;
5875

@@ -190,6 +207,8 @@ public static <T> T performGetCalls(final String routerAddress, final String pat
190207
final String queryValue) throws IOException, InterruptedException {
191208

192209
Client clientToRouter = Client.create();
210+
clientToRouter.setReadTimeout(5000);
211+
clientToRouter.setConnectTimeout(5000);
193212
WebResource toRouter = clientToRouter.resource(routerAddress).path(path);
194213

195214
final WebResource.Builder toRouterBuilder;
@@ -207,4 +226,120 @@ public static <T> T performGetCalls(final String routerAddress, final String pat
207226
return response.getEntity(returnType);
208227
});
209228
}
229+
230+
public static ClientResponse performCall(final String routerAddress, final String webAddress,
231+
final String queryKey, final String queryValue, final Object context,
232+
final HTTPMethods method) throws IOException, InterruptedException {
233+
234+
return UserGroupInformation.createRemoteUser(userName).doAs(
235+
(PrivilegedExceptionAction<ClientResponse>) () -> {
236+
Client clientToRouter = Client.create();
237+
WebResource toRouter = clientToRouter.resource(routerAddress).path(webAddress);
238+
239+
WebResource toRouterWR = toRouter;
240+
if (queryKey != null && queryValue != null) {
241+
toRouterWR = toRouterWR.queryParam(queryKey, queryValue);
242+
}
243+
244+
WebResource.Builder builder;
245+
if (context != null) {
246+
builder = toRouterWR.entity(context, APPLICATION_JSON);
247+
builder = builder.accept(APPLICATION_JSON);
248+
} else {
249+
builder = toRouterWR.accept(APPLICATION_JSON);
250+
}
251+
252+
ClientResponse response = null;
253+
254+
switch (method) {
255+
case DELETE:
256+
response = builder.delete(ClientResponse.class);
257+
break;
258+
case POST:
259+
response = builder.post(ClientResponse.class);
260+
break;
261+
case PUT:
262+
response = builder.put(ClientResponse.class);
263+
break;
264+
default:
265+
break;
266+
}
267+
268+
return response;
269+
});
270+
}
271+
272+
public String getNodeId(String rmAddress) {
273+
Client clientToRM = Client.create();
274+
clientToRM.setConnectTimeout(3000);
275+
clientToRM.setReadTimeout(3000);
276+
WebResource toRM = clientToRM.resource(rmAddress).path(RM_WEB_SERVICE_PATH + NODES);
277+
ClientResponse response =
278+
toRM.accept(APPLICATION_XML).get(ClientResponse.class);
279+
NodesInfo ci = response.getEntity(NodesInfo.class);
280+
List<NodeInfo> nodes = ci.getNodes();
281+
if (nodes.isEmpty()) {
282+
return null;
283+
}
284+
clientToRM.destroy();
285+
return nodes.get(0).getNodeId();
286+
}
287+
288+
public NewApplication getNewApplicationId(String routerAddress) {
289+
Client clientToRM = Client.create();
290+
clientToRM.setConnectTimeout(3000);
291+
clientToRM.setReadTimeout(3000);
292+
WebResource toRM = clientToRM.resource(routerAddress).path(
293+
RM_WEB_SERVICE_PATH + APPS_NEW_APPLICATION);
294+
ClientResponse response = toRM.accept(APPLICATION_XML).post(ClientResponse.class);
295+
clientToRM.destroy();
296+
return response.getEntity(NewApplication.class);
297+
}
298+
299+
public String submitApplication(String routerAddress) {
300+
ApplicationSubmissionContextInfo context = new ApplicationSubmissionContextInfo();
301+
String appId = getNewApplicationId(routerAddress).getApplicationId();
302+
context.setApplicationId(appId);
303+
Client clientToRouter = Client.create();
304+
clientToRouter.setConnectTimeout(3000);
305+
clientToRouter.setReadTimeout(3000);
306+
WebResource toRM = clientToRouter.resource(routerAddress).path(
307+
RM_WEB_SERVICE_PATH + APPS);
308+
toRM.entity(context, APPLICATION_XML).accept(APPLICATION_XML).post(ClientResponse.class);
309+
clientToRouter.destroy();
310+
return appId;
311+
}
312+
313+
public NewReservation getNewReservationId(String routerAddress) {
314+
Client clientToRM = Client.create();
315+
clientToRM.setConnectTimeout(3000);
316+
clientToRM.setReadTimeout(3000);
317+
WebResource toRM = clientToRM.resource(routerAddress).
318+
path(RM_WEB_SERVICE_PATH + RESERVATION_NEW);
319+
ClientResponse response = toRM.accept(APPLICATION_XML).post(ClientResponse.class);
320+
return response.getEntity(NewReservation.class);
321+
}
322+
323+
public String addNodeLabel(String routerAddress) {
324+
Client clientToRM = Client.create();
325+
clientToRM.setConnectTimeout(3000);
326+
clientToRM.setReadTimeout(3000);
327+
WebResource toRM = clientToRM.resource(routerAddress)
328+
.path(RM_WEB_SERVICE_PATH + ADD_NODE_LABELS);
329+
List<NodeLabel> nodeLabels = new ArrayList<>();
330+
nodeLabels.add(NodeLabel.newInstance("default"));
331+
NodeLabelsInfo context = new NodeLabelsInfo(nodeLabels);
332+
ClientResponse response = toRM
333+
.entity(context, APPLICATION_XML)
334+
.accept(APPLICATION_XML)
335+
.post(ClientResponse.class);
336+
return response.getEntity(String.class);
337+
}
338+
339+
public static String format(String format, Object... args) {
340+
Pattern p = Pattern.compile("\\{.*?}");
341+
Matcher m = p.matcher(format);
342+
String newFormat = m.replaceAll("%s");
343+
return String.format(newFormat, args);
344+
}
210345
}

0 commit comments

Comments
 (0)