diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 8beea5a64bb50..acb7fbb7fe52d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -162,27 +162,27 @@ private static void addDeprecatedKeys() {
/** Factory to create client IPC classes.*/
public static final String IPC_CLIENT_FACTORY_CLASS =
IPC_PREFIX + "client.factory.class";
- public static final String DEFAULT_IPC_CLIENT_FACTORY_CLASS =
+ public static final String DEFAULT_IPC_CLIENT_FACTORY_CLASS =
"org.apache.hadoop.yarn.factories.impl.pb.RpcClientFactoryPBImpl";
/** Factory to create server IPC classes.*/
- public static final String IPC_SERVER_FACTORY_CLASS =
+ public static final String IPC_SERVER_FACTORY_CLASS =
IPC_PREFIX + "server.factory.class";
- public static final String DEFAULT_IPC_SERVER_FACTORY_CLASS =
+ public static final String DEFAULT_IPC_SERVER_FACTORY_CLASS =
"org.apache.hadoop.yarn.factories.impl.pb.RpcServerFactoryPBImpl";
/** Factory to create serializable records.*/
- public static final String IPC_RECORD_FACTORY_CLASS =
+ public static final String IPC_RECORD_FACTORY_CLASS =
IPC_PREFIX + "record.factory.class";
- public static final String DEFAULT_IPC_RECORD_FACTORY_CLASS =
+ public static final String DEFAULT_IPC_RECORD_FACTORY_CLASS =
"org.apache.hadoop.yarn.factories.impl.pb.RecordFactoryPBImpl";
/** RPC class implementation*/
public static final String IPC_RPC_IMPL =
IPC_PREFIX + "rpc.class";
- public static final String DEFAULT_IPC_RPC_IMPL =
+ public static final String DEFAULT_IPC_RPC_IMPL =
"org.apache.hadoop.yarn.ipc.HadoopYarnProtoRPC";
-
+
////////////////////////////////
// Resource Manager Configs
////////////////////////////////
@@ -201,7 +201,7 @@ private static void addDeprecatedKeys() {
public static final long DEFAULT_RM_EPOCH_RANGE = 0;
/** The address of the applications manager interface in the RM.*/
- public static final String RM_ADDRESS =
+ public static final String RM_ADDRESS =
RM_PREFIX + "address";
public static final int DEFAULT_RM_PORT = 8032;
public static final String DEFAULT_RM_ADDRESS =
@@ -248,9 +248,9 @@ private static void addDeprecatedKeys() {
/** The Kerberos principal for the resource manager.*/
public static final String RM_PRINCIPAL =
RM_PREFIX + "principal";
-
+
/** The address of the scheduler interface.*/
- public static final String RM_SCHEDULER_ADDRESS =
+ public static final String RM_SCHEDULER_ADDRESS =
RM_PREFIX + "scheduler.address";
public static final int DEFAULT_RM_SCHEDULER_PORT = 8030;
public static final String DEFAULT_RM_SCHEDULER_ADDRESS = "0.0.0.0:" +
@@ -278,12 +278,12 @@ private static void addDeprecatedKeys() {
public static final int DEFAULT_RM_SCHEDULER_CLIENT_THREAD_COUNT = 50;
/** If the port should be included or not in the node name. The node name
- * is used by the scheduler for resource requests allocation location
+ * is used by the scheduler for resource requests allocation location
* matching. Typically this is just the hostname, using the port is needed
* when using minicluster and specific NM are required.*/
public static final String RM_SCHEDULER_INCLUDE_PORT_IN_NODE_NAME =
YARN_PREFIX + "scheduler.include-port-in-node-name";
- public static final boolean DEFAULT_RM_SCHEDULER_USE_PORT_FOR_NODE_NAME =
+ public static final boolean DEFAULT_RM_SCHEDULER_USE_PORT_FOR_NODE_NAME =
false;
/** Configured scheduler queue placement rules. */
@@ -338,19 +338,19 @@ private static void addDeprecatedKeys() {
RM_PREFIX + "scheduler.monitor.policies";
/** The address of the RM web application.*/
- public static final String RM_WEBAPP_ADDRESS =
+ public static final String RM_WEBAPP_ADDRESS =
RM_PREFIX + "webapp.address";
public static final int DEFAULT_RM_WEBAPP_PORT = 8088;
public static final String DEFAULT_RM_WEBAPP_ADDRESS = "0.0.0.0:" +
DEFAULT_RM_WEBAPP_PORT;
-
+
/** The https address of the RM web application.*/
public static final String RM_WEBAPP_HTTPS_ADDRESS =
RM_PREFIX + "webapp.https.address";
public static final boolean YARN_SSL_CLIENT_HTTPS_NEED_AUTH_DEFAULT = false;
public static final String YARN_SSL_SERVER_RESOURCE_DEFAULT = "ssl-server.xml";
-
+
public static final int DEFAULT_RM_WEBAPP_HTTPS_PORT = 8090;
public static final String DEFAULT_RM_WEBAPP_HTTPS_ADDRESS = "0.0.0.0:"
+ DEFAULT_RM_WEBAPP_HTTPS_PORT;
@@ -378,17 +378,17 @@ private static void addDeprecatedKeys() {
"0.0.0.0:" + DEFAULT_RM_RESOURCE_TRACKER_PORT;
/** The expiry interval for application master reporting.*/
- public static final String RM_AM_EXPIRY_INTERVAL_MS =
+ public static final String RM_AM_EXPIRY_INTERVAL_MS =
YARN_PREFIX + "am.liveness-monitor.expiry-interval-ms";
public static final int DEFAULT_RM_AM_EXPIRY_INTERVAL_MS = 600000;
/** How long to wait until a node manager is considered dead.*/
- public static final String RM_NM_EXPIRY_INTERVAL_MS =
+ public static final String RM_NM_EXPIRY_INTERVAL_MS =
YARN_PREFIX + "nm.liveness-monitor.expiry-interval-ms";
public static final int DEFAULT_RM_NM_EXPIRY_INTERVAL_MS = 600000;
/** Are acls enabled.*/
- public static final String YARN_ACL_ENABLE =
+ public static final String YARN_ACL_ENABLE =
YARN_PREFIX + "acl.enable";
public static final boolean DEFAULT_YARN_ACL_ENABLE = false;
@@ -402,10 +402,10 @@ public static boolean isAclEnabled(Configuration conf) {
}
/** ACL of who can be admin of YARN cluster.*/
- public static final String YARN_ADMIN_ACL =
+ public static final String YARN_ADMIN_ACL =
YARN_PREFIX + "admin.acl";
public static final String DEFAULT_YARN_ADMIN_ACL = "*";
-
+
/** ACL used in case none is found. Allows nothing. */
public static final String DEFAULT_YARN_APP_ACL = " ";
@@ -491,17 +491,17 @@ public static boolean isAclEnabled(Configuration conf) {
public static final boolean DEFAULT_YARN_INTERMEDIATE_DATA_ENCRYPTION = false;
/** The address of the RM admin interface.*/
- public static final String RM_ADMIN_ADDRESS =
+ public static final String RM_ADMIN_ADDRESS =
RM_PREFIX + "admin.address";
public static final int DEFAULT_RM_ADMIN_PORT = 8033;
public static final String DEFAULT_RM_ADMIN_ADDRESS = "0.0.0.0:" +
DEFAULT_RM_ADMIN_PORT;
-
+
/**Number of threads used to handle RM admin interface.*/
public static final String RM_ADMIN_CLIENT_THREAD_COUNT =
RM_PREFIX + "admin.client.thread-count";
public static final int DEFAULT_RM_ADMIN_CLIENT_THREAD_COUNT = 1;
-
+
/**
* The maximum number of application attempts for
* an application, if unset by user.
@@ -516,15 +516,15 @@ public static boolean isAclEnabled(Configuration conf) {
*/
public static final String GLOBAL_RM_AM_MAX_ATTEMPTS =
RM_PREFIX + "am.global.max-attempts";
-
+
/** The keytab for the resource manager.*/
- public static final String RM_KEYTAB =
+ public static final String RM_KEYTAB =
RM_PREFIX + "keytab";
/**The kerberos principal to be used for spnego filter for RM.*/
public static final String RM_WEBAPP_SPNEGO_USER_NAME_KEY =
RM_PREFIX + "webapp.spnego-principal";
-
+
/**The kerberos keytab to be used for spnego filter for RM.*/
public static final String RM_WEBAPP_SPNEGO_KEYTAB_FILE_KEY =
RM_PREFIX + "webapp.spnego-keytab-file";
@@ -546,12 +546,12 @@ public static boolean isAclEnabled(Configuration conf) {
public static final boolean DEFAULT_RM_WEBAPP_ENABLE_CORS_FILTER = false;
/** How long to wait until a container is considered dead.*/
- public static final String RM_CONTAINER_ALLOC_EXPIRY_INTERVAL_MS =
+ public static final String RM_CONTAINER_ALLOC_EXPIRY_INTERVAL_MS =
RM_PREFIX + "rm.container-allocation.expiry-interval-ms";
public static final int DEFAULT_RM_CONTAINER_ALLOC_EXPIRY_INTERVAL_MS = 600000;
-
+
/** Path to file with nodes to include.*/
- public static final String RM_NODES_INCLUDE_FILE_PATH =
+ public static final String RM_NODES_INCLUDE_FILE_PATH =
RM_PREFIX + "nodes.include-path";
public static final String DEFAULT_RM_NODES_INCLUDE_FILE_PATH = "";
@@ -572,19 +572,19 @@ public static boolean isAclEnabled(Configuration conf) {
RM_PREFIX + "submission-preprocessor.file-refresh-interval-ms";
public static final int
DEFAULT_RM_SUBMISSION_PREPROCESSOR_REFRESH_INTERVAL_MS = 0;
-
+
/** Path to file with nodes to exclude.*/
- public static final String RM_NODES_EXCLUDE_FILE_PATH =
+ public static final String RM_NODES_EXCLUDE_FILE_PATH =
RM_PREFIX + "nodes.exclude-path";
public static final String DEFAULT_RM_NODES_EXCLUDE_FILE_PATH = "";
-
+
/** Number of threads to handle resource tracker calls.*/
public static final String RM_RESOURCE_TRACKER_CLIENT_THREAD_COUNT =
RM_PREFIX + "resource-tracker.client.thread-count";
public static final int DEFAULT_RM_RESOURCE_TRACKER_CLIENT_THREAD_COUNT = 50;
-
+
/** The class to use as the resource scheduler.*/
- public static final String RM_SCHEDULER =
+ public static final String RM_SCHEDULER =
RM_PREFIX + "scheduler.class";
/**
@@ -663,8 +663,8 @@ public static boolean isAclEnabled(Configuration conf) {
public static final int DEFAULT_RM_PLACEMENT_CONSTRAINTS_SCHEDULER_POOL_SIZE =
1;
-
- public static final String DEFAULT_RM_SCHEDULER =
+
+ public static final String DEFAULT_RM_SCHEDULER =
"org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler";
/** RM set next Heartbeat interval for NM */
@@ -696,6 +696,14 @@ public static boolean isAclEnabled(Configuration conf) {
public static final float
DEFAULT_RM_NM_HEARTBEAT_INTERVAL_SLOWDOWN_FACTOR = 1.0f;
+ /**
+ * Number of consecutive missed heartbeats after which a node will be
+ * skipped from scheduling.
+ */
+ public static final String SCHEDULER_SKIP_NODE_MULTIPLIER =
+ YARN_PREFIX + "scheduler.skip.node.multiplier";
+ public static final int DEFAULT_SCHEDULER_SKIP_NODE_MULTIPLIER = 2;
+
/** Number of worker threads that write the history data. */
public static final String RM_HISTORY_WRITER_MULTI_THREADED_DISPATCHER_POOL_SIZE =
RM_PREFIX + "history-writer.multi-threaded-dispatcher.pool-size";
@@ -1017,7 +1025,7 @@ public static boolean isAclEnabled(Configuration conf) {
////////////////////////////////
/** The class to use as the persistent store.*/
public static final String RM_STORE = RM_PREFIX + "store.class";
-
+
/** URI for FileSystemRMStateStore */
public static final String FS_RM_STATE_STORE_URI = RM_PREFIX
+ "fs.state-store.uri";
@@ -1075,7 +1083,7 @@ public static boolean isAclEnabled(Configuration conf) {
/** Default application type length */
public static final int APPLICATION_TYPE_LENGTH = 20;
-
+
/** Default queue name */
public static final String DEFAULT_QUEUE_NAME = "default";
@@ -1088,7 +1096,7 @@ public static boolean isAclEnabled(Configuration conf) {
/**
* Default sizes of the runtime metric buckets in minutes.
*/
- public static final String DEFAULT_RM_METRICS_RUNTIME_BUCKETS =
+ public static final String DEFAULT_RM_METRICS_RUNTIME_BUCKETS =
"60,300,1440";
public static final String RM_AMRM_TOKEN_MASTER_KEY_ROLLING_INTERVAL_SECS = RM_PREFIX
@@ -1105,7 +1113,7 @@ public static boolean isAclEnabled(Configuration conf) {
public static final String RM_NMTOKEN_MASTER_KEY_ROLLING_INTERVAL_SECS =
RM_PREFIX + "nm-tokens.master-key-rolling-interval-secs";
-
+
public static final long DEFAULT_RM_NMTOKEN_MASTER_KEY_ROLLING_INTERVAL_SECS =
24 * 60 * 60;
@@ -1184,7 +1192,7 @@ public static boolean isAclEnabled(Configuration conf) {
////////////////////////////////
// Node Manager Configs
////////////////////////////////
-
+
/** Prefix for all node manager configs.*/
public static final String NM_PREFIX = "yarn.nodemanager.";
@@ -1221,13 +1229,13 @@ public static boolean isAclEnabled(Configuration conf) {
ApplicationConstants.Environment.HADOOP_CONF_DIR.key(),
ApplicationConstants.Environment.CLASSPATH_PREPEND_DISTCACHE.key(),
ApplicationConstants.Environment.HADOOP_YARN_HOME.key()));
-
+
/** address of node manager IPC.*/
public static final String NM_ADDRESS = NM_PREFIX + "address";
public static final int DEFAULT_NM_PORT = 0;
public static final String DEFAULT_NM_ADDRESS = "0.0.0.0:"
+ DEFAULT_NM_PORT;
-
+
/** The actual bind address for the NM.*/
public static final String NM_BIND_HOST =
NM_PREFIX + "bind-host";
@@ -1240,42 +1248,42 @@ public static boolean isAclEnabled(Configuration conf) {
public static final String NM_CONTAINER_STATE_TRANSITION_LISTENERS =
NM_PREFIX + "container-state-transition-listener.classes";
- /**
+ /**
* Adjustment to make to the container os scheduling priority.
* The valid values for this could vary depending on the platform.
- * On Linux, higher values mean run the containers at a less
- * favorable priority than the NM.
+ * On Linux, higher values mean run the containers at a less
+ * favorable priority than the NM.
* The value specified is an int.
*/
- public static final String NM_CONTAINER_EXECUTOR_SCHED_PRIORITY =
+ public static final String NM_CONTAINER_EXECUTOR_SCHED_PRIORITY =
NM_PREFIX + "container-executor.os.sched.priority.adjustment";
public static final int DEFAULT_NM_CONTAINER_EXECUTOR_SCHED_PRIORITY = 0;
-
+
/** Number of threads container manager uses.*/
public static final String NM_CONTAINER_MGR_THREAD_COUNT =
NM_PREFIX + "container-manager.thread-count";
public static final int DEFAULT_NM_CONTAINER_MGR_THREAD_COUNT = 20;
-
+
/** Number of threads container manager uses.*/
public static final String NM_COLLECTOR_SERVICE_THREAD_COUNT =
NM_PREFIX + "collector-service.thread-count";
public static final int DEFAULT_NM_COLLECTOR_SERVICE_THREAD_COUNT = 5;
/** Number of threads used in cleanup.*/
- public static final String NM_DELETE_THREAD_COUNT =
+ public static final String NM_DELETE_THREAD_COUNT =
NM_PREFIX + "delete.thread-count";
public static final int DEFAULT_NM_DELETE_THREAD_COUNT = 4;
-
+
/** Keytab for NM.*/
public static final String NM_KEYTAB = NM_PREFIX + "keytab";
-
+
/**List of directories to store localized files in.*/
public static final String NM_LOCAL_DIRS = NM_PREFIX + "local-dirs";
public static final String DEFAULT_NM_LOCAL_DIRS = "/tmp/nm-local-dir";
/**
* Number of files in each localized directories
- * Avoid tuning this too low.
+ * Avoid tuning this too low.
*/
public static final String NM_LOCAL_CACHE_MAX_FILES_PER_DIRECTORY =
NM_PREFIX + "local-cache.max-files-per-directory";
@@ -1287,7 +1295,7 @@ public static boolean isAclEnabled(Configuration conf) {
public static final int DEFAULT_NM_LOCALIZER_PORT = 8040;
public static final String DEFAULT_NM_LOCALIZER_ADDRESS = "0.0.0.0:" +
DEFAULT_NM_LOCALIZER_PORT;
-
+
/** Address where the collector service IPC is.*/
public static final String NM_COLLECTOR_SERVICE_ADDRESS =
NM_PREFIX + "collector-service.address";
@@ -1308,9 +1316,9 @@ public static boolean isAclEnabled(Configuration conf) {
/** Interval in between cache cleanups.*/
public static final String NM_LOCALIZER_CACHE_CLEANUP_INTERVAL_MS =
NM_PREFIX + "localizer.cache.cleanup.interval-ms";
- public static final long DEFAULT_NM_LOCALIZER_CACHE_CLEANUP_INTERVAL_MS =
+ public static final long DEFAULT_NM_LOCALIZER_CACHE_CLEANUP_INTERVAL_MS =
10 * 60 * 1000;
-
+
/**
* Target size of localizer cache in MB, per nodemanager. It is a target
* retention size that only includes resources with PUBLIC and PRIVATE
@@ -1319,14 +1327,14 @@ public static boolean isAclEnabled(Configuration conf) {
public static final String NM_LOCALIZER_CACHE_TARGET_SIZE_MB =
NM_PREFIX + "localizer.cache.target-size-mb";
public static final long DEFAULT_NM_LOCALIZER_CACHE_TARGET_SIZE_MB = 10 * 1024;
-
+
/** Number of threads to handle localization requests.*/
public static final String NM_LOCALIZER_CLIENT_THREAD_COUNT =
NM_PREFIX + "localizer.client.thread-count";
public static final int DEFAULT_NM_LOCALIZER_CLIENT_THREAD_COUNT = 5;
-
+
/** Number of threads to use for localization fetching.*/
- public static final String NM_LOCALIZER_FETCH_THREAD_COUNT =
+ public static final String NM_LOCALIZER_FETCH_THREAD_COUNT =
NM_PREFIX + "localizer.fetch.thread-count";
public static final int DEFAULT_NM_LOCALIZER_FETCH_THREAD_COUNT = 4;
@@ -1381,7 +1389,7 @@ public static boolean isAclEnabled(Configuration conf) {
RM_PREFIX + "delayed.delegation-token.removal-interval-ms";
public static final long DEFAULT_RM_DELAYED_DELEGATION_TOKEN_REMOVAL_INTERVAL_MS =
30000l;
-
+
/** Delegation Token renewer thread count */
public static final String RM_DELEGATION_TOKEN_RENEWER_THREAD_COUNT =
RM_PREFIX + "delegation-token-renewer.thread-count";
@@ -1432,7 +1440,7 @@ public static boolean isAclEnabled(Configuration conf) {
public static final String LOG_AGGREGATION_REMOTE_APP_LOG_DIR_SUFFIX_FMT
= YARN_PREFIX + "log-aggregation.%s.remote-app-log-dir-suffix";
- /**
+ /**
* How long to wait before deleting aggregated logs, -1 disables.
* Be careful set this too small and you will spam the name node.
*/
@@ -1444,7 +1452,7 @@ public static boolean isAclEnabled(Configuration conf) {
+ "log-aggregation.debug.filesize";
public static final long DEFAULT_LOG_AGGREGATION_DEBUG_FILESIZE
= 100 * 1024 * 1024;
-
+
/**
* How long to wait between aggregated log retention checks. If set to
* a value {@literal <=} 0 then the value is computed as one-tenth of the
@@ -1506,12 +1514,12 @@ public static boolean isAclEnabled(Configuration conf) {
* Number of threads used in log cleanup. Only applicable if Log aggregation
* is disabled
*/
- public static final String NM_LOG_DELETION_THREADS_COUNT =
+ public static final String NM_LOG_DELETION_THREADS_COUNT =
NM_PREFIX + "log.deletion-threads-count";
public static final int DEFAULT_NM_LOG_DELETE_THREAD_COUNT = 4;
/** Where to aggregate logs to.*/
- public static final String NM_REMOTE_APP_LOG_DIR =
+ public static final String NM_REMOTE_APP_LOG_DIR =
NM_PREFIX + "remote-app-log-dir";
public static final String DEFAULT_NM_REMOTE_APP_LOG_DIR = "/tmp/logs";
@@ -1519,7 +1527,7 @@ public static boolean isAclEnabled(Configuration conf) {
* The remote log dir will be created at
* NM_REMOTE_APP_LOG_DIR/${user}/NM_REMOTE_APP_LOG_DIR_SUFFIX/${appId}
*/
- public static final String NM_REMOTE_APP_LOG_DIR_SUFFIX =
+ public static final String NM_REMOTE_APP_LOG_DIR_SUFFIX =
NM_PREFIX + "remote-app-log-dir-suffix";
public static final String DEFAULT_NM_REMOTE_APP_LOG_DIR_SUFFIX="logs";
@@ -1529,7 +1537,7 @@ public static boolean isAclEnabled(Configuration conf) {
public static final String YARN_LOG_SERVER_WEBSERVICE_URL =
YARN_PREFIX + "log.server.web-service.url";
- public static final String YARN_TRACKING_URL_GENERATOR =
+ public static final String YARN_TRACKING_URL_GENERATOR =
YARN_PREFIX + "tracking.url.generator";
/** Amount of memory in MB that can be allocated for containers.*/
@@ -1829,7 +1837,7 @@ public static boolean isAclEnabled(Configuration conf) {
+ "webapp.https.address";
public static final int DEFAULT_NM_WEBAPP_HTTPS_PORT = 8044;
public static final String DEFAULT_NM_WEBAPP_HTTPS_ADDRESS = "0.0.0.0:"
- + DEFAULT_NM_WEBAPP_HTTPS_PORT;
+ + DEFAULT_NM_WEBAPP_HTTPS_PORT;
/** Enable/disable CORS filter. */
public static final String NM_WEBAPP_ENABLE_CORS_FILTER =
@@ -2010,14 +2018,14 @@ public static boolean isAclEnabled(Configuration conf) {
public static final long DEFAULT_NM_MIN_PER_DISK_FREE_SPACE_MB = 0;
/** Frequency of running node health script.*/
- public static final String NM_HEALTH_CHECK_INTERVAL_MS =
+ public static final String NM_HEALTH_CHECK_INTERVAL_MS =
NM_PREFIX + "health-checker.interval-ms";
public static final long DEFAULT_NM_HEALTH_CHECK_INTERVAL_MS = 10 * 60 * 1000;
- /** Health check script time out period.*/
- public static final String NM_HEALTH_CHECK_SCRIPT_TIMEOUT_MS =
+ /** Health check script time out period.*/
+ public static final String NM_HEALTH_CHECK_SCRIPT_TIMEOUT_MS =
NM_PREFIX + "health-checker.script.timeout-ms";
- public static final long DEFAULT_NM_HEALTH_CHECK_SCRIPT_TIMEOUT_MS =
+ public static final long DEFAULT_NM_HEALTH_CHECK_SCRIPT_TIMEOUT_MS =
2 * DEFAULT_NM_HEALTH_CHECK_INTERVAL_MS;
/** Whether or not to run the node health script before the NM
@@ -2026,13 +2034,13 @@ public static boolean isAclEnabled(Configuration conf) {
NM_PREFIX + "health-checker.run-before-startup";
public static final boolean DEFAULT_NM_HEALTH_CHECK_RUN_BEFORE_STARTUP =
false;
-
+
/** The health check script to run.*/
- public static final String NM_HEALTH_CHECK_SCRIPT_PATH =
+ public static final String NM_HEALTH_CHECK_SCRIPT_PATH =
NM_PREFIX + "health-checker.script.path";
-
+
/** The arguments to pass to the health check script.*/
- public static final String NM_HEALTH_CHECK_SCRIPT_OPTS =
+ public static final String NM_HEALTH_CHECK_SCRIPT_OPTS =
NM_PREFIX + "health-checker.script.opts";
/** The JVM options used on forking ContainerLocalizer process
@@ -2268,30 +2276,30 @@ public static boolean isAclEnabled(Configuration conf) {
public static final String DEFAULT_NM_NONSECURE_MODE_LOCAL_USER = "nobody";
/**
- * The allowed pattern for UNIX user names enforced by
- * Linux-container-executor when used in nonsecure mode (use case for this
+ * The allowed pattern for UNIX user names enforced by
+ * Linux-container-executor when used in nonsecure mode (use case for this
* is using cgroups). The default value is taken from /usr/sbin/adduser
*/
public static final String NM_NONSECURE_MODE_USER_PATTERN_KEY = NM_PREFIX +
"linux-container-executor.nonsecure-mode.user-pattern";
- public static final String DEFAULT_NM_NONSECURE_MODE_USER_PATTERN =
+ public static final String DEFAULT_NM_NONSECURE_MODE_USER_PATTERN =
"^[_.A-Za-z0-9][-@_.A-Za-z0-9]{0,255}?[$]?$";
/** The type of resource enforcement to use with the
* linux container executor.
*/
- public static final String NM_LINUX_CONTAINER_RESOURCES_HANDLER =
+ public static final String NM_LINUX_CONTAINER_RESOURCES_HANDLER =
NM_PREFIX + "linux-container-executor.resources-handler.class";
-
+
/** The path the linux container executor should use for cgroups */
public static final String NM_LINUX_CONTAINER_CGROUPS_HIERARCHY =
NM_PREFIX + "linux-container-executor.cgroups.hierarchy";
-
+
/** Whether the linux container executor should mount cgroups if not found */
public static final String NM_LINUX_CONTAINER_CGROUPS_MOUNT =
NM_PREFIX + "linux-container-executor.cgroups.mount";
-
+
/** Where the linux container executor should mount cgroups if not found */
public static final String NM_LINUX_CONTAINER_CGROUPS_MOUNT_PATH =
NM_PREFIX + "linux-container-executor.cgroups.mount-path";
@@ -2315,7 +2323,7 @@ public static boolean isAclEnabled(Configuration conf) {
/**
* Interval of time the linux container executor should try cleaning up
- * cgroups entry when cleaning up a container. This is required due to what
+ * cgroups entry when cleaning up a container. This is required due to what
* it seems a race condition because the SIGTERM/SIGKILL is asynch.
*/
public static final String NM_LINUX_CONTAINER_CGROUPS_DELETE_TIMEOUT =
@@ -2345,24 +2353,24 @@ public static boolean isAclEnabled(Configuration conf) {
NM_PREFIX + "windows-container.cpu-limit.enabled";
public static final boolean DEFAULT_NM_WINDOWS_CONTAINER_CPU_LIMIT_ENABLED = false;
- /**
+ /**
/* The Windows group that the windows-secure-container-executor should run as.
*/
public static final String NM_WINDOWS_SECURE_CONTAINER_GROUP =
NM_PREFIX + "windows-secure-container-executor.group";
/** T-file compression types used to compress aggregated logs.*/
- public static final String NM_LOG_AGG_COMPRESSION_TYPE =
+ public static final String NM_LOG_AGG_COMPRESSION_TYPE =
NM_PREFIX + "log-aggregation.compression-type";
public static final String DEFAULT_NM_LOG_AGG_COMPRESSION_TYPE = "none";
-
+
/** The kerberos principal for the node manager.*/
public static final String NM_PRINCIPAL =
NM_PREFIX + "principal";
-
- public static final String NM_AUX_SERVICES =
+
+ public static final String NM_AUX_SERVICES =
NM_PREFIX + "aux-services";
-
+
public static final String NM_AUX_SERVICE_FMT =
NM_PREFIX + "aux-services.%s.class";
@@ -2392,11 +2400,11 @@ public static boolean isAclEnabled(Configuration conf) {
/**The kerberos principal to be used for spnego filter for NM.*/
public static final String NM_WEBAPP_SPNEGO_USER_NAME_KEY =
NM_PREFIX + "webapp.spnego-principal";
-
+
/**The kerberos keytab to be used for spnego filter for NM.*/
public static final String NM_WEBAPP_SPNEGO_KEYTAB_FILE_KEY =
NM_PREFIX + "webapp.spnego-keytab-file";
-
+
public static final String DEFAULT_NM_USER_HOME_DIR= "/home/";
public static final String NM_RECOVERY_PREFIX = NM_PREFIX + "recovery.";
@@ -2427,44 +2435,44 @@ public static boolean isAclEnabled(Configuration conf) {
// Web Proxy Configs
////////////////////////////////
public static final String PROXY_PREFIX = "yarn.web-proxy.";
-
+
/** The kerberos principal for the proxy.*/
public static final String PROXY_PRINCIPAL =
PROXY_PREFIX + "principal";
-
+
/** Keytab for Proxy.*/
public static final String PROXY_KEYTAB = PROXY_PREFIX + "keytab";
-
+
/** The address for the web proxy.*/
public static final String PROXY_ADDRESS =
PROXY_PREFIX + "address";
public static final int DEFAULT_PROXY_PORT = 9099;
public static final String DEFAULT_PROXY_ADDRESS =
"0.0.0.0:" + DEFAULT_PROXY_PORT;
-
+
/**
* YARN Service Level Authorization
*/
- public static final String
+ public static final String
YARN_SECURITY_SERVICE_AUTHORIZATION_RESOURCETRACKER_PROTOCOL =
"security.resourcetracker.protocol.acl";
- public static final String
+ public static final String
YARN_SECURITY_SERVICE_AUTHORIZATION_APPLICATIONCLIENT_PROTOCOL =
"security.applicationclient.protocol.acl";
- public static final String
+ public static final String
YARN_SECURITY_SERVICE_AUTHORIZATION_RESOURCEMANAGER_ADMINISTRATION_PROTOCOL =
"security.resourcemanager-administration.protocol.acl";
- public static final String
+ public static final String
YARN_SECURITY_SERVICE_AUTHORIZATION_APPLICATIONMASTER_PROTOCOL =
"security.applicationmaster.protocol.acl";
public static final String
YARN_SECURITY_SERVICE_AUTHORIZATION_DISTRIBUTEDSCHEDULING_PROTOCOL =
"security.distributedscheduling.protocol.acl";
- public static final String
+ public static final String
YARN_SECURITY_SERVICE_AUTHORIZATION_CONTAINER_MANAGEMENT_PROTOCOL =
"security.containermanagement.protocol.acl";
- public static final String
+ public static final String
YARN_SECURITY_SERVICE_AUTHORIZATION_RESOURCE_LOCALIZER =
"security.resourcelocalizer.protocol.acl";
@@ -3069,7 +3077,7 @@ public static boolean isAclEnabled(Configuration conf) {
public static final String TIMELINE_SERVICE_HANDLER_THREAD_COUNT =
TIMELINE_SERVICE_PREFIX + "handler-thread-count";
public static final int DEFAULT_TIMELINE_SERVICE_CLIENT_THREAD_COUNT = 10;
-
+
/** The address of the timeline service web application.*/
public static final String TIMELINE_SERVICE_WEBAPP_ADDRESS =
@@ -3290,7 +3298,7 @@ public static boolean isAclEnabled(Configuration conf) {
public static final String SHARED_CACHE_NESTED_LEVEL =
SHARED_CACHE_PREFIX + "nested-level";
public static final int DEFAULT_SHARED_CACHE_NESTED_LEVEL = 3;
-
+
// Shared Cache Manager Configs
public static final String SCM_STORE_PREFIX = SHARED_CACHE_PREFIX + "store.";
@@ -3324,7 +3332,7 @@ public static boolean isAclEnabled(Configuration conf) {
"0.0.0.0:" + DEFAULT_SCM_WEBAPP_PORT;
// In-memory SCM store configuration
-
+
public static final String IN_MEMORY_STORE_PREFIX =
SCM_STORE_PREFIX + "in-memory.";
@@ -3345,7 +3353,7 @@ public static boolean isAclEnabled(Configuration conf) {
public static final String IN_MEMORY_INITIAL_DELAY_MINS =
IN_MEMORY_STORE_PREFIX + "initial-delay-mins";
public static final int DEFAULT_IN_MEMORY_INITIAL_DELAY_MINS = 10;
-
+
/**
* The frequency at which the in-memory store checks to remove dead initial
* applications. Specified in minutes.
@@ -3731,13 +3739,13 @@ public static boolean isAclEnabled(Configuration conf) {
* Node-labels configurations
*/
public static final String NODE_LABELS_PREFIX = YARN_PREFIX + "node-labels.";
-
+
/** Node label store implementation class */
public static final String FS_NODE_LABELS_STORE_IMPL_CLASS = NODE_LABELS_PREFIX
+ "fs-store.impl.class";
public static final String DEFAULT_FS_NODE_LABELS_STORE_IMPL_CLASS =
"org.apache.hadoop.yarn.nodelabels.FileSystemNodeLabelsStore";
-
+
/** URI for NodeLabelManager */
public static final String FS_NODE_LABELS_STORE_ROOT_DIR = NODE_LABELS_PREFIX
+ "fs-store.root-dir";
@@ -3765,10 +3773,10 @@ public static boolean isAclEnabled(Configuration conf) {
public static final String NODE_LABELS_ENABLED = NODE_LABELS_PREFIX
+ "enabled";
public static final boolean DEFAULT_NODE_LABELS_ENABLED = false;
-
+
public static final String NODELABEL_CONFIGURATION_TYPE =
NODE_LABELS_PREFIX + "configuration-type";
-
+
public static final String CENTRALIZED_NODELABEL_CONFIGURATION_TYPE =
"centralized";
@@ -3777,7 +3785,7 @@ public static boolean isAclEnabled(Configuration conf) {
public static final String DISTRIBUTED_NODELABEL_CONFIGURATION_TYPE =
"distributed";
-
+
public static final String DEFAULT_NODELABEL_CONFIGURATION_TYPE =
CENTRALIZED_NODELABEL_CONFIGURATION_TYPE;
@@ -4102,7 +4110,7 @@ public static boolean areNodeLabelsEnabled(
public YarnConfiguration() {
super();
}
-
+
public YarnConfiguration(Configuration conf) {
super(conf);
if (! (conf instanceof YarnConfiguration)) {
@@ -4337,6 +4345,20 @@ public static boolean numaAwarenessEnabled(Configuration conf) {
DEFAULT_NM_NUMA_AWARENESS_ENABLED);
}
+ /**
+ * Returns the timeout after which a non-heartbeated node is skipped from scheduling.
+ * @param conf the configuration
+ * @return timeout in milliseconds.
+ */
+ public static long getSkipNodeInterval(Configuration conf) {
+ long heartbeatIntvl = conf.getLong(
+ YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS,
+ YarnConfiguration.DEFAULT_RM_NM_HEARTBEAT_INTERVAL_MS);
+ int multiplier = conf.getInt(SCHEDULER_SKIP_NODE_MULTIPLIER,
+ DEFAULT_SCHEDULER_SKIP_NODE_MULTIPLIER);
+ return multiplier * heartbeatIntvl;
+ }
+
/* For debugging. mp configurations to system output as XML format. */
public static void main(String[] args) throws Exception {
new YarnConfiguration(new Configuration()).writeXml(System.out);
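The getSkipNodeInterval() helper added above simply multiplies the NM heartbeat interval (yarn.resourcemanager.nodemanagers.heartbeat-interval-ms) by the new yarn.scheduler.skip.node.multiplier value. The following sketch is illustrative only, not part of the patch, and the chosen values are arbitrary:

    // Illustrative sketch, not part of the patch. Values are arbitrary.
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.yarn.conf.YarnConfiguration;

    public class SkipIntervalSketch {
      public static void main(String[] args) {
        Configuration conf = new YarnConfiguration();
        // NMs heartbeat every second; skip a node after 3 missed heartbeats.
        conf.setLong(YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS, 1000L);
        conf.setInt(YarnConfiguration.SCHEDULER_SKIP_NODE_MULTIPLIER, 3);
        // Prints 3000: multiplier * heartbeat interval, in milliseconds.
        System.out.println(YarnConfiguration.getSkipNodeInterval(conf));
      }
    }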
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 3f35d20ddd3e8..2ebed85564a37 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -46,15 +46,15 @@
yarn.ipc.rpc.class
org.apache.hadoop.yarn.ipc.HadoopYarnProtoRPC
-
+
The hostname of the RM.
yarn.resourcemanager.hostname
0.0.0.0
-
-
+
+
The address of the applications manager interface in the RM.
yarn.resourcemanager.address
@@ -919,6 +919,13 @@
1.0
+ <property>
+   <description>The number of consecutive missed heartbeats after which a node
+   will be skipped from scheduling.</description>
+   <name>yarn.scheduler.skip.node.multiplier</name>
+   <value>2</value>
+ </property>
+
The minimum allowed version of a connecting nodemanager. The valid values are
NONE (no version checking), EqualToRM (the nodemanager's version is equal to
@@ -1140,7 +1147,7 @@
yarn.nodemanager.hostname
0.0.0.0
-
+
The address of the container manager in the NM.
yarn.nodemanager.address
@@ -1229,13 +1236,13 @@
- Number of seconds after an application finishes before the nodemanager's
+ Number of seconds after an application finishes before the nodemanager's
DeletionService will delete the application's localized file directory
and log directory.
-
+
To diagnose YARN application problems, set this property's value large
enough (for example, to 600 = 10 minutes) to permit examination of these
- directories. After changing the property's value, you must restart the
+ directories. After changing the property's value, you must restart the
nodemanager in order for it to have an effect.
The roots of YARN applications' work directories is configurable with
@@ -1254,7 +1261,7 @@
- List of directories to store localized files in. An
+ List of directories to store localized files in. An
application's localized file directory will be found in:
${yarn.nodemanager.local-dirs}/usercache/${user}/appcache/application_${appid}.
Individual containers' work directories, called container_${contid}, will
@@ -1312,7 +1319,7 @@
Target size of localizer cache in MB, per nodemanager. It is
- a target retention size that only includes resources with PUBLIC and
+ a target retention size that only includes resources with PUBLIC and
PRIVATE visibility and excludes resources with APPLICATION visibility
yarn.nodemanager.localizer.cache.target-size-mb
@@ -1350,7 +1357,7 @@
Where to store container logs. An application's localized log directory
will be found in ${yarn.nodemanager.log-dirs}/application_${appid}.
- Individual containers' log directories will be below this, in directories
+ Individual containers' log directories will be below this, in directories
named container_{$contid}. Each container directory will contain the files
stderr, stdin, and syslog generated by that container.
@@ -1382,12 +1389,12 @@
- How long to keep aggregation logs before deleting them. -1 disables.
+ How long to keep aggregation logs before deleting them. -1 disables.
Be careful set this too small and you will spam the name node.
yarn.log-aggregation.retain-seconds
-1
-
-
+
+
How long to wait between aggregated log retention checks.
If set to 0 or a negative value then the value is computed as one-tenth
@@ -1451,7 +1458,7 @@
/tmp/logs
- The remote log dir will be created at
+ The remote log dir will be created at
{yarn.nodemanager.remote-app-log-dir}/${user}/{thisParam}
yarn.nodemanager.remote-app-log-dir-suffix
@@ -1471,7 +1478,7 @@
- Amount of physical memory, in MB, that can be allocated
+ Amount of physical memory, in MB, that can be allocated
for containers. If set to -1 and
yarn.nodemanager.resource.detect-hardware-capabilities is true, it is
automatically calculated(in case of Windows and Linux).
@@ -1762,9 +1769,9 @@
- The maximum percentage of disk space utilization allowed after
- which a disk is marked as bad. Values can range from 0.0 to 100.0.
- If the value is greater than or equal to 100, the nodemanager will check
+ The maximum percentage of disk space utilization allowed after
+ which a disk is marked as bad. Values can range from 0.0 to 100.0.
+ If the value is greater than or equal to 100, the nodemanager will check
for full disk. This applies to yarn.nodemanager.local-dirs and
yarn.nodemanager.log-dirs when
yarn.nodemanager.disk-health-checker.disk-utilization-threshold.enabled is true.
@@ -2120,8 +2127,8 @@
- The minimum allowed version of a resourcemanager that a nodemanager will connect to.
- The valid values are NONE (no version checking), EqualToNM (the resourcemanager's version is
+ The minimum allowed version of a resourcemanager that a nodemanager will connect to.
+ The valid values are NONE (no version checking), EqualToNM (the resourcemanager's version is
equal to or greater than the NM version), or a Version String.
yarn.nodemanager.resourcemanager.minimum.version
NONE
@@ -2202,7 +2209,7 @@
yarn.client.max-cached-nodemanagers-proxies
0
-
+
Enable the node manager to recover after starting
yarn.nodemanager.recovery.enabled
@@ -2314,13 +2321,13 @@
yarn.web-proxy.principal
-
+
- Keytab for WebAppProxy, if the proxy is not running as part of
+ Keytab for WebAppProxy, if the proxy is not running as part of
the RM.
yarn.web-proxy.keytab
-
+
The address for the web proxy as HOST:PORT, if this is not
given then the proxy will run as part of the RM
@@ -2334,7 +2341,7 @@
CLASSPATH for YARN applications. A comma-separated list
of CLASSPATH entries. When this value is empty, the following default
- CLASSPATH for YARN applications would be used.
+ CLASSPATH for YARN applications would be used.
For Linux:
$HADOOP_CONF_DIR,
$HADOOP_COMMON_HOME/share/hadoop/common/*,
@@ -2849,7 +2856,7 @@
yarn.sharedcache.app-checker.class
org.apache.hadoop.yarn.server.sharedcachemanager.RemoteAppChecker
-
+
A resource in the in-memory store is considered stale
if the time since the last reference exceeds the staleness period.
@@ -2857,21 +2864,21 @@
yarn.sharedcache.store.in-memory.staleness-period-mins
10080
-
+
Initial delay before the in-memory store runs its first check
to remove dead initial applications. Specified in minutes.
yarn.sharedcache.store.in-memory.initial-delay-mins
10
-
+
The frequency at which the in-memory store checks to remove
dead initial applications. Specified in minutes.
yarn.sharedcache.store.in-memory.check-period-mins
720
-
+
The address of the admin interface in the SCM (shared cache manager)
yarn.sharedcache.admin.address
@@ -3302,7 +3309,7 @@
Private_Dirty, Private_Clean, Shared_Dirty, Shared_Clean which can be used
for computing more accurate RSS. When this flag is enabled, RSS is computed
as Min(Shared_Dirty, Pss) + Private_Clean + Private_Dirty. It excludes
- read-only shared mappings in RSS computation.
+ read-only shared mappings in RSS computation.
yarn.nodemanager.container-monitor.procfs-tree.smaps-based-rss.enabled
false
@@ -3752,7 +3759,7 @@
yarn.timeline-service.http-cross-origin.enabled
false
-
+
Flag to enable cross-origin (CORS) support for timeline service v1.x or
@@ -3870,7 +3877,7 @@
to specify details about the individual resource types.
-
+
yarn.webapp.filter-entity-list-by-user
false
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
index dd3e0bc26fd95..24bbaf8e6776a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
@@ -159,6 +159,7 @@ public abstract class AbstractYarnScheduler
protected ConcurrentMap<ApplicationId, SchedulerApplication<T>> applications;
protected int nmExpireInterval;
protected long nmHeartbeatInterval;
+ private long skipNodeInterval;
private final static List<Container> EMPTY_CONTAINER_LIST =
new ArrayList<Container>();
@@ -361,6 +362,10 @@ public long getLastNodeUpdateTime() {
return lastNodeUpdateTime;
}
+ public long getSkipNodeInterval() {
+ return skipNodeInterval;
+ }
+
protected void containerLaunchedOnNode(
ContainerId containerId, SchedulerNode node) {
try {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java
index c3d2c431a8b3b..af5a4a17b78d9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java
@@ -61,6 +61,7 @@
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import org.apache.hadoop.yarn.util.resource.Resources;
+import org.apache.hadoop.util.Time;
import static org.apache.hadoop.yarn.exceptions
.InvalidResourceRequestException
@@ -72,7 +73,7 @@
.InvalidResourceRequestException.UNKNOWN_REASON_MESSAGE_TEMPLATE;
/**
- * Utilities shared by schedulers.
+ * Utilities shared by schedulers.
*/
@Private
@Unstable
@@ -136,7 +137,7 @@ public String toString() {
*
* @param containerId {@link ContainerId} of returned/released/lost container.
* @param diagnostics diagnostic message
- * @return ContainerStatus for an returned/released/lost
+ * @return ContainerStatus for an returned/released/lost
* container
*/
public static ContainerStatus createAbnormalContainerStatus(
@@ -179,7 +180,7 @@ public static ContainerStatus createPreemptedContainerStatus(
*
* @param containerId {@link ContainerId} of returned/released/lost container.
* @param diagnostics diagnostic message
- * @return ContainerStatus for an returned/released/lost
+ * @return ContainerStatus for an returned/released/lost
* container
*/
private static ContainerStatus createAbnormalContainerStatus(
@@ -604,4 +605,11 @@ public static RMContainer createOpportunisticRmContainer(RMContext rmContext,
node.allocateContainer(rmContainer);
return rmContainer;
}
+
+ public static boolean isNodeHeartbeated(SchedulerNode node,
+ long skipNodeInterval) {
+ long timeElapsedFromLastHeartbeat =
+ Time.monotonicNow() - node.getLastHeartbeatMonotonicTime();
+ return timeElapsedFromLastHeartbeat <= skipNodeInterval;
+ }
}
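Since the comparison uses Time.monotonicNow(), the check is immune to wall-clock adjustments. A test-style sketch of the new helper's contract follows; it is illustrative only and assumes Mockito on the test classpath:

    // Illustrative sketch, not part of the patch.
    import org.apache.hadoop.util.Time;
    import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
    import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
    import org.mockito.Mockito;

    public class IsNodeHeartbeatedSketch {
      public static void main(String[] args) {
        SchedulerNode node = Mockito.mock(SchedulerNode.class);
        // Pretend the node last heartbeated five seconds ago.
        Mockito.when(node.getLastHeartbeatMonotonicTime())
            .thenReturn(Time.monotonicNow() - 5000L);
        // A 2 second skip interval treats the node as not heartbeated.
        System.out.println(SchedulerUtils.isNodeHeartbeated(node, 2000L)); // false
        // A 10 second skip interval still treats it as live.
        System.out.println(SchedulerUtils.isNodeHeartbeated(node, 10000L)); // true
      }
    }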
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index caf2c8bc220f0..17897d1ecf1ad 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -67,6 +67,7 @@
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.ResourceSizing;
import org.apache.hadoop.yarn.api.records.SchedulingRequest;
+import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
@@ -233,7 +234,7 @@ public Configuration getConf() {
private AppPriorityACLsManager appPriorityACLManager;
private boolean multiNodePlacementEnabled;
- private static boolean printedVerboseLoggingForAsyncScheduling = false;
+ private boolean printedVerboseLoggingForAsyncScheduling;
/**
* EXPERT
@@ -513,22 +514,47 @@ long getAsyncScheduleInterval() {
private final static Random random = new Random(System.currentTimeMillis());
- private static boolean shouldSkipNodeSchedule(FiCaSchedulerNode node,
+ @VisibleForTesting
+ public static boolean shouldSkipNodeSchedule(FiCaSchedulerNode node,
CapacityScheduler cs, boolean printVerboseLog) {
- // Skip node which missed 2 heartbeats since the node might be dead and
- // we should not continue allocate containers on that.
- long timeElapsedFromLastHeartbeat =
- Time.monotonicNow() - node.getLastHeartbeatMonotonicTime();
- if (timeElapsedFromLastHeartbeat > cs.nmHeartbeatInterval * 2) {
+ // Skip a node which missed YarnConfiguration.SCHEDULER_SKIP_NODE_MULTIPLIER
+ // heartbeats, since it might be dead and we should not continue to
+ // allocate containers on it.
+ if (!SchedulerUtils.isNodeHeartbeated(node, cs.getSkipNodeInterval())) {
if (printVerboseLog && LOG.isDebugEnabled()) {
- LOG.debug("Skip scheduling on node because it haven't heartbeated for "
+ long timeElapsedFromLastHeartbeat =
+ Time.monotonicNow() - node.getLastHeartbeatMonotonicTime();
+ LOG.debug("Skip scheduling on node " + node.getNodeID()
+ + " because it haven't heartbeated for "
+ timeElapsedFromLastHeartbeat / 1000.0f + " secs");
}
return true;
}
+
+ if (node.getRMNode().getState() != NodeState.RUNNING) {
+ if (printVerboseLog && LOG.isDebugEnabled()) {
+ LOG.debug("Skip scheduling on node because it is in " +
+ node.getRMNode().getState() + " state");
+ }
+ return true;
+ }
return false;
}
+ private static boolean isPrintSkippedNodeLogging(CapacityScheduler cs) {
+ // To avoid too verbose DEBUG logging, only print debug log once for
+ // every 10 secs.
+ boolean printSkippedNodeLogging = false;
+ if (LOG.isDebugEnabled()) {
+ if (Time.monotonicNow() / 1000 % 10 == 0) {
+ printSkippedNodeLogging = (!cs.printedVerboseLoggingForAsyncScheduling);
+ } else {
+ cs.printedVerboseLoggingForAsyncScheduling = false;
+ }
+ }
+ return printSkippedNodeLogging;
+ }
+
/**
* Schedule on all nodes by starting at a random point.
* @param cs
@@ -548,17 +574,12 @@ static void schedule(CapacityScheduler cs) throws InterruptedException{
// To avoid too verbose DEBUG logging, only print debug log once for
// every 10 secs.
- boolean printSkipedNodeLogging = false;
- if (Time.monotonicNow() / 1000 % 10 == 0) {
- printSkipedNodeLogging = (!printedVerboseLoggingForAsyncScheduling);
- } else {
- printedVerboseLoggingForAsyncScheduling = false;
- }
+ boolean printSkippedNodeLogging = isPrintSkippedNodeLogging(cs);
// Allocate containers of node [start, end)
for (FiCaSchedulerNode node : nodes) {
if (current++ >= start) {
- if (shouldSkipNodeSchedule(node, cs, printSkipedNodeLogging)) {
+ if (shouldSkipNodeSchedule(node, cs, printSkippedNodeLogging)) {
continue;
}
cs.allocateContainersToNode(node.getNodeID(), false);
@@ -572,14 +593,14 @@ static void schedule(CapacityScheduler cs) throws InterruptedException{
if (current++ > start) {
break;
}
- if (shouldSkipNodeSchedule(node, cs, printSkipedNodeLogging)) {
+ if (shouldSkipNodeSchedule(node, cs, printSkippedNodeLogging)) {
continue;
}
cs.allocateContainersToNode(node.getNodeID(), false);
}
- if (printSkipedNodeLogging) {
- printedVerboseLoggingForAsyncScheduling = true;
+ if (printSkippedNodeLogging) {
+ cs.printedVerboseLoggingForAsyncScheduling = true;
}
Thread.sleep(cs.getAsyncScheduleInterval());
@@ -1456,16 +1477,48 @@ private boolean canAllocateMore(CSAssignment assignment, int offswitchCount,
|| assignedContainers < maxAssignPerHeartbeat);
}
+ private Map<NodeId, FiCaSchedulerNode> getNodesHeartbeated(String partition) {
+ Map<NodeId, FiCaSchedulerNode> nodesByPartition = new HashMap<>();
+ boolean printSkippedNodeLogging = isPrintSkippedNodeLogging(this);
+ List<FiCaSchedulerNode> nodes = nodeTracker
+ .getNodesPerPartition(partition);
+
+ if (nodes != null && !nodes.isEmpty()) {
+ // Filter out nodes that have not heartbeated within the skip interval.
+ nodes.stream()
+ .filter(node ->
+ !shouldSkipNodeSchedule(node, this, printSkippedNodeLogging))
+ .forEach(n -> nodesByPartition.put(n.getNodeID(), n));
+ }
+
+ if (printSkippedNodeLogging) {
+ printedVerboseLoggingForAsyncScheduling = true;
+ }
+ return nodesByPartition;
+ }
+
+ private CandidateNodeSet<FiCaSchedulerNode> getCandidateNodeSet(
+ String partition) {
+ CandidateNodeSet<FiCaSchedulerNode> candidates = null;
+ Map<NodeId, FiCaSchedulerNode> nodesByPartition
+ = getNodesHeartbeated(partition);
+
+ if (!nodesByPartition.isEmpty()) {
+ candidates = new SimpleCandidateNodeSet<FiCaSchedulerNode>(
+ nodesByPartition, partition);
+ }
+
+ return candidates;
+ }
+
private CandidateNodeSet<FiCaSchedulerNode> getCandidateNodeSet(
FiCaSchedulerNode node) {
CandidateNodeSet<FiCaSchedulerNode> candidates = null;
candidates = new SimpleCandidateNodeSet<>(node);
if (multiNodePlacementEnabled) {
- Map<NodeId, FiCaSchedulerNode> nodesByPartition = new HashMap<>();
- List<FiCaSchedulerNode> nodes = nodeTracker
- .getNodesPerPartition(node.getPartition());
- if (nodes != null && !nodes.isEmpty()) {
- nodes.forEach(n -> nodesByPartition.put(n.getNodeID(), n));
+ Map<NodeId, FiCaSchedulerNode> nodesByPartition =
+ getNodesHeartbeated(node.getPartition());
+ if (!nodesByPartition.isEmpty()) {
candidates = new SimpleCandidateNodeSet<FiCaSchedulerNode>(
nodesByPartition, node.getPartition());
}
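Besides the heartbeat age, shouldSkipNodeSchedule() now also skips nodes that are not in the RUNNING state. A test-style sketch of that extra check (illustrative only; assumes Mockito):

    // Illustrative sketch, not part of the patch.
    import org.apache.hadoop.util.Time;
    import org.apache.hadoop.yarn.api.records.NodeState;
    import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
    import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
    import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
    import org.mockito.Mockito;

    public class SkipNonRunningNodeSketch {
      public static void main(String[] args) {
        CapacityScheduler cs = Mockito.mock(CapacityScheduler.class);
        Mockito.when(cs.getSkipNodeInterval()).thenReturn(2000L);

        FiCaSchedulerNode node = Mockito.mock(FiCaSchedulerNode.class);
        RMNode rmNode = Mockito.mock(RMNode.class);
        Mockito.when(node.getRMNode()).thenReturn(rmNode);
        // Heartbeat is fresh, but the node is decommissioning.
        Mockito.when(node.getLastHeartbeatMonotonicTime())
            .thenReturn(Time.monotonicNow());
        Mockito.when(rmNode.getState()).thenReturn(NodeState.DECOMMISSIONING);

        // Prints true: skipped because the node is not RUNNING.
        System.out.println(CapacityScheduler.shouldSkipNodeSchedule(node, cs, false));
      }
    }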
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/MultiNodeSorter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/MultiNodeSorter.java
index a757ea527afff..3e3a73fd5c662 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/MultiNodeSorter.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/MultiNodeSorter.java
@@ -135,7 +135,7 @@ public void reSortClusterNodes() {
Map<NodeId, N> nodesByPartition = new HashMap<>();
List<N> nodes = ((AbstractYarnScheduler) rmContext
.getScheduler()).getNodeTracker().getNodesPerPartition(label);
- if (nodes != null && !nodes.isEmpty()) {
+ if (nodes != null) {
nodes.forEach(n -> nodesByPartition.put(n.getNodeID(), n));
multiNodePolicy.addAndRefreshNodesSet(
(Collection<N>) nodesByPartition.values(), label);
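Relaxing the guard above from checking both null and emptiness to checking only for null presumably ensures that addAndRefreshNodesSet() is also called with an empty collection, so a partition that has lost all of its nodes still refreshes, and thereby clears, the sorter's cached node set.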
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/MultiNodeSortingManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/MultiNodeSortingManager.java
index c8a7e66f5fe03..8c5691f189f67 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/MultiNodeSortingManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/MultiNodeSortingManager.java
@@ -21,6 +21,7 @@
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
+import java.util.NoSuchElementException;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
@@ -30,8 +31,10 @@
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
/**
* Node Sorting Manager which runs all sorter threads and policies.
@@ -48,6 +51,7 @@ public class MultiNodeSortingManager
private Set<MultiNodePolicySpec> policySpecs = new HashSet<MultiNodePolicySpec>();
private Configuration conf;
private boolean multiNodePlacementEnabled;
+ private long skipNodeInterval;
public MultiNodeSortingManager() {
super("MultiNodeSortingManager");
@@ -59,6 +63,7 @@ public void serviceInit(Configuration configuration) throws Exception {
LOG.info("Initializing NodeSortingService=" + getName());
super.serviceInit(configuration);
this.conf = configuration;
+ this.skipNodeInterval = YarnConfiguration.getSkipNodeInterval(conf);
}
@Override
@@ -134,6 +139,42 @@ public Iterator<N> getMultiNodeSortIterator(Collection<N> nodes,
policy.addAndRefreshNodesSet(nodes, partition);
}
- return policy.getPreferredNodeIterator(nodes, partition);
+ Iterator<N> nodesIterator = policy.getPreferredNodeIterator(nodes,
+ partition);
+
+ // Skip a node which missed YarnConfiguration.SCHEDULER_SKIP_NODE_MULTIPLIER
+ // heartbeats, since it might be dead and we should not continue to
+ // allocate containers on it.
+ Iterator<N> filteringIterator = new Iterator<N>() {
+ private N cached;
+ private boolean hasCached;
+ @Override
+ public boolean hasNext() {
+ if (hasCached) {
+ return true;
+ }
+ while (nodesIterator.hasNext()) {
+ cached = nodesIterator.next();
+ if (SchedulerUtils.isNodeHeartbeated(cached, skipNodeInterval)) {
+ hasCached = true;
+ return true;
+ }
+ }
+ return false;
+ }
+
+ @Override
+ public N next() {
+ if (hasCached) {
+ hasCached = false;
+ return cached;
+ }
+ if (!hasNext()) {
+ throw new NoSuchElementException();
+ }
+ return next();
+ }
+ };
+ return filteringIterator;
}
}
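The anonymous iterator above is a standard look-ahead filter: hasNext() advances the delegate until it finds a node that has heartbeated recently and caches it, while next() hands the cached element out (the recursive next() call can only go one level deep, because hasNext() either caches an element or returns false). A standalone generic sketch of the same pattern, illustrative only and independent of YARN:

    // Generic look-ahead filtering iterator; illustrative sketch only.
    import java.util.Iterator;
    import java.util.NoSuchElementException;
    import java.util.function.Predicate;

    final class FilteringIterator<T> implements Iterator<T> {
      private final Iterator<T> delegate;
      private final Predicate<T> keep;
      private T cached;
      private boolean hasCached;

      FilteringIterator(Iterator<T> delegate, Predicate<T> keep) {
        this.delegate = delegate;
        this.keep = keep;
      }

      @Override
      public boolean hasNext() {
        if (hasCached) {
          return true;
        }
        // Advance the delegate until an element passes the predicate.
        while (delegate.hasNext()) {
          T candidate = delegate.next();
          if (keep.test(candidate)) {
            cached = candidate;
            hasCached = true;
            return true;
          }
        }
        return false;
      }

      @Override
      public T next() {
        if (!hasNext()) {
          throw new NoSuchElementException();
        }
        hasCached = false;
        return cached;
      }
    }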