Skip to content

Commit ffa9ed9

Browse files
authored
YARN-6572. Refactoring Router services to use common util classes for pipeline creations. (#4594)
1 parent 92abd99 commit ffa9ed9

File tree

6 files changed

+116
-227
lines changed

6 files changed

+116
-227
lines changed

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/YarnServerBuilderUtils.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ public static NodeHeartbeatResponse newNodeHeartbeatResponse(int responseId,
7979
*
8080
* @param applicationId Application ID
8181
* @param credentials HDFS Tokens
82-
* @return systemCredentialsForAppsProto SystemCredentialsForAppsProto
82+
* @return systemCredentialsForAppsProto
8383
*/
8484
public static SystemCredentialsForAppsProto newSystemCredentialsForAppsProto(
8585
ApplicationId applicationId, ByteBuffer credentials) {

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterServerUtil.java

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,19 @@
2121
import org.apache.hadoop.classification.InterfaceAudience.Private;
2222
import org.apache.hadoop.classification.InterfaceAudience.Public;
2323
import org.apache.hadoop.classification.InterfaceStability.Unstable;
24+
import org.apache.hadoop.conf.Configuration;
25+
import org.apache.hadoop.util.ReflectionUtils;
26+
import org.apache.hadoop.util.StringUtils;
2427
import org.apache.hadoop.yarn.exceptions.YarnException;
28+
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
2529
import org.slf4j.Logger;
2630
import org.slf4j.LoggerFactory;
2731

32+
import java.lang.reflect.InvocationTargetException;
33+
import java.lang.reflect.Method;
34+
import java.util.ArrayList;
35+
import java.util.Collection;
36+
import java.util.List;
2837
import java.io.IOException;
2938

3039
/**
@@ -84,6 +93,74 @@ public static void logAndThrowException(String errMsg, Throwable t)
8493
}
8594
}
8695

96+
public static <R> R createRequestInterceptorChain(Configuration conf, String pipeLineClassName,
97+
String interceptorClassName, Class<R> clazz) {
98+
99+
List<String> interceptorClassNames = getInterceptorClassNames(conf,
100+
pipeLineClassName, interceptorClassName);
101+
102+
R pipeline = null;
103+
R current = null;
104+
105+
for (String className : interceptorClassNames) {
106+
try {
107+
Class<?> interceptorClass = conf.getClassByName(className);
108+
if (clazz.isAssignableFrom(interceptorClass)) {
109+
Object interceptorInstance = ReflectionUtils.newInstance(interceptorClass, conf);
110+
if (pipeline == null) {
111+
pipeline = clazz.cast(interceptorInstance);
112+
current = clazz.cast(interceptorInstance);
113+
continue;
114+
} else {
115+
Method method = clazz.getMethod("setNextInterceptor", clazz);
116+
method.invoke(current, interceptorInstance);
117+
current = clazz.cast(interceptorInstance);
118+
}
119+
} else {
120+
LOG.error("Class: {} not instance of {}.", className, clazz.getCanonicalName());
121+
throw new YarnRuntimeException("Class: " + className + " not instance of "
122+
+ clazz.getCanonicalName());
123+
}
124+
} catch (ClassNotFoundException e) {
125+
LOG.error("Could not instantiate RequestInterceptor: {}", className, e);
126+
throw new YarnRuntimeException("Could not instantiate RequestInterceptor: " + className, e);
127+
} catch (InvocationTargetException e) {
128+
LOG.error("RequestInterceptor {} call setNextInterceptor error.", className, e);
129+
throw new YarnRuntimeException("RequestInterceptor " + className
130+
+ " call setNextInterceptor error.", e);
131+
} catch (NoSuchMethodException e) {
132+
LOG.error("RequestInterceptor {} does not contain the method setNextInterceptor.",
133+
className);
134+
throw new YarnRuntimeException("RequestInterceptor " + className +
135+
" does not contain the method setNextInterceptor.", e);
136+
} catch (IllegalAccessException e) {
137+
LOG.error("RequestInterceptor {} call the method setNextInterceptor " +
138+
"does not have access.", className);
139+
throw new YarnRuntimeException("RequestInterceptor "
140+
+ className + " call the method setNextInterceptor does not have access.", e);
141+
}
142+
}
143+
144+
if (pipeline == null) {
145+
throw new YarnRuntimeException(
146+
"RequestInterceptor pipeline is not configured in the system.");
147+
}
148+
149+
return pipeline;
150+
}
151+
152+
private static List<String> getInterceptorClassNames(Configuration conf,
153+
String pipeLineClass, String interceptorClass) {
154+
String configuredInterceptorClassNames = conf.get(pipeLineClass, interceptorClass);
155+
List<String> interceptorClassNames = new ArrayList<>();
156+
Collection<String> tempList =
157+
StringUtils.getStringCollection(configuredInterceptorClassNames);
158+
for (String item : tempList) {
159+
interceptorClassNames.add(item.trim());
160+
}
161+
return interceptorClassNames;
162+
}
163+
87164
/**
88165
* Throws an IOException due to an error.
89166
*

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/RouterClientRMService.java

Lines changed: 12 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,7 @@
2020

2121
import java.io.IOException;
2222
import java.net.InetSocketAddress;
23-
import java.util.ArrayList;
24-
import java.util.Collection;
2523
import java.util.Collections;
26-
import java.util.List;
2724
import java.util.Map;
2825

2926
import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -33,8 +30,6 @@
3330
import org.apache.hadoop.security.UserGroupInformation;
3431
import org.apache.hadoop.security.authorize.PolicyProvider;
3532
import org.apache.hadoop.service.AbstractService;
36-
import org.apache.hadoop.util.ReflectionUtils;
37-
import org.apache.hadoop.util.StringUtils;
3833
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
3934
import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest;
4035
import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse;
@@ -108,8 +103,8 @@
108103
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsResponse;
109104
import org.apache.hadoop.yarn.conf.YarnConfiguration;
110105
import org.apache.hadoop.yarn.exceptions.YarnException;
111-
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
112106
import org.apache.hadoop.yarn.ipc.YarnRPC;
107+
import org.apache.hadoop.yarn.server.router.RouterServerUtil;
113108
import org.apache.hadoop.yarn.server.router.security.authorize.RouterPolicyProvider;
114109
import org.apache.hadoop.yarn.util.LRUCacheHashMap;
115110
import org.slf4j.Logger;
@@ -147,7 +142,7 @@ public RouterClientRMService() {
147142

148143
@Override
149144
protected void serviceStart() throws Exception {
150-
LOG.info("Starting Router ClientRMService");
145+
LOG.info("Starting Router ClientRMService.");
151146
Configuration conf = getConfig();
152147
YarnRPC rpc = YarnRPC.create(conf);
153148
UserGroupInformation.setConfiguration(conf);
@@ -161,9 +156,7 @@ protected void serviceStart() throws Exception {
161156
int maxCacheSize =
162157
conf.getInt(YarnConfiguration.ROUTER_PIPELINE_CACHE_MAX_SIZE,
163158
YarnConfiguration.DEFAULT_ROUTER_PIPELINE_CACHE_MAX_SIZE);
164-
this.userPipelineMap = Collections.synchronizedMap(
165-
new LRUCacheHashMap<String, RequestInterceptorChainWrapper>(
166-
maxCacheSize, true));
159+
this.userPipelineMap = Collections.synchronizedMap(new LRUCacheHashMap<>(maxCacheSize, true));
167160

168161
Configuration serverConf = new Configuration(conf);
169162

@@ -181,14 +174,13 @@ protected void serviceStart() throws Exception {
181174
}
182175

183176
this.server.start();
184-
LOG.info("Router ClientRMService listening on address: "
185-
+ this.server.getListenerAddress());
177+
LOG.info("Router ClientRMService listening on address: {}.", this.server.getListenerAddress());
186178
super.serviceStart();
187179
}
188180

189181
@Override
190182
protected void serviceStop() throws Exception {
191-
LOG.info("Stopping Router ClientRMService");
183+
LOG.info("Stopping Router ClientRMService.");
192184
if (this.server != null) {
193185
this.server.stop();
194186
}
@@ -201,27 +193,6 @@ public Server getServer() {
201193
return this.server;
202194
}
203195

204-
/**
205-
* Returns the comma separated interceptor class names from the configuration.
206-
*
207-
* @param conf
208-
* @return the interceptor class names as an instance of ArrayList
209-
*/
210-
private List<String> getInterceptorClassNames(Configuration conf) {
211-
String configuredInterceptorClassNames =
212-
conf.get(YarnConfiguration.ROUTER_CLIENTRM_INTERCEPTOR_CLASS_PIPELINE,
213-
YarnConfiguration.DEFAULT_ROUTER_CLIENTRM_INTERCEPTOR_CLASS);
214-
215-
List<String> interceptorClassNames = new ArrayList<String>();
216-
Collection<String> tempList =
217-
StringUtils.getStringCollection(configuredInterceptorClassNames);
218-
for (String item : tempList) {
219-
interceptorClassNames.add(item.trim());
220-
}
221-
222-
return interceptorClassNames;
223-
}
224-
225196
@Override
226197
public GetNewApplicationResponse getNewApplication(
227198
GetNewApplicationRequest request) throws YarnException, IOException {
@@ -507,44 +478,10 @@ protected Map<String, RequestInterceptorChainWrapper> getPipelines() {
507478
@VisibleForTesting
508479
protected ClientRequestInterceptor createRequestInterceptorChain() {
509480
Configuration conf = getConfig();
510-
511-
List<String> interceptorClassNames = getInterceptorClassNames(conf);
512-
513-
ClientRequestInterceptor pipeline = null;
514-
ClientRequestInterceptor current = null;
515-
for (String interceptorClassName : interceptorClassNames) {
516-
try {
517-
Class<?> interceptorClass = conf.getClassByName(interceptorClassName);
518-
if (ClientRequestInterceptor.class.isAssignableFrom(interceptorClass)) {
519-
ClientRequestInterceptor interceptorInstance =
520-
(ClientRequestInterceptor) ReflectionUtils
521-
.newInstance(interceptorClass, conf);
522-
if (pipeline == null) {
523-
pipeline = interceptorInstance;
524-
current = interceptorInstance;
525-
continue;
526-
} else {
527-
current.setNextInterceptor(interceptorInstance);
528-
current = interceptorInstance;
529-
}
530-
} else {
531-
throw new YarnRuntimeException(
532-
"Class: " + interceptorClassName + " not instance of "
533-
+ ClientRequestInterceptor.class.getCanonicalName());
534-
}
535-
} catch (ClassNotFoundException e) {
536-
throw new YarnRuntimeException(
537-
"Could not instantiate ApplicationClientRequestInterceptor: "
538-
+ interceptorClassName,
539-
e);
540-
}
541-
}
542-
543-
if (pipeline == null) {
544-
throw new YarnRuntimeException(
545-
"RequestInterceptor pipeline is not configured in the system");
546-
}
547-
return pipeline;
481+
return RouterServerUtil.createRequestInterceptorChain(conf,
482+
YarnConfiguration.ROUTER_CLIENTRM_INTERCEPTOR_CLASS_PIPELINE,
483+
YarnConfiguration.DEFAULT_ROUTER_CLIENTRM_INTERCEPTOR_CLASS,
484+
ClientRequestInterceptor.class);
548485
}
549486

550487
/**
@@ -565,15 +502,15 @@ private RequestInterceptorChainWrapper initializePipeline(String user) {
565502
try {
566503
// We should init the pipeline instance after it is created and then
567504
// add to the map, to ensure thread safe.
568-
LOG.info("Initializing request processing pipeline for application "
569-
+ "for the user: {}", user);
505+
LOG.info("Initializing request processing pipeline for application for the user: {}.",
506+
user);
570507

571508
ClientRequestInterceptor interceptorChain =
572509
this.createRequestInterceptorChain();
573510
interceptorChain.init(user);
574511
chainWrapper.init(interceptorChain);
575512
} catch (Exception e) {
576-
LOG.error("Init ClientRequestInterceptor error for user: " + user, e);
513+
LOG.error("Init ClientRequestInterceptor error for user: {}.", user, e);
577514
throw e;
578515
}
579516

0 commit comments

Comments
 (0)