Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public static NodeHeartbeatResponse newNodeHeartbeatResponse(int responseId,
*
* @param applicationId Application ID
* @param credentials HDFS Tokens
* @return systemCredentialsForAppsProto SystemCredentialsForAppsProto
* @return systemCredentialsForAppsProto
*/
public static SystemCredentialsForAppsProto newSystemCredentialsForAppsProto(
ApplicationId applicationId, ByteBuffer credentials) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,19 @@
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.io.IOException;

/**
Expand Down Expand Up @@ -84,6 +93,74 @@ public static void logAndThrowException(String errMsg, Throwable t)
}
}

public static <R> R createRequestInterceptorChain(Configuration conf, String pipeLineClassName,
String interceptorClassName, Class<R> clazz) {

List<String> interceptorClassNames = getInterceptorClassNames(conf,
pipeLineClassName, interceptorClassName);

R pipeline = null;
R current = null;

for (String className : interceptorClassNames) {
try {
Class<?> interceptorClass = conf.getClassByName(className);
if (clazz.isAssignableFrom(interceptorClass)) {
Object interceptorInstance = ReflectionUtils.newInstance(interceptorClass, conf);
if (pipeline == null) {
pipeline = clazz.cast(interceptorInstance);
current = clazz.cast(interceptorInstance);
continue;
} else {
Method method = clazz.getMethod("setNextInterceptor", clazz);
method.invoke(current, interceptorInstance);
current = clazz.cast(interceptorInstance);
}
} else {
LOG.error("Class: {} not instance of {}.", className, clazz.getCanonicalName());
throw new YarnRuntimeException("Class: " + className + " not instance of "
+ clazz.getCanonicalName());
}
} catch (ClassNotFoundException e) {
LOG.error("Could not instantiate RequestInterceptor: {}", className, e);
throw new YarnRuntimeException("Could not instantiate RequestInterceptor: " + className, e);
} catch (InvocationTargetException e) {
LOG.error("RequestInterceptor {} call setNextInterceptor error.", className, e);
throw new YarnRuntimeException("RequestInterceptor " + className
+ " call setNextInterceptor error.", e);
} catch (NoSuchMethodException e) {
LOG.error("RequestInterceptor {} does not contain the method setNextInterceptor.",
className);
throw new YarnRuntimeException("RequestInterceptor " + className +
" does not contain the method setNextInterceptor.", e);
} catch (IllegalAccessException e) {
LOG.error("RequestInterceptor {} call the method setNextInterceptor " +
"does not have access.", className);
throw new YarnRuntimeException("RequestInterceptor "
+ className + " call the method setNextInterceptor does not have access.", e);
}
}

if (pipeline == null) {
throw new YarnRuntimeException(
"RequestInterceptor pipeline is not configured in the system.");
}

return pipeline;
}

private static List<String> getInterceptorClassNames(Configuration conf,
String pipeLineClass, String interceptorClass) {
String configuredInterceptorClassNames = conf.get(pipeLineClass, interceptorClass);
List<String> interceptorClassNames = new ArrayList<>();
Collection<String> tempList =
StringUtils.getStringCollection(configuredInterceptorClassNames);
for (String item : tempList) {
interceptorClassNames.add(item.trim());
}
return interceptorClassNames;
}

/**
* Throws an IOException due to an error.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,7 @@

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.classification.InterfaceAudience.Private;
Expand All @@ -33,8 +30,6 @@
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.PolicyProvider;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest;
import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse;
Expand Down Expand Up @@ -108,8 +103,8 @@
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsResponse;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.server.router.RouterServerUtil;
import org.apache.hadoop.yarn.server.router.security.authorize.RouterPolicyProvider;
import org.apache.hadoop.yarn.util.LRUCacheHashMap;
import org.slf4j.Logger;
Expand Down Expand Up @@ -147,7 +142,7 @@ public RouterClientRMService() {

@Override
protected void serviceStart() throws Exception {
LOG.info("Starting Router ClientRMService");
LOG.info("Starting Router ClientRMService.");
Configuration conf = getConfig();
YarnRPC rpc = YarnRPC.create(conf);
UserGroupInformation.setConfiguration(conf);
Expand All @@ -161,9 +156,7 @@ protected void serviceStart() throws Exception {
int maxCacheSize =
conf.getInt(YarnConfiguration.ROUTER_PIPELINE_CACHE_MAX_SIZE,
YarnConfiguration.DEFAULT_ROUTER_PIPELINE_CACHE_MAX_SIZE);
this.userPipelineMap = Collections.synchronizedMap(
new LRUCacheHashMap<String, RequestInterceptorChainWrapper>(
maxCacheSize, true));
this.userPipelineMap = Collections.synchronizedMap(new LRUCacheHashMap<>(maxCacheSize, true));

Configuration serverConf = new Configuration(conf);

Expand All @@ -181,14 +174,13 @@ protected void serviceStart() throws Exception {
}

this.server.start();
LOG.info("Router ClientRMService listening on address: "
+ this.server.getListenerAddress());
LOG.info("Router ClientRMService listening on address: {}.", this.server.getListenerAddress());
super.serviceStart();
}

@Override
protected void serviceStop() throws Exception {
LOG.info("Stopping Router ClientRMService");
LOG.info("Stopping Router ClientRMService.");
if (this.server != null) {
this.server.stop();
}
Expand All @@ -201,27 +193,6 @@ public Server getServer() {
return this.server;
}

/**
* Returns the comma separated interceptor class names from the configuration.
*
* @param conf
* @return the interceptor class names as an instance of ArrayList
*/
private List<String> getInterceptorClassNames(Configuration conf) {
String configuredInterceptorClassNames =
conf.get(YarnConfiguration.ROUTER_CLIENTRM_INTERCEPTOR_CLASS_PIPELINE,
YarnConfiguration.DEFAULT_ROUTER_CLIENTRM_INTERCEPTOR_CLASS);

List<String> interceptorClassNames = new ArrayList<String>();
Collection<String> tempList =
StringUtils.getStringCollection(configuredInterceptorClassNames);
for (String item : tempList) {
interceptorClassNames.add(item.trim());
}

return interceptorClassNames;
}

@Override
public GetNewApplicationResponse getNewApplication(
GetNewApplicationRequest request) throws YarnException, IOException {
Expand Down Expand Up @@ -507,44 +478,10 @@ protected Map<String, RequestInterceptorChainWrapper> getPipelines() {
@VisibleForTesting
protected ClientRequestInterceptor createRequestInterceptorChain() {
Configuration conf = getConfig();

List<String> interceptorClassNames = getInterceptorClassNames(conf);

ClientRequestInterceptor pipeline = null;
ClientRequestInterceptor current = null;
for (String interceptorClassName : interceptorClassNames) {
try {
Class<?> interceptorClass = conf.getClassByName(interceptorClassName);
if (ClientRequestInterceptor.class.isAssignableFrom(interceptorClass)) {
ClientRequestInterceptor interceptorInstance =
(ClientRequestInterceptor) ReflectionUtils
.newInstance(interceptorClass, conf);
if (pipeline == null) {
pipeline = interceptorInstance;
current = interceptorInstance;
continue;
} else {
current.setNextInterceptor(interceptorInstance);
current = interceptorInstance;
}
} else {
throw new YarnRuntimeException(
"Class: " + interceptorClassName + " not instance of "
+ ClientRequestInterceptor.class.getCanonicalName());
}
} catch (ClassNotFoundException e) {
throw new YarnRuntimeException(
"Could not instantiate ApplicationClientRequestInterceptor: "
+ interceptorClassName,
e);
}
}

if (pipeline == null) {
throw new YarnRuntimeException(
"RequestInterceptor pipeline is not configured in the system");
}
return pipeline;
return RouterServerUtil.createRequestInterceptorChain(conf,
YarnConfiguration.ROUTER_CLIENTRM_INTERCEPTOR_CLASS_PIPELINE,
YarnConfiguration.DEFAULT_ROUTER_CLIENTRM_INTERCEPTOR_CLASS,
ClientRequestInterceptor.class);
}

/**
Expand All @@ -565,15 +502,15 @@ private RequestInterceptorChainWrapper initializePipeline(String user) {
try {
// We should init the pipeline instance after it is created and then
// add to the map, to ensure thread safe.
LOG.info("Initializing request processing pipeline for application "
+ "for the user: {}", user);
LOG.info("Initializing request processing pipeline for application for the user: {}.",
user);

ClientRequestInterceptor interceptorChain =
this.createRequestInterceptorChain();
interceptorChain.init(user);
chainWrapper.init(interceptorChain);
} catch (Exception e) {
LOG.error("Init ClientRequestInterceptor error for user: " + user, e);
LOG.error("Init ClientRequestInterceptor error for user: {}.", user, e);
throw e;
}

Expand Down
Loading