Skip to content

Commit bb8011a

Browse files
author
slfan1989
committed
YARN-6572. Refactoring Router services to use common util classes for pipeline creations.
1 parent a55ace7 commit bb8011a

File tree

6 files changed

+130
-175
lines changed

6 files changed

+130
-175
lines changed

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/YarnServerBuilderUtils.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ public static NodeHeartbeatResponse newNodeHeartbeatResponse(int responseId,
7979
*
8080
* @param applicationId Application ID
8181
* @param credentials HDFS Tokens
82-
* @return systemCredentialsForAppsProto SystemCredentialsForAppsProto
82+
* @return systemCredentialsForAppsProto
8383
*/
8484
public static SystemCredentialsForAppsProto newSystemCredentialsForAppsProto(
8585
ApplicationId applicationId, ByteBuffer credentials) {

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterServerUtil.java

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,29 @@
1818

1919
package org.apache.hadoop.yarn.server.router;
2020

21+
import org.apache.commons.lang3.reflect.FieldUtils;
22+
import org.apache.commons.lang3.reflect.MethodUtils;
2123
import org.apache.hadoop.classification.InterfaceAudience.Private;
2224
import org.apache.hadoop.classification.InterfaceAudience.Public;
2325
import org.apache.hadoop.classification.InterfaceStability.Unstable;
26+
import org.apache.hadoop.conf.Configuration;
27+
import org.apache.hadoop.util.ReflectionUtils;
28+
import org.apache.hadoop.util.StringUtils;
2429
import org.apache.hadoop.yarn.exceptions.YarnException;
30+
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
31+
import org.apache.hadoop.yarn.server.router.clientrm.ClientMethod;
32+
import org.apache.hadoop.yarn.server.router.clientrm.ClientRequestInterceptor;
33+
import org.apache.hadoop.yarn.server.router.rmadmin.RMAdminRequestInterceptor;
2534
import org.slf4j.Logger;
2635
import org.slf4j.LoggerFactory;
2736

37+
import java.lang.reflect.Field;
38+
import java.lang.reflect.InvocationTargetException;
39+
import java.lang.reflect.Method;
40+
import java.util.ArrayList;
41+
import java.util.Collection;
42+
import java.util.List;
43+
2844
/**
2945
* Common utility methods used by the Router server.
3046
*
@@ -60,4 +76,53 @@ public static void logAndThrowException(String errMsg, Throwable t)
6076
}
6177
}
6278

79+
public static <R> R createRequestInterceptorChain(Configuration conf, String pipeLineClassName,
80+
String interceptorClassName, ClientMethod request, Class<R> clazz)
81+
throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
82+
83+
List<String> interceptorClassNames = getInterceptorClassNames(conf,
84+
pipeLineClassName, interceptorClassName);
85+
86+
R pipeline = null;
87+
R current = null;
88+
89+
for (String className : interceptorClassNames) {
90+
try {
91+
Class<?> interceptorClass = conf.getClassByName(className);
92+
if (clazz.isAssignableFrom(interceptorClass)) {
93+
R interceptorInstance = (R) ReflectionUtils.newInstance(interceptorClass, conf);
94+
if (pipeline == null) {
95+
pipeline = interceptorInstance;
96+
current = interceptorInstance;
97+
continue;
98+
} else {
99+
Method method = clazz.getMethod(request.getMethodName(), request.getTypes());
100+
method.invoke(current, interceptorInstance);
101+
current = interceptorInstance;
102+
}
103+
} else {
104+
LOG.error("Class: {} not instance of {}.", className, clazz.getCanonicalName());
105+
throw new YarnRuntimeException("Class: " + className + " not instance of "
106+
+ clazz.getCanonicalName());
107+
}
108+
} catch (ClassNotFoundException e) {
109+
throw new YarnRuntimeException("Could not instantiate RequestInterceptor: " + className, e);
110+
}
111+
}
112+
113+
return pipeline;
114+
}
115+
116+
private static List<String> getInterceptorClassNames(Configuration conf,
117+
String pipeLineClass, String interceptorClass) {
118+
String configuredInterceptorClassNames = conf.get(pipeLineClass, interceptorClass);
119+
List<String> interceptorClassNames = new ArrayList<String>();
120+
Collection<String> tempList =
121+
StringUtils.getStringCollection(configuredInterceptorClassNames);
122+
for (String item : tempList) {
123+
interceptorClassNames.add(item.trim());
124+
}
125+
return interceptorClassNames;
126+
}
127+
63128
}

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/RouterClientRMService.java

Lines changed: 23 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package org.apache.hadoop.yarn.server.router.clientrm;
2020

2121
import java.io.IOException;
22+
import java.lang.reflect.InvocationTargetException;
2223
import java.net.InetSocketAddress;
2324
import java.util.ArrayList;
2425
import java.util.Collection;
@@ -110,6 +111,7 @@
110111
import org.apache.hadoop.yarn.exceptions.YarnException;
111112
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
112113
import org.apache.hadoop.yarn.ipc.YarnRPC;
114+
import org.apache.hadoop.yarn.server.router.RouterServerUtil;
113115
import org.apache.hadoop.yarn.server.router.security.authorize.RouterPolicyProvider;
114116
import org.apache.hadoop.yarn.util.LRUCacheHashMap;
115117
import org.slf4j.Logger;
@@ -147,7 +149,7 @@ public RouterClientRMService() {
147149

148150
@Override
149151
protected void serviceStart() throws Exception {
150-
LOG.info("Starting Router ClientRMService");
152+
LOG.info("Starting Router ClientRMService.");
151153
Configuration conf = getConfig();
152154
YarnRPC rpc = YarnRPC.create(conf);
153155
UserGroupInformation.setConfiguration(conf);
@@ -161,9 +163,7 @@ protected void serviceStart() throws Exception {
161163
int maxCacheSize =
162164
conf.getInt(YarnConfiguration.ROUTER_PIPELINE_CACHE_MAX_SIZE,
163165
YarnConfiguration.DEFAULT_ROUTER_PIPELINE_CACHE_MAX_SIZE);
164-
this.userPipelineMap = Collections.synchronizedMap(
165-
new LRUCacheHashMap<String, RequestInterceptorChainWrapper>(
166-
maxCacheSize, true));
166+
this.userPipelineMap = Collections.synchronizedMap(new LRUCacheHashMap<>(maxCacheSize, true));
167167

168168
Configuration serverConf = new Configuration(conf);
169169

@@ -181,14 +181,13 @@ protected void serviceStart() throws Exception {
181181
}
182182

183183
this.server.start();
184-
LOG.info("Router ClientRMService listening on address: "
185-
+ this.server.getListenerAddress());
184+
LOG.info("Router ClientRMService listening on address: {}.", this.server.getListenerAddress());
186185
super.serviceStart();
187186
}
188187

189188
@Override
190189
protected void serviceStop() throws Exception {
191-
LOG.info("Stopping Router ClientRMService");
190+
LOG.info("Stopping Router ClientRMService.");
192191
if (this.server != null) {
193192
this.server.stop();
194193
}
@@ -201,27 +200,6 @@ public Server getServer() {
201200
return this.server;
202201
}
203202

204-
/**
205-
* Returns the comma separated intercepter class names from the configuration.
206-
*
207-
* @param conf
208-
* @return the intercepter class names as an instance of ArrayList
209-
*/
210-
private List<String> getInterceptorClassNames(Configuration conf) {
211-
String configuredInterceptorClassNames =
212-
conf.get(YarnConfiguration.ROUTER_CLIENTRM_INTERCEPTOR_CLASS_PIPELINE,
213-
YarnConfiguration.DEFAULT_ROUTER_CLIENTRM_INTERCEPTOR_CLASS);
214-
215-
List<String> interceptorClassNames = new ArrayList<String>();
216-
Collection<String> tempList =
217-
StringUtils.getStringCollection(configuredInterceptorClassNames);
218-
for (String item : tempList) {
219-
interceptorClassNames.add(item.trim());
220-
}
221-
222-
return interceptorClassNames;
223-
}
224-
225203
@Override
226204
public GetNewApplicationResponse getNewApplication(
227205
GetNewApplicationRequest request) throws YarnException, IOException {
@@ -507,42 +485,25 @@ protected Map<String, RequestInterceptorChainWrapper> getPipelines() {
507485
@VisibleForTesting
508486
protected ClientRequestInterceptor createRequestInterceptorChain() {
509487
Configuration conf = getConfig();
488+
ClientRequestInterceptor pipeline = null;
489+
ClientMethod remoteMethod = null;
490+
try {
491+
remoteMethod = new ClientMethod("setNextInterceptor",
492+
new Class[]{ClientRequestInterceptor.class}, new Object[]{null});
510493

511-
List<String> interceptorClassNames = getInterceptorClassNames(conf);
494+
pipeline = RouterServerUtil.createRequestInterceptorChain(conf,
495+
YarnConfiguration.ROUTER_CLIENTRM_INTERCEPTOR_CLASS_PIPELINE,
496+
YarnConfiguration.DEFAULT_ROUTER_CLIENTRM_INTERCEPTOR_CLASS,
497+
remoteMethod, ClientRequestInterceptor.class);
512498

513-
ClientRequestInterceptor pipeline = null;
514-
ClientRequestInterceptor current = null;
515-
for (String interceptorClassName : interceptorClassNames) {
516-
try {
517-
Class<?> interceptorClass = conf.getClassByName(interceptorClassName);
518-
if (ClientRequestInterceptor.class.isAssignableFrom(interceptorClass)) {
519-
ClientRequestInterceptor interceptorInstance =
520-
(ClientRequestInterceptor) ReflectionUtils
521-
.newInstance(interceptorClass, conf);
522-
if (pipeline == null) {
523-
pipeline = interceptorInstance;
524-
current = interceptorInstance;
525-
continue;
526-
} else {
527-
current.setNextInterceptor(interceptorInstance);
528-
current = interceptorInstance;
529-
}
530-
} else {
531-
throw new YarnRuntimeException(
532-
"Class: " + interceptorClassName + " not instance of "
533-
+ ClientRequestInterceptor.class.getCanonicalName());
534-
}
535-
} catch (ClassNotFoundException e) {
499+
if (pipeline == null) {
536500
throw new YarnRuntimeException(
537-
"Could not instantiate ApplicationClientRequestInterceptor: "
538-
+ interceptorClassName,
539-
e);
501+
"RequestInterceptor pipeline is not configured in the system.");
540502
}
541-
}
542-
543-
if (pipeline == null) {
544-
throw new YarnRuntimeException(
545-
"RequestInterceptor pipeline is not configured in the system");
503+
} catch (IOException | InvocationTargetException | NoSuchMethodException | RuntimeException
504+
| IllegalAccessException ex) {
505+
throw new YarnRuntimeException("RequestInterceptor pipeline is not configured in the system.",
506+
ex);
546507
}
547508
return pipeline;
548509
}
@@ -566,14 +527,14 @@ private RequestInterceptorChainWrapper initializePipeline(String user) {
566527
// We should init the pipeline instance after it is created and then
567528
// add to the map, to ensure thread safe.
568529
LOG.info("Initializing request processing pipeline for application "
569-
+ "for the user: {}", user);
530+
+ "for the user: {}.", user);
570531

571532
ClientRequestInterceptor interceptorChain =
572533
this.createRequestInterceptorChain();
573534
interceptorChain.init(user);
574535
chainWrapper.init(interceptorChain);
575536
} catch (Exception e) {
576-
LOG.error("Init ClientRequestInterceptor error for user: " + user, e);
537+
LOG.error("Init ClientRequestInterceptor error for user: {}.", user, e);
577538
throw e;
578539
}
579540

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/RouterRMAdminService.java

Lines changed: 20 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package org.apache.hadoop.yarn.server.router.rmadmin;
2020

2121
import java.io.IOException;
22+
import java.lang.reflect.InvocationTargetException;
2223
import java.net.InetSocketAddress;
2324
import java.util.ArrayList;
2425
import java.util.Collection;
@@ -69,6 +70,9 @@
6970
import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeResponse;
7071
import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceRequest;
7172
import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceResponse;
73+
import org.apache.hadoop.yarn.server.router.RouterServerUtil;
74+
import org.apache.hadoop.yarn.server.router.clientrm.ClientMethod;
75+
import org.apache.hadoop.yarn.server.router.clientrm.ClientRequestInterceptor;
7276
import org.apache.hadoop.yarn.server.router.security.authorize.RouterPolicyProvider;
7377
import org.apache.hadoop.yarn.util.LRUCacheHashMap;
7478
import org.slf4j.Logger;
@@ -164,27 +168,6 @@ public Server getServer() {
164168
return this.server;
165169
}
166170

167-
/**
168-
* Returns the comma separated intercepter class names from the configuration.
169-
*
170-
* @param conf
171-
* @return the intercepter class names as an instance of ArrayList
172-
*/
173-
private List<String> getInterceptorClassNames(Configuration conf) {
174-
String configuredInterceptorClassNames =
175-
conf.get(YarnConfiguration.ROUTER_RMADMIN_INTERCEPTOR_CLASS_PIPELINE,
176-
YarnConfiguration.DEFAULT_ROUTER_RMADMIN_INTERCEPTOR_CLASS);
177-
178-
List<String> interceptorClassNames = new ArrayList<String>();
179-
Collection<String> tempList =
180-
StringUtils.getStringCollection(configuredInterceptorClassNames);
181-
for (String item : tempList) {
182-
interceptorClassNames.add(item.trim());
183-
}
184-
185-
return interceptorClassNames;
186-
}
187-
188171
@VisibleForTesting
189172
protected RequestInterceptorChainWrapper getInterceptorChain()
190173
throws IOException {
@@ -215,43 +198,24 @@ protected Map<String, RequestInterceptorChainWrapper> getPipelines() {
215198
@VisibleForTesting
216199
protected RMAdminRequestInterceptor createRequestInterceptorChain() {
217200
Configuration conf = getConfig();
201+
RMAdminRequestInterceptor pipeline = null;
202+
ClientMethod remoteMethod = null;
203+
try {
204+
remoteMethod = new ClientMethod("setNextInterceptor",
205+
new Class[]{RMAdminRequestInterceptor.class}, new Object[]{null});
218206

219-
List<String> interceptorClassNames = getInterceptorClassNames(conf);
207+
pipeline = RouterServerUtil.createRequestInterceptorChain(conf,
208+
YarnConfiguration.ROUTER_RMADMIN_INTERCEPTOR_CLASS_PIPELINE,
209+
YarnConfiguration.DEFAULT_ROUTER_RMADMIN_INTERCEPTOR_CLASS,
210+
remoteMethod, RMAdminRequestInterceptor.class);
220211

221-
RMAdminRequestInterceptor pipeline = null;
222-
RMAdminRequestInterceptor current = null;
223-
for (String interceptorClassName : interceptorClassNames) {
224-
try {
225-
Class<?> interceptorClass = conf.getClassByName(interceptorClassName);
226-
if (RMAdminRequestInterceptor.class
227-
.isAssignableFrom(interceptorClass)) {
228-
RMAdminRequestInterceptor interceptorInstance =
229-
(RMAdminRequestInterceptor) ReflectionUtils
230-
.newInstance(interceptorClass, conf);
231-
if (pipeline == null) {
232-
pipeline = interceptorInstance;
233-
current = interceptorInstance;
234-
continue;
235-
} else {
236-
current.setNextInterceptor(interceptorInstance);
237-
current = interceptorInstance;
238-
}
239-
} else {
240-
throw new YarnRuntimeException(
241-
"Class: " + interceptorClassName + " not instance of "
242-
+ RMAdminRequestInterceptor.class.getCanonicalName());
243-
}
244-
} catch (ClassNotFoundException e) {
212+
if (pipeline == null) {
245213
throw new YarnRuntimeException(
246-
"Could not instantiate RMAdminRequestInterceptor: "
247-
+ interceptorClassName,
248-
e);
214+
"RequestInterceptor pipeline is not configured in the system.");
249215
}
250-
}
251-
252-
if (pipeline == null) {
253-
throw new YarnRuntimeException(
254-
"RequestInterceptor pipeline is not configured in the system");
216+
} catch (IOException | InvocationTargetException | NoSuchMethodException | RuntimeException
217+
| IllegalAccessException ex) {
218+
throw new YarnRuntimeException("Create RequestInterceptor Chain error.", ex);
255219
}
256220
return pipeline;
257221
}
@@ -274,14 +238,14 @@ private RequestInterceptorChainWrapper initializePipeline(String user) {
274238
try {
275239
// We should init the pipeline instance after it is created and then
276240
// add to the map, to ensure thread safe.
277-
LOG.info("Initializing request processing pipeline for user: {}", user);
241+
LOG.info("Initializing request processing pipeline for user: {}.", user);
278242

279243
RMAdminRequestInterceptor interceptorChain =
280244
this.createRequestInterceptorChain();
281245
interceptorChain.init(user);
282246
chainWrapper.init(interceptorChain);
283247
} catch (Exception e) {
284-
LOG.error("Init RMAdminRequestInterceptor error for user: " + user, e);
248+
LOG.error("Init RMAdminRequestInterceptor error for user: {}.", user, e);
285249
throw e;
286250
}
287251

0 commit comments

Comments
 (0)