From 5210001f5d584efb2d994886e2703da0030d76bc Mon Sep 17 00:00:00 2001 From: Prabhjyot Singh Date: Thu, 14 Nov 2024 17:51:56 -0500 Subject: [PATCH] YARN-10352 Skip schedule on not heartbeated nodes in Multi Node Placement. Contributed by Prabhu Joseph and Qi Zhu (cherry picked from commit bc815b3ddff2fa4499e5a8b2ffe9ea0d3e8e712d) --- .../hadoop/yarn/conf/YarnConfiguration.java | 244 ++++++++++-------- .../src/main/resources/yarn-default.xml | 71 ++--- .../scheduler/AbstractYarnScheduler.java | 6 + .../scheduler/SchedulerUtils.java | 14 +- .../scheduler/capacity/CapacityScheduler.java | 99 +++++-- .../scheduler/placement/MultiNodeSorter.java | 2 +- .../placement/MultiNodeSortingManager.java | 43 ++- 7 files changed, 308 insertions(+), 171 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 013d33ddf7fe9..fa52beaa608d8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -162,27 +162,27 @@ private static void addDeprecatedKeys() { /** Factory to create client IPC classes.*/ public static final String IPC_CLIENT_FACTORY_CLASS = IPC_PREFIX + "client.factory.class"; - public static final String DEFAULT_IPC_CLIENT_FACTORY_CLASS = + public static final String DEFAULT_IPC_CLIENT_FACTORY_CLASS = "org.apache.hadoop.yarn.factories.impl.pb.RpcClientFactoryPBImpl"; /** Factory to create server IPC classes.*/ - public static final String IPC_SERVER_FACTORY_CLASS = + public static final String IPC_SERVER_FACTORY_CLASS = IPC_PREFIX + "server.factory.class"; - public static final String DEFAULT_IPC_SERVER_FACTORY_CLASS = + public static final String DEFAULT_IPC_SERVER_FACTORY_CLASS = "org.apache.hadoop.yarn.factories.impl.pb.RpcServerFactoryPBImpl"; /** Factory to create serializable records.*/ - public static final String IPC_RECORD_FACTORY_CLASS = + public static final String IPC_RECORD_FACTORY_CLASS = IPC_PREFIX + "record.factory.class"; - public static final String DEFAULT_IPC_RECORD_FACTORY_CLASS = + public static final String DEFAULT_IPC_RECORD_FACTORY_CLASS = "org.apache.hadoop.yarn.factories.impl.pb.RecordFactoryPBImpl"; /** RPC class implementation*/ public static final String IPC_RPC_IMPL = IPC_PREFIX + "rpc.class"; - public static final String DEFAULT_IPC_RPC_IMPL = + public static final String DEFAULT_IPC_RPC_IMPL = "org.apache.hadoop.yarn.ipc.HadoopYarnProtoRPC"; - + //////////////////////////////// // Resource Manager Configs //////////////////////////////// @@ -201,7 +201,7 @@ private static void addDeprecatedKeys() { public static final long DEFAULT_RM_EPOCH_RANGE = 0; /** The address of the applications manager interface in the RM.*/ - public static final String RM_ADDRESS = + public static final String RM_ADDRESS = RM_PREFIX + "address"; public static final int DEFAULT_RM_PORT = 8032; public static final String DEFAULT_RM_ADDRESS = @@ -248,9 +248,9 @@ private static void addDeprecatedKeys() { /** The Kerberos principal for the resource manager.*/ public static final String RM_PRINCIPAL = RM_PREFIX + "principal"; - + /** The address of the scheduler interface.*/ - public static final String RM_SCHEDULER_ADDRESS = + public static final String RM_SCHEDULER_ADDRESS = RM_PREFIX + "scheduler.address"; public static final int DEFAULT_RM_SCHEDULER_PORT = 8030; public static final String DEFAULT_RM_SCHEDULER_ADDRESS = "0.0.0.0:" + @@ -278,12 +278,12 @@ private static void addDeprecatedKeys() { public static final int DEFAULT_RM_SCHEDULER_CLIENT_THREAD_COUNT = 50; /** If the port should be included or not in the node name. The node name - * is used by the scheduler for resource requests allocation location + * is used by the scheduler for resource requests allocation location * matching. Typically this is just the hostname, using the port is needed * when using minicluster and specific NM are required.*/ public static final String RM_SCHEDULER_INCLUDE_PORT_IN_NODE_NAME = YARN_PREFIX + "scheduler.include-port-in-node-name"; - public static final boolean DEFAULT_RM_SCHEDULER_USE_PORT_FOR_NODE_NAME = + public static final boolean DEFAULT_RM_SCHEDULER_USE_PORT_FOR_NODE_NAME = false; /** Configured scheduler queue placement rules. */ @@ -338,19 +338,19 @@ private static void addDeprecatedKeys() { RM_PREFIX + "scheduler.monitor.policies"; /** The address of the RM web application.*/ - public static final String RM_WEBAPP_ADDRESS = + public static final String RM_WEBAPP_ADDRESS = RM_PREFIX + "webapp.address"; public static final int DEFAULT_RM_WEBAPP_PORT = 8088; public static final String DEFAULT_RM_WEBAPP_ADDRESS = "0.0.0.0:" + DEFAULT_RM_WEBAPP_PORT; - + /** The https address of the RM web application.*/ public static final String RM_WEBAPP_HTTPS_ADDRESS = RM_PREFIX + "webapp.https.address"; public static final boolean YARN_SSL_CLIENT_HTTPS_NEED_AUTH_DEFAULT = false; public static final String YARN_SSL_SERVER_RESOURCE_DEFAULT = "ssl-server.xml"; - + public static final int DEFAULT_RM_WEBAPP_HTTPS_PORT = 8090; public static final String DEFAULT_RM_WEBAPP_HTTPS_ADDRESS = "0.0.0.0:" + DEFAULT_RM_WEBAPP_HTTPS_PORT; @@ -378,17 +378,17 @@ private static void addDeprecatedKeys() { "0.0.0.0:" + DEFAULT_RM_RESOURCE_TRACKER_PORT; /** The expiry interval for application master reporting.*/ - public static final String RM_AM_EXPIRY_INTERVAL_MS = + public static final String RM_AM_EXPIRY_INTERVAL_MS = YARN_PREFIX + "am.liveness-monitor.expiry-interval-ms"; public static final int DEFAULT_RM_AM_EXPIRY_INTERVAL_MS = 600000; /** How long to wait until a node manager is considered dead.*/ - public static final String RM_NM_EXPIRY_INTERVAL_MS = + public static final String RM_NM_EXPIRY_INTERVAL_MS = YARN_PREFIX + "nm.liveness-monitor.expiry-interval-ms"; public static final int DEFAULT_RM_NM_EXPIRY_INTERVAL_MS = 600000; /** Are acls enabled.*/ - public static final String YARN_ACL_ENABLE = + public static final String YARN_ACL_ENABLE = YARN_PREFIX + "acl.enable"; public static final boolean DEFAULT_YARN_ACL_ENABLE = false; @@ -402,10 +402,10 @@ public static boolean isAclEnabled(Configuration conf) { } /** ACL of who can be admin of YARN cluster.*/ - public static final String YARN_ADMIN_ACL = + public static final String YARN_ADMIN_ACL = YARN_PREFIX + "admin.acl"; public static final String DEFAULT_YARN_ADMIN_ACL = "*"; - + /** ACL used in case none is found. Allows nothing. */ public static final String DEFAULT_YARN_APP_ACL = " "; @@ -491,17 +491,17 @@ public static boolean isAclEnabled(Configuration conf) { public static final boolean DEFAULT_YARN_INTERMEDIATE_DATA_ENCRYPTION = false; /** The address of the RM admin interface.*/ - public static final String RM_ADMIN_ADDRESS = + public static final String RM_ADMIN_ADDRESS = RM_PREFIX + "admin.address"; public static final int DEFAULT_RM_ADMIN_PORT = 8033; public static final String DEFAULT_RM_ADMIN_ADDRESS = "0.0.0.0:" + DEFAULT_RM_ADMIN_PORT; - + /**Number of threads used to handle RM admin interface.*/ public static final String RM_ADMIN_CLIENT_THREAD_COUNT = RM_PREFIX + "admin.client.thread-count"; public static final int DEFAULT_RM_ADMIN_CLIENT_THREAD_COUNT = 1; - + /** * The maximum number of application attempts for * an application, if unset by user. @@ -516,15 +516,15 @@ public static boolean isAclEnabled(Configuration conf) { */ public static final String GLOBAL_RM_AM_MAX_ATTEMPTS = RM_PREFIX + "am.global.max-attempts"; - + /** The keytab for the resource manager.*/ - public static final String RM_KEYTAB = + public static final String RM_KEYTAB = RM_PREFIX + "keytab"; /**The kerberos principal to be used for spnego filter for RM.*/ public static final String RM_WEBAPP_SPNEGO_USER_NAME_KEY = RM_PREFIX + "webapp.spnego-principal"; - + /**The kerberos keytab to be used for spnego filter for RM.*/ public static final String RM_WEBAPP_SPNEGO_KEYTAB_FILE_KEY = RM_PREFIX + "webapp.spnego-keytab-file"; @@ -546,12 +546,12 @@ public static boolean isAclEnabled(Configuration conf) { public static final boolean DEFAULT_RM_WEBAPP_ENABLE_CORS_FILTER = false; /** How long to wait until a container is considered dead.*/ - public static final String RM_CONTAINER_ALLOC_EXPIRY_INTERVAL_MS = + public static final String RM_CONTAINER_ALLOC_EXPIRY_INTERVAL_MS = RM_PREFIX + "rm.container-allocation.expiry-interval-ms"; public static final int DEFAULT_RM_CONTAINER_ALLOC_EXPIRY_INTERVAL_MS = 600000; - + /** Path to file with nodes to include.*/ - public static final String RM_NODES_INCLUDE_FILE_PATH = + public static final String RM_NODES_INCLUDE_FILE_PATH = RM_PREFIX + "nodes.include-path"; public static final String DEFAULT_RM_NODES_INCLUDE_FILE_PATH = ""; @@ -572,19 +572,19 @@ public static boolean isAclEnabled(Configuration conf) { RM_PREFIX + "submission-preprocessor.file-refresh-interval-ms"; public static final int DEFAULT_RM_SUBMISSION_PREPROCESSOR_REFRESH_INTERVAL_MS = 0; - + /** Path to file with nodes to exclude.*/ - public static final String RM_NODES_EXCLUDE_FILE_PATH = + public static final String RM_NODES_EXCLUDE_FILE_PATH = RM_PREFIX + "nodes.exclude-path"; public static final String DEFAULT_RM_NODES_EXCLUDE_FILE_PATH = ""; - + /** Number of threads to handle resource tracker calls.*/ public static final String RM_RESOURCE_TRACKER_CLIENT_THREAD_COUNT = RM_PREFIX + "resource-tracker.client.thread-count"; public static final int DEFAULT_RM_RESOURCE_TRACKER_CLIENT_THREAD_COUNT = 50; - + /** The class to use as the resource scheduler.*/ - public static final String RM_SCHEDULER = + public static final String RM_SCHEDULER = RM_PREFIX + "scheduler.class"; /** @@ -663,8 +663,8 @@ public static boolean isAclEnabled(Configuration conf) { public static final int DEFAULT_RM_PLACEMENT_CONSTRAINTS_SCHEDULER_POOL_SIZE = 1; - - public static final String DEFAULT_RM_SCHEDULER = + + public static final String DEFAULT_RM_SCHEDULER = "org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler"; /** RM set next Heartbeat interval for NM */ @@ -696,6 +696,14 @@ public static boolean isAclEnabled(Configuration conf) { public static final float DEFAULT_RM_NM_HEARTBEAT_INTERVAL_SLOWDOWN_FACTOR = 1.0f; + /** + * Number of consecutive missed heartbeats after which node will be + * skipped from scheduling. + */ + public static final String SCHEDULER_SKIP_NODE_MULTIPLIER = + YARN_PREFIX + "scheduler.skip.node.multiplier"; + public static final int DEFAULT_SCHEDULER_SKIP_NODE_MULTIPLIER = 2; + /** Number of worker threads that write the history data. */ public static final String RM_HISTORY_WRITER_MULTI_THREADED_DISPATCHER_POOL_SIZE = RM_PREFIX + "history-writer.multi-threaded-dispatcher.pool-size"; @@ -1017,7 +1025,7 @@ public static boolean isAclEnabled(Configuration conf) { //////////////////////////////// /** The class to use as the persistent store.*/ public static final String RM_STORE = RM_PREFIX + "store.class"; - + /** URI for FileSystemRMStateStore */ public static final String FS_RM_STATE_STORE_URI = RM_PREFIX + "fs.state-store.uri"; @@ -1075,7 +1083,7 @@ public static boolean isAclEnabled(Configuration conf) { /** Default application type length */ public static final int APPLICATION_TYPE_LENGTH = 20; - + /** Default queue name */ public static final String DEFAULT_QUEUE_NAME = "default"; @@ -1088,7 +1096,7 @@ public static boolean isAclEnabled(Configuration conf) { /** * Default sizes of the runtime metric buckets in minutes. */ - public static final String DEFAULT_RM_METRICS_RUNTIME_BUCKETS = + public static final String DEFAULT_RM_METRICS_RUNTIME_BUCKETS = "60,300,1440"; public static final String RM_AMRM_TOKEN_MASTER_KEY_ROLLING_INTERVAL_SECS = RM_PREFIX @@ -1105,7 +1113,7 @@ public static boolean isAclEnabled(Configuration conf) { public static final String RM_NMTOKEN_MASTER_KEY_ROLLING_INTERVAL_SECS = RM_PREFIX + "nm-tokens.master-key-rolling-interval-secs"; - + public static final long DEFAULT_RM_NMTOKEN_MASTER_KEY_ROLLING_INTERVAL_SECS = 24 * 60 * 60; @@ -1184,7 +1192,7 @@ public static boolean isAclEnabled(Configuration conf) { //////////////////////////////// // Node Manager Configs //////////////////////////////// - + /** Prefix for all node manager configs.*/ public static final String NM_PREFIX = "yarn.nodemanager."; @@ -1221,13 +1229,13 @@ public static boolean isAclEnabled(Configuration conf) { ApplicationConstants.Environment.HADOOP_CONF_DIR.key(), ApplicationConstants.Environment.CLASSPATH_PREPEND_DISTCACHE.key(), ApplicationConstants.Environment.HADOOP_YARN_HOME.key())); - + /** address of node manager IPC.*/ public static final String NM_ADDRESS = NM_PREFIX + "address"; public static final int DEFAULT_NM_PORT = 0; public static final String DEFAULT_NM_ADDRESS = "0.0.0.0:" + DEFAULT_NM_PORT; - + /** The actual bind address for the NM.*/ public static final String NM_BIND_HOST = NM_PREFIX + "bind-host"; @@ -1240,42 +1248,42 @@ public static boolean isAclEnabled(Configuration conf) { public static final String NM_CONTAINER_STATE_TRANSITION_LISTENERS = NM_PREFIX + "container-state-transition-listener.classes"; - /** + /** * Adjustment to make to the container os scheduling priority. * The valid values for this could vary depending on the platform. - * On Linux, higher values mean run the containers at a less - * favorable priority than the NM. + * On Linux, higher values mean run the containers at a less + * favorable priority than the NM. * The value specified is an int. */ - public static final String NM_CONTAINER_EXECUTOR_SCHED_PRIORITY = + public static final String NM_CONTAINER_EXECUTOR_SCHED_PRIORITY = NM_PREFIX + "container-executor.os.sched.priority.adjustment"; public static final int DEFAULT_NM_CONTAINER_EXECUTOR_SCHED_PRIORITY = 0; - + /** Number of threads container manager uses.*/ public static final String NM_CONTAINER_MGR_THREAD_COUNT = NM_PREFIX + "container-manager.thread-count"; public static final int DEFAULT_NM_CONTAINER_MGR_THREAD_COUNT = 20; - + /** Number of threads container manager uses.*/ public static final String NM_COLLECTOR_SERVICE_THREAD_COUNT = NM_PREFIX + "collector-service.thread-count"; public static final int DEFAULT_NM_COLLECTOR_SERVICE_THREAD_COUNT = 5; /** Number of threads used in cleanup.*/ - public static final String NM_DELETE_THREAD_COUNT = + public static final String NM_DELETE_THREAD_COUNT = NM_PREFIX + "delete.thread-count"; public static final int DEFAULT_NM_DELETE_THREAD_COUNT = 4; - + /** Keytab for NM.*/ public static final String NM_KEYTAB = NM_PREFIX + "keytab"; - + /**List of directories to store localized files in.*/ public static final String NM_LOCAL_DIRS = NM_PREFIX + "local-dirs"; public static final String DEFAULT_NM_LOCAL_DIRS = "/tmp/nm-local-dir"; /** * Number of files in each localized directories - * Avoid tuning this too low. + * Avoid tuning this too low. */ public static final String NM_LOCAL_CACHE_MAX_FILES_PER_DIRECTORY = NM_PREFIX + "local-cache.max-files-per-directory"; @@ -1287,7 +1295,7 @@ public static boolean isAclEnabled(Configuration conf) { public static final int DEFAULT_NM_LOCALIZER_PORT = 8040; public static final String DEFAULT_NM_LOCALIZER_ADDRESS = "0.0.0.0:" + DEFAULT_NM_LOCALIZER_PORT; - + /** Address where the collector service IPC is.*/ public static final String NM_COLLECTOR_SERVICE_ADDRESS = NM_PREFIX + "collector-service.address"; @@ -1308,9 +1316,9 @@ public static boolean isAclEnabled(Configuration conf) { /** Interval in between cache cleanups.*/ public static final String NM_LOCALIZER_CACHE_CLEANUP_INTERVAL_MS = NM_PREFIX + "localizer.cache.cleanup.interval-ms"; - public static final long DEFAULT_NM_LOCALIZER_CACHE_CLEANUP_INTERVAL_MS = + public static final long DEFAULT_NM_LOCALIZER_CACHE_CLEANUP_INTERVAL_MS = 10 * 60 * 1000; - + /** * Target size of localizer cache in MB, per nodemanager. It is a target * retention size that only includes resources with PUBLIC and PRIVATE @@ -1319,14 +1327,14 @@ public static boolean isAclEnabled(Configuration conf) { public static final String NM_LOCALIZER_CACHE_TARGET_SIZE_MB = NM_PREFIX + "localizer.cache.target-size-mb"; public static final long DEFAULT_NM_LOCALIZER_CACHE_TARGET_SIZE_MB = 10 * 1024; - + /** Number of threads to handle localization requests.*/ public static final String NM_LOCALIZER_CLIENT_THREAD_COUNT = NM_PREFIX + "localizer.client.thread-count"; public static final int DEFAULT_NM_LOCALIZER_CLIENT_THREAD_COUNT = 5; - + /** Number of threads to use for localization fetching.*/ - public static final String NM_LOCALIZER_FETCH_THREAD_COUNT = + public static final String NM_LOCALIZER_FETCH_THREAD_COUNT = NM_PREFIX + "localizer.fetch.thread-count"; public static final int DEFAULT_NM_LOCALIZER_FETCH_THREAD_COUNT = 4; @@ -1381,7 +1389,7 @@ public static boolean isAclEnabled(Configuration conf) { RM_PREFIX + "delayed.delegation-token.removal-interval-ms"; public static final long DEFAULT_RM_DELAYED_DELEGATION_TOKEN_REMOVAL_INTERVAL_MS = 30000l; - + /** Delegation Token renewer thread count */ public static final String RM_DELEGATION_TOKEN_RENEWER_THREAD_COUNT = RM_PREFIX + "delegation-token-renewer.thread-count"; @@ -1421,7 +1429,7 @@ public static boolean isAclEnabled(Configuration conf) { public static final String LOG_AGGREGATION_REMOTE_APP_LOG_DIR_SUFFIX_FMT = YARN_PREFIX + "log-aggregation.%s.remote-app-log-dir-suffix"; - /** + /** * How long to wait before deleting aggregated logs, -1 disables. * Be careful set this too small and you will spam the name node. */ @@ -1433,7 +1441,7 @@ public static boolean isAclEnabled(Configuration conf) { + "log-aggregation.debug.filesize"; public static final long DEFAULT_LOG_AGGREGATION_DEBUG_FILESIZE = 100 * 1024 * 1024; - + /** * How long to wait between aggregated log retention checks. If set to * a value {@literal <=} 0 then the value is computed as one-tenth of the @@ -1495,12 +1503,12 @@ public static boolean isAclEnabled(Configuration conf) { * Number of threads used in log cleanup. Only applicable if Log aggregation * is disabled */ - public static final String NM_LOG_DELETION_THREADS_COUNT = + public static final String NM_LOG_DELETION_THREADS_COUNT = NM_PREFIX + "log.deletion-threads-count"; public static final int DEFAULT_NM_LOG_DELETE_THREAD_COUNT = 4; /** Where to aggregate logs to.*/ - public static final String NM_REMOTE_APP_LOG_DIR = + public static final String NM_REMOTE_APP_LOG_DIR = NM_PREFIX + "remote-app-log-dir"; public static final String DEFAULT_NM_REMOTE_APP_LOG_DIR = "/tmp/logs"; @@ -1508,7 +1516,7 @@ public static boolean isAclEnabled(Configuration conf) { * The remote log dir will be created at * NM_REMOTE_APP_LOG_DIR/${user}/NM_REMOTE_APP_LOG_DIR_SUFFIX/${appId} */ - public static final String NM_REMOTE_APP_LOG_DIR_SUFFIX = + public static final String NM_REMOTE_APP_LOG_DIR_SUFFIX = NM_PREFIX + "remote-app-log-dir-suffix"; public static final String DEFAULT_NM_REMOTE_APP_LOG_DIR_SUFFIX="logs"; @@ -1518,7 +1526,7 @@ public static boolean isAclEnabled(Configuration conf) { public static final String YARN_LOG_SERVER_WEBSERVICE_URL = YARN_PREFIX + "log.server.web-service.url"; - public static final String YARN_TRACKING_URL_GENERATOR = + public static final String YARN_TRACKING_URL_GENERATOR = YARN_PREFIX + "tracking.url.generator"; /** Amount of memory in MB that can be allocated for containers.*/ @@ -1818,7 +1826,7 @@ public static boolean isAclEnabled(Configuration conf) { + "webapp.https.address"; public static final int DEFAULT_NM_WEBAPP_HTTPS_PORT = 8044; public static final String DEFAULT_NM_WEBAPP_HTTPS_ADDRESS = "0.0.0.0:" - + DEFAULT_NM_WEBAPP_HTTPS_PORT; + + DEFAULT_NM_WEBAPP_HTTPS_PORT; /** Enable/disable CORS filter. */ public static final String NM_WEBAPP_ENABLE_CORS_FILTER = @@ -1999,14 +2007,14 @@ public static boolean isAclEnabled(Configuration conf) { public static final long DEFAULT_NM_MIN_PER_DISK_FREE_SPACE_MB = 0; /** Frequency of running node health script.*/ - public static final String NM_HEALTH_CHECK_INTERVAL_MS = + public static final String NM_HEALTH_CHECK_INTERVAL_MS = NM_PREFIX + "health-checker.interval-ms"; public static final long DEFAULT_NM_HEALTH_CHECK_INTERVAL_MS = 10 * 60 * 1000; - /** Health check script time out period.*/ - public static final String NM_HEALTH_CHECK_SCRIPT_TIMEOUT_MS = + /** Health check script time out period.*/ + public static final String NM_HEALTH_CHECK_SCRIPT_TIMEOUT_MS = NM_PREFIX + "health-checker.script.timeout-ms"; - public static final long DEFAULT_NM_HEALTH_CHECK_SCRIPT_TIMEOUT_MS = + public static final long DEFAULT_NM_HEALTH_CHECK_SCRIPT_TIMEOUT_MS = 2 * DEFAULT_NM_HEALTH_CHECK_INTERVAL_MS; /** Whether or not to run the node health script before the NM @@ -2015,13 +2023,13 @@ public static boolean isAclEnabled(Configuration conf) { NM_PREFIX + "health-checker.run-before-startup"; public static final boolean DEFAULT_NM_HEALTH_CHECK_RUN_BEFORE_STARTUP = false; - + /** The health check script to run.*/ - public static final String NM_HEALTH_CHECK_SCRIPT_PATH = + public static final String NM_HEALTH_CHECK_SCRIPT_PATH = NM_PREFIX + "health-checker.script.path"; - + /** The arguments to pass to the health check script.*/ - public static final String NM_HEALTH_CHECK_SCRIPT_OPTS = + public static final String NM_HEALTH_CHECK_SCRIPT_OPTS = NM_PREFIX + "health-checker.script.opts"; /** The JVM options used on forking ContainerLocalizer process @@ -2257,30 +2265,30 @@ public static boolean isAclEnabled(Configuration conf) { public static final String DEFAULT_NM_NONSECURE_MODE_LOCAL_USER = "nobody"; /** - * The allowed pattern for UNIX user names enforced by - * Linux-container-executor when used in nonsecure mode (use case for this + * The allowed pattern for UNIX user names enforced by + * Linux-container-executor when used in nonsecure mode (use case for this * is using cgroups). The default value is taken from /usr/sbin/adduser */ public static final String NM_NONSECURE_MODE_USER_PATTERN_KEY = NM_PREFIX + "linux-container-executor.nonsecure-mode.user-pattern"; - public static final String DEFAULT_NM_NONSECURE_MODE_USER_PATTERN = + public static final String DEFAULT_NM_NONSECURE_MODE_USER_PATTERN = "^[_.A-Za-z0-9][-@_.A-Za-z0-9]{0,255}?[$]?$"; /** The type of resource enforcement to use with the * linux container executor. */ - public static final String NM_LINUX_CONTAINER_RESOURCES_HANDLER = + public static final String NM_LINUX_CONTAINER_RESOURCES_HANDLER = NM_PREFIX + "linux-container-executor.resources-handler.class"; - + /** The path the linux container executor should use for cgroups */ public static final String NM_LINUX_CONTAINER_CGROUPS_HIERARCHY = NM_PREFIX + "linux-container-executor.cgroups.hierarchy"; - + /** Whether the linux container executor should mount cgroups if not found */ public static final String NM_LINUX_CONTAINER_CGROUPS_MOUNT = NM_PREFIX + "linux-container-executor.cgroups.mount"; - + /** Where the linux container executor should mount cgroups if not found */ public static final String NM_LINUX_CONTAINER_CGROUPS_MOUNT_PATH = NM_PREFIX + "linux-container-executor.cgroups.mount-path"; @@ -2304,7 +2312,7 @@ public static boolean isAclEnabled(Configuration conf) { /** * Interval of time the linux container executor should try cleaning up - * cgroups entry when cleaning up a container. This is required due to what + * cgroups entry when cleaning up a container. This is required due to what * it seems a race condition because the SIGTERM/SIGKILL is asynch. */ public static final String NM_LINUX_CONTAINER_CGROUPS_DELETE_TIMEOUT = @@ -2334,24 +2342,24 @@ public static boolean isAclEnabled(Configuration conf) { NM_PREFIX + "windows-container.cpu-limit.enabled"; public static final boolean DEFAULT_NM_WINDOWS_CONTAINER_CPU_LIMIT_ENABLED = false; - /** + /** /* The Windows group that the windows-secure-container-executor should run as. */ public static final String NM_WINDOWS_SECURE_CONTAINER_GROUP = NM_PREFIX + "windows-secure-container-executor.group"; /** T-file compression types used to compress aggregated logs.*/ - public static final String NM_LOG_AGG_COMPRESSION_TYPE = + public static final String NM_LOG_AGG_COMPRESSION_TYPE = NM_PREFIX + "log-aggregation.compression-type"; public static final String DEFAULT_NM_LOG_AGG_COMPRESSION_TYPE = "none"; - + /** The kerberos principal for the node manager.*/ public static final String NM_PRINCIPAL = NM_PREFIX + "principal"; - - public static final String NM_AUX_SERVICES = + + public static final String NM_AUX_SERVICES = NM_PREFIX + "aux-services"; - + public static final String NM_AUX_SERVICE_FMT = NM_PREFIX + "aux-services.%s.class"; @@ -2381,11 +2389,11 @@ public static boolean isAclEnabled(Configuration conf) { /**The kerberos principal to be used for spnego filter for NM.*/ public static final String NM_WEBAPP_SPNEGO_USER_NAME_KEY = NM_PREFIX + "webapp.spnego-principal"; - + /**The kerberos keytab to be used for spnego filter for NM.*/ public static final String NM_WEBAPP_SPNEGO_KEYTAB_FILE_KEY = NM_PREFIX + "webapp.spnego-keytab-file"; - + public static final String DEFAULT_NM_USER_HOME_DIR= "/home/"; public static final String NM_RECOVERY_PREFIX = NM_PREFIX + "recovery."; @@ -2416,44 +2424,44 @@ public static boolean isAclEnabled(Configuration conf) { // Web Proxy Configs //////////////////////////////// public static final String PROXY_PREFIX = "yarn.web-proxy."; - + /** The kerberos principal for the proxy.*/ public static final String PROXY_PRINCIPAL = PROXY_PREFIX + "principal"; - + /** Keytab for Proxy.*/ public static final String PROXY_KEYTAB = PROXY_PREFIX + "keytab"; - + /** The address for the web proxy.*/ public static final String PROXY_ADDRESS = PROXY_PREFIX + "address"; public static final int DEFAULT_PROXY_PORT = 9099; public static final String DEFAULT_PROXY_ADDRESS = "0.0.0.0:" + DEFAULT_PROXY_PORT; - + /** * YARN Service Level Authorization */ - public static final String + public static final String YARN_SECURITY_SERVICE_AUTHORIZATION_RESOURCETRACKER_PROTOCOL = "security.resourcetracker.protocol.acl"; - public static final String + public static final String YARN_SECURITY_SERVICE_AUTHORIZATION_APPLICATIONCLIENT_PROTOCOL = "security.applicationclient.protocol.acl"; - public static final String + public static final String YARN_SECURITY_SERVICE_AUTHORIZATION_RESOURCEMANAGER_ADMINISTRATION_PROTOCOL = "security.resourcemanager-administration.protocol.acl"; - public static final String + public static final String YARN_SECURITY_SERVICE_AUTHORIZATION_APPLICATIONMASTER_PROTOCOL = "security.applicationmaster.protocol.acl"; public static final String YARN_SECURITY_SERVICE_AUTHORIZATION_DISTRIBUTEDSCHEDULING_PROTOCOL = "security.distributedscheduling.protocol.acl"; - public static final String + public static final String YARN_SECURITY_SERVICE_AUTHORIZATION_CONTAINER_MANAGEMENT_PROTOCOL = "security.containermanagement.protocol.acl"; - public static final String + public static final String YARN_SECURITY_SERVICE_AUTHORIZATION_RESOURCE_LOCALIZER = "security.resourcelocalizer.protocol.acl"; @@ -3058,7 +3066,7 @@ public static boolean isAclEnabled(Configuration conf) { public static final String TIMELINE_SERVICE_HANDLER_THREAD_COUNT = TIMELINE_SERVICE_PREFIX + "handler-thread-count"; public static final int DEFAULT_TIMELINE_SERVICE_CLIENT_THREAD_COUNT = 10; - + /** The address of the timeline service web application.*/ public static final String TIMELINE_SERVICE_WEBAPP_ADDRESS = @@ -3279,7 +3287,7 @@ public static boolean isAclEnabled(Configuration conf) { public static final String SHARED_CACHE_NESTED_LEVEL = SHARED_CACHE_PREFIX + "nested-level"; public static final int DEFAULT_SHARED_CACHE_NESTED_LEVEL = 3; - + // Shared Cache Manager Configs public static final String SCM_STORE_PREFIX = SHARED_CACHE_PREFIX + "store."; @@ -3313,7 +3321,7 @@ public static boolean isAclEnabled(Configuration conf) { "0.0.0.0:" + DEFAULT_SCM_WEBAPP_PORT; // In-memory SCM store configuration - + public static final String IN_MEMORY_STORE_PREFIX = SCM_STORE_PREFIX + "in-memory."; @@ -3334,7 +3342,7 @@ public static boolean isAclEnabled(Configuration conf) { public static final String IN_MEMORY_INITIAL_DELAY_MINS = IN_MEMORY_STORE_PREFIX + "initial-delay-mins"; public static final int DEFAULT_IN_MEMORY_INITIAL_DELAY_MINS = 10; - + /** * The frequency at which the in-memory store checks to remove dead initial * applications. Specified in minutes. @@ -3720,13 +3728,13 @@ public static boolean isAclEnabled(Configuration conf) { * Node-labels configurations */ public static final String NODE_LABELS_PREFIX = YARN_PREFIX + "node-labels."; - + /** Node label store implementation class */ public static final String FS_NODE_LABELS_STORE_IMPL_CLASS = NODE_LABELS_PREFIX + "fs-store.impl.class"; public static final String DEFAULT_FS_NODE_LABELS_STORE_IMPL_CLASS = "org.apache.hadoop.yarn.nodelabels.FileSystemNodeLabelsStore"; - + /** URI for NodeLabelManager */ public static final String FS_NODE_LABELS_STORE_ROOT_DIR = NODE_LABELS_PREFIX + "fs-store.root-dir"; @@ -3754,10 +3762,10 @@ public static boolean isAclEnabled(Configuration conf) { public static final String NODE_LABELS_ENABLED = NODE_LABELS_PREFIX + "enabled"; public static final boolean DEFAULT_NODE_LABELS_ENABLED = false; - + public static final String NODELABEL_CONFIGURATION_TYPE = NODE_LABELS_PREFIX + "configuration-type"; - + public static final String CENTRALIZED_NODELABEL_CONFIGURATION_TYPE = "centralized"; @@ -3766,7 +3774,7 @@ public static boolean isAclEnabled(Configuration conf) { public static final String DISTRIBUTED_NODELABEL_CONFIGURATION_TYPE = "distributed"; - + public static final String DEFAULT_NODELABEL_CONFIGURATION_TYPE = CENTRALIZED_NODELABEL_CONFIGURATION_TYPE; @@ -4091,7 +4099,7 @@ public static boolean areNodeLabelsEnabled( public YarnConfiguration() { super(); } - + public YarnConfiguration(Configuration conf) { super(conf); if (! (conf instanceof YarnConfiguration)) { @@ -4326,6 +4334,20 @@ public static boolean numaAwarenessEnabled(Configuration conf) { DEFAULT_NM_NUMA_AWARENESS_ENABLED); } + /** + * Returns Timeout to skip node from scheduling if not heartbeated. + * @param conf the configuration + * @return timeout in milliseconds. + */ + public static long getSkipNodeInterval(Configuration conf) { + long heartbeatIntvl = conf.getLong( + YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS, + YarnConfiguration.DEFAULT_RM_NM_HEARTBEAT_INTERVAL_MS); + int multiplier = conf.getInt(SCHEDULER_SKIP_NODE_MULTIPLIER, + DEFAULT_SCHEDULER_SKIP_NODE_MULTIPLIER); + return multiplier * heartbeatIntvl; + } + /* For debugging. mp configurations to system output as XML format. */ public static void main(String[] args) throws Exception { new YarnConfiguration(new Configuration()).writeXml(System.out); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index 360945db726a5..0d4626e29b1d3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -46,15 +46,15 @@ yarn.ipc.rpc.class org.apache.hadoop.yarn.ipc.HadoopYarnProtoRPC - + The hostname of the RM. yarn.resourcemanager.hostname 0.0.0.0 - - + + The address of the applications manager interface in the RM. yarn.resourcemanager.address @@ -904,6 +904,13 @@ 1.0 + + The Number of consecutive missed heartbeats after which node will be + skipped from scheduling + yarn.scheduler.skip.node.multiplier + 2 + + The minimum allowed version of a connecting nodemanager. The valid values are NONE (no version checking), EqualToRM (the nodemanager's version is equal to @@ -1125,7 +1132,7 @@ yarn.nodemanager.hostname 0.0.0.0 - + The address of the container manager in the NM. yarn.nodemanager.address @@ -1214,13 +1221,13 @@ - Number of seconds after an application finishes before the nodemanager's + Number of seconds after an application finishes before the nodemanager's DeletionService will delete the application's localized file directory and log directory. - + To diagnose YARN application problems, set this property's value large enough (for example, to 600 = 10 minutes) to permit examination of these - directories. After changing the property's value, you must restart the + directories. After changing the property's value, you must restart the nodemanager in order for it to have an effect. The roots of YARN applications' work directories is configurable with @@ -1239,7 +1246,7 @@ - List of directories to store localized files in. An + List of directories to store localized files in. An application's localized file directory will be found in: ${yarn.nodemanager.local-dirs}/usercache/${user}/appcache/application_${appid}. Individual containers' work directories, called container_${contid}, will @@ -1297,7 +1304,7 @@ Target size of localizer cache in MB, per nodemanager. It is - a target retention size that only includes resources with PUBLIC and + a target retention size that only includes resources with PUBLIC and PRIVATE visibility and excludes resources with APPLICATION visibility yarn.nodemanager.localizer.cache.target-size-mb @@ -1335,7 +1342,7 @@ Where to store container logs. An application's localized log directory will be found in ${yarn.nodemanager.log-dirs}/application_${appid}. - Individual containers' log directories will be below this, in directories + Individual containers' log directories will be below this, in directories named container_{$contid}. Each container directory will contain the files stderr, stdin, and syslog generated by that container. @@ -1367,12 +1374,12 @@ - How long to keep aggregation logs before deleting them. -1 disables. + How long to keep aggregation logs before deleting them. -1 disables. Be careful set this too small and you will spam the name node. yarn.log-aggregation.retain-seconds -1 - - + + How long to wait between aggregated log retention checks. If set to 0 or a negative value then the value is computed as one-tenth @@ -1436,7 +1443,7 @@ /tmp/logs - The remote log dir will be created at + The remote log dir will be created at {yarn.nodemanager.remote-app-log-dir}/${user}/{thisParam} yarn.nodemanager.remote-app-log-dir-suffix @@ -1456,7 +1463,7 @@ - Amount of physical memory, in MB, that can be allocated + Amount of physical memory, in MB, that can be allocated for containers. If set to -1 and yarn.nodemanager.resource.detect-hardware-capabilities is true, it is automatically calculated(in case of Windows and Linux). @@ -1747,9 +1754,9 @@ - The maximum percentage of disk space utilization allowed after - which a disk is marked as bad. Values can range from 0.0 to 100.0. - If the value is greater than or equal to 100, the nodemanager will check + The maximum percentage of disk space utilization allowed after + which a disk is marked as bad. Values can range from 0.0 to 100.0. + If the value is greater than or equal to 100, the nodemanager will check for full disk. This applies to yarn.nodemanager.local-dirs and yarn.nodemanager.log-dirs when yarn.nodemanager.disk-health-checker.disk-utilization-threshold.enabled is true. @@ -2105,8 +2112,8 @@ - The minimum allowed version of a resourcemanager that a nodemanager will connect to. - The valid values are NONE (no version checking), EqualToNM (the resourcemanager's version is + The minimum allowed version of a resourcemanager that a nodemanager will connect to. + The valid values are NONE (no version checking), EqualToNM (the resourcemanager's version is equal to or greater than the NM version), or a Version String. yarn.nodemanager.resourcemanager.minimum.version NONE @@ -2187,7 +2194,7 @@ yarn.client.max-cached-nodemanagers-proxies 0 - + Enable the node manager to recover after starting yarn.nodemanager.recovery.enabled @@ -2299,13 +2306,13 @@ yarn.web-proxy.principal - + - Keytab for WebAppProxy, if the proxy is not running as part of + Keytab for WebAppProxy, if the proxy is not running as part of the RM. yarn.web-proxy.keytab - + The address for the web proxy as HOST:PORT, if this is not given then the proxy will run as part of the RM @@ -2319,7 +2326,7 @@ CLASSPATH for YARN applications. A comma-separated list of CLASSPATH entries. When this value is empty, the following default - CLASSPATH for YARN applications would be used. + CLASSPATH for YARN applications would be used. For Linux: $HADOOP_CONF_DIR, $HADOOP_COMMON_HOME/share/hadoop/common/*, @@ -2834,7 +2841,7 @@ yarn.sharedcache.app-checker.class org.apache.hadoop.yarn.server.sharedcachemanager.RemoteAppChecker - + A resource in the in-memory store is considered stale if the time since the last reference exceeds the staleness period. @@ -2842,21 +2849,21 @@ yarn.sharedcache.store.in-memory.staleness-period-mins 10080 - + Initial delay before the in-memory store runs its first check to remove dead initial applications. Specified in minutes. yarn.sharedcache.store.in-memory.initial-delay-mins 10 - + The frequency at which the in-memory store checks to remove dead initial applications. Specified in minutes. yarn.sharedcache.store.in-memory.check-period-mins 720 - + The address of the admin interface in the SCM (shared cache manager) yarn.sharedcache.admin.address @@ -3287,7 +3294,7 @@ Private_Dirty, Private_Clean, Shared_Dirty, Shared_Clean which can be used for computing more accurate RSS. When this flag is enabled, RSS is computed as Min(Shared_Dirty, Pss) + Private_Clean + Private_Dirty. It excludes - read-only shared mappings in RSS computation. + read-only shared mappings in RSS computation. yarn.nodemanager.container-monitor.procfs-tree.smaps-based-rss.enabled false @@ -3737,7 +3744,7 @@ yarn.timeline-service.http-cross-origin.enabled false - + Flag to enable cross-origin (CORS) support for timeline service v1.x or @@ -3855,7 +3862,7 @@ to specify details about the individual resource types. - + yarn.webapp.filter-entity-list-by-user false diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java index cc3998bf3d760..514f302e03c1d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java @@ -150,6 +150,7 @@ public abstract class AbstractYarnScheduler protected ConcurrentMap> applications; protected int nmExpireInterval; protected long nmHeartbeatInterval; + private long skipNodeInterval; private final static List EMPTY_CONTAINER_LIST = new ArrayList(); @@ -196,6 +197,7 @@ public void serviceInit(Configuration conf) throws Exception { nmHeartbeatInterval = conf.getLong(YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS, YarnConfiguration.DEFAULT_RM_NM_HEARTBEAT_INTERVAL_MS); + skipNodeInterval = YarnConfiguration.getSkipNodeInterval(conf); long configuredMaximumAllocationWaitTime = conf.getLong(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS, YarnConfiguration.DEFAULT_RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS); @@ -348,6 +350,10 @@ public long getLastNodeUpdateTime() { return lastNodeUpdateTime; } + public long getSkipNodeInterval(){ + return skipNodeInterval; + } + protected void containerLaunchedOnNode( ContainerId containerId, SchedulerNode node) { try { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java index c3d2c431a8b3b..af5a4a17b78d9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java @@ -61,6 +61,7 @@ import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.ResourceUtils; import org.apache.hadoop.yarn.util.resource.Resources; +import org.apache.hadoop.util.Time; import static org.apache.hadoop.yarn.exceptions .InvalidResourceRequestException @@ -72,7 +73,7 @@ .InvalidResourceRequestException.UNKNOWN_REASON_MESSAGE_TEMPLATE; /** - * Utilities shared by schedulers. + * Utilities shared by schedulers. */ @Private @Unstable @@ -136,7 +137,7 @@ public String toString() { * * @param containerId {@link ContainerId} of returned/released/lost container. * @param diagnostics diagnostic message - * @return ContainerStatus for an returned/released/lost + * @return ContainerStatus for an returned/released/lost * container */ public static ContainerStatus createAbnormalContainerStatus( @@ -179,7 +180,7 @@ public static ContainerStatus createPreemptedContainerStatus( * * @param containerId {@link ContainerId} of returned/released/lost container. * @param diagnostics diagnostic message - * @return ContainerStatus for an returned/released/lost + * @return ContainerStatus for an returned/released/lost * container */ private static ContainerStatus createAbnormalContainerStatus( @@ -604,4 +605,11 @@ public static RMContainer createOpportunisticRmContainer(RMContext rmContext, node.allocateContainer(rmContainer); return rmContainer; } + + public static boolean isNodeHeartbeated(SchedulerNode node, + long skipNodeInterval) { + long timeElapsedFromLastHeartbeat = + Time.monotonicNow() - node.getLastHeartbeatMonotonicTime(); + return timeElapsedFromLastHeartbeat <= skipNodeInterval; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index cc1a2bf265886..cc3f1b63ec4b6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -67,6 +67,7 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.ResourceSizing; import org.apache.hadoop.yarn.api.records.SchedulingRequest; +import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; @@ -233,7 +234,7 @@ public Configuration getConf() { private AppPriorityACLsManager appPriorityACLManager; private boolean multiNodePlacementEnabled; - private static boolean printedVerboseLoggingForAsyncScheduling = false; + private boolean printedVerboseLoggingForAsyncScheduling; /** * EXPERT @@ -513,22 +514,47 @@ long getAsyncScheduleInterval() { private final static Random random = new Random(System.currentTimeMillis()); - private static boolean shouldSkipNodeSchedule(FiCaSchedulerNode node, + @VisibleForTesting + public static boolean shouldSkipNodeSchedule(FiCaSchedulerNode node, CapacityScheduler cs, boolean printVerboseLog) { - // Skip node which missed 2 heartbeats since the node might be dead and - // we should not continue allocate containers on that. - long timeElapsedFromLastHeartbeat = - Time.monotonicNow() - node.getLastHeartbeatMonotonicTime(); - if (timeElapsedFromLastHeartbeat > cs.nmHeartbeatInterval * 2) { + // Skip node which missed YarnConfiguration.SCHEDULER_SKIP_NODE_MULTIPLIER + // heartbeats since the node might be dead and we should not continue + // allocate containers on that. + if (!SchedulerUtils.isNodeHeartbeated(node, cs.getSkipNodeInterval())) { if (printVerboseLog && LOG.isDebugEnabled()) { - LOG.debug("Skip scheduling on node because it haven't heartbeated for " + long timeElapsedFromLastHeartbeat = + Time.monotonicNow() - node.getLastHeartbeatMonotonicTime(); + LOG.debug("Skip scheduling on node " + node.getNodeID() + + " because it haven't heartbeated for " + timeElapsedFromLastHeartbeat / 1000.0f + " secs"); } return true; } + + if (node.getRMNode().getState() != NodeState.RUNNING) { + if (printVerboseLog && LOG.isDebugEnabled()) { + LOG.debug("Skip scheduling on node because it is in " + + node.getRMNode().getState() + " state"); + } + return true; + } return false; } + private static boolean isPrintSkippedNodeLogging(CapacityScheduler cs) { + // To avoid too verbose DEBUG logging, only print debug log once for + // every 10 secs. + boolean printSkipedNodeLogging = false; + if (LOG.isDebugEnabled()) { + if (Time.monotonicNow() / 1000 % 10 == 0) { + printSkipedNodeLogging = (!cs.printedVerboseLoggingForAsyncScheduling); + } else { + cs.printedVerboseLoggingForAsyncScheduling = false; + } + } + return printSkipedNodeLogging; + } + /** * Schedule on all nodes by starting at a random point. * @param cs @@ -548,17 +574,12 @@ static void schedule(CapacityScheduler cs) throws InterruptedException{ // To avoid too verbose DEBUG logging, only print debug log once for // every 10 secs. - boolean printSkipedNodeLogging = false; - if (Time.monotonicNow() / 1000 % 10 == 0) { - printSkipedNodeLogging = (!printedVerboseLoggingForAsyncScheduling); - } else { - printedVerboseLoggingForAsyncScheduling = false; - } + boolean printSkippedNodeLogging = isPrintSkippedNodeLogging(cs); // Allocate containers of node [start, end) for (FiCaSchedulerNode node : nodes) { if (current++ >= start) { - if (shouldSkipNodeSchedule(node, cs, printSkipedNodeLogging)) { + if (shouldSkipNodeSchedule(node, cs, printSkippedNodeLogging)) { continue; } cs.allocateContainersToNode(node.getNodeID(), false); @@ -572,14 +593,14 @@ static void schedule(CapacityScheduler cs) throws InterruptedException{ if (current++ > start) { break; } - if (shouldSkipNodeSchedule(node, cs, printSkipedNodeLogging)) { + if (shouldSkipNodeSchedule(node, cs, printSkippedNodeLogging)) { continue; } cs.allocateContainersToNode(node.getNodeID(), false); } - if (printSkipedNodeLogging) { - printedVerboseLoggingForAsyncScheduling = true; + if (printSkippedNodeLogging) { + cs.printedVerboseLoggingForAsyncScheduling = true; } Thread.sleep(cs.getAsyncScheduleInterval()); @@ -1452,16 +1473,48 @@ private boolean canAllocateMore(CSAssignment assignment, int offswitchCount, || assignedContainers < maxAssignPerHeartbeat); } + private Map getNodesHeartbeated(String partition) { + Map nodesByPartition = new HashMap<>(); + boolean printSkippedNodeLogging = isPrintSkippedNodeLogging(this); + List nodes = nodeTracker + .getNodesPerPartition(partition); + + if (nodes != null && !nodes.isEmpty()) { + //Filter for node heartbeat too long + nodes.stream() + .filter(node -> + !shouldSkipNodeSchedule(node, this, printSkippedNodeLogging)) + .forEach(n -> nodesByPartition.put(n.getNodeID(), n)); + } + + if (printSkippedNodeLogging) { + printedVerboseLoggingForAsyncScheduling = true; + } + return nodesByPartition; + } + + private CandidateNodeSet getCandidateNodeSet( + String partition) { + CandidateNodeSet candidates = null; + Map nodesByPartition + = getNodesHeartbeated(partition); + + if (!nodesByPartition.isEmpty()) { + candidates = new SimpleCandidateNodeSet( + nodesByPartition, partition); + } + + return candidates; + } + private CandidateNodeSet getCandidateNodeSet( FiCaSchedulerNode node) { CandidateNodeSet candidates = null; candidates = new SimpleCandidateNodeSet<>(node); if (multiNodePlacementEnabled) { - Map nodesByPartition = new HashMap<>(); - List nodes = nodeTracker - .getNodesPerPartition(node.getPartition()); - if (nodes != null && !nodes.isEmpty()) { - nodes.forEach(n -> nodesByPartition.put(n.getNodeID(), n)); + Map nodesByPartition = + getNodesHeartbeated(node.getPartition()); + if (!nodesByPartition.isEmpty()) { candidates = new SimpleCandidateNodeSet( nodesByPartition, node.getPartition()); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/MultiNodeSorter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/MultiNodeSorter.java index a757ea527afff..3e3a73fd5c662 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/MultiNodeSorter.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/MultiNodeSorter.java @@ -135,7 +135,7 @@ public void reSortClusterNodes() { Map nodesByPartition = new HashMap<>(); List nodes = ((AbstractYarnScheduler) rmContext .getScheduler()).getNodeTracker().getNodesPerPartition(label); - if (nodes != null && !nodes.isEmpty()) { + if (nodes != null) { nodes.forEach(n -> nodesByPartition.put(n.getNodeID(), n)); multiNodePolicy.addAndRefreshNodesSet( (Collection) nodesByPartition.values(), label); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/MultiNodeSortingManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/MultiNodeSortingManager.java index c8a7e66f5fe03..8c5691f189f67 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/MultiNodeSortingManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/MultiNodeSortingManager.java @@ -21,6 +21,7 @@ import java.util.HashSet; import java.util.Iterator; import java.util.Map; +import java.util.NoSuchElementException; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; @@ -30,8 +31,10 @@ import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; /** * Node Sorting Manager which runs all sorter threads and policies. @@ -48,6 +51,7 @@ public class MultiNodeSortingManager private Set policySpecs = new HashSet(); private Configuration conf; private boolean multiNodePlacementEnabled; + private long skipNodeInterval; public MultiNodeSortingManager() { super("MultiNodeSortingManager"); @@ -59,6 +63,7 @@ public void serviceInit(Configuration configuration) throws Exception { LOG.info("Initializing NodeSortingService=" + getName()); super.serviceInit(configuration); this.conf = configuration; + this.skipNodeInterval = YarnConfiguration.getSkipNodeInterval(conf); } @Override @@ -134,6 +139,42 @@ public Iterator getMultiNodeSortIterator(Collection nodes, policy.addAndRefreshNodesSet(nodes, partition); } - return policy.getPreferredNodeIterator(nodes, partition); + Iterator nodesIterator = policy.getPreferredNodeIterator(nodes, + partition); + + // Skip node which missed YarnConfiguration.SCHEDULER_SKIP_NODE_MULTIPLIER + // heartbeats since the node might be dead and we should not continue + // allocate containers on that. + Iterator filteringIterator = new Iterator() { + private N cached; + private boolean hasCached; + @Override + public boolean hasNext() { + if (hasCached) { + return true; + } + while (nodesIterator.hasNext()) { + cached = nodesIterator.next(); + if (SchedulerUtils.isNodeHeartbeated(cached, skipNodeInterval)) { + hasCached = true; + return true; + } + } + return false; + } + + @Override + public N next() { + if (hasCached) { + hasCached = false; + return cached; + } + if (!hasNext()) { + throw new NoSuchElementException(); + } + return next(); + } + }; + return filteringIterator; } }