diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index d42562cf6140a..0ffe87d067603 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -2018,6 +2018,18 @@ public static boolean isAclEnabled(Configuration conf) {
public static final long DEFAULT_NM_CONTAINER_LOG_TOTAL_SIZE_LIMIT_BYTES =
10000000000L;
+ /** Enable switch for container process number monitoring. */
+ public static final String NM_CONTAINER_PROCESS_MONITOR_ENABLED =
+ NM_PREFIX + "container-process-monitor.enable";
+ public static final boolean
+ DEFAULT_NM_CONTAINER_PROCESS_MONITOR_ENABLED= false;
+
+ /** The max process number limit for a single container. */
+ public static final String NM_CONTAINER_PROCESS_MAX_LIMIT_NUM =
+ NM_PREFIX + "container-process-monitor.max-limit-num";
+ public static final int DEFAULT_NM_CONTAINER_PROCESS_NUM_MAX_LIMIT =
+ 10000;
+
/** Enable/disable container metrics. */
@Private
public static final String NM_CONTAINER_METRICS_ENABLE =
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 407ef74d3d062..3a6ef4d44fa9d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -1813,6 +1813,19 @@
1000000000
+
+ Flag to enable the container process monitor which enforces
+ container process number limits.
+ yarn.nodemanager.container-process-monitor.enable
+ false
+
+
+
+ The max process number limit for a single container.
+ yarn.nodemanager.container-process-monitor.max-limit-num
+ 10000
+
+
The disk space limit, in bytes, for all of a container's
logs
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsHandler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsHandler.java
index dcb058961defd..72936d8a7ae8a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsHandler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsHandler.java
@@ -46,7 +46,8 @@ enum CGroupController {
CPUACCT("cpuacct"),
CPUSET("cpuset"),
FREEZER("freezer"),
- DEVICES("devices");
+ DEVICES("devices"),
+ PIDS("pids");
private final String name;
@@ -84,7 +85,7 @@ public static Set getValidCGroups() {
String CGROUP_PARAM_MEMORY_MEMSW_USAGE_BYTES = "memsw.usage_in_bytes";
String CGROUP_NO_LIMIT = "-1";
String UNDER_OOM = "under_oom 1";
-
+ String CGROUP_PIDS_MAX = "max";
String CGROUP_CPU_PERIOD_US = "cfs_period_us";
String CGROUP_CPU_QUOTA_US = "cfs_quota_us";
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsPidsResourceHandlerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsPidsResourceHandlerImpl.java
new file mode 100644
index 0000000000000..8e2343c9203b8
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsPidsResourceHandlerImpl.java
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperation;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * An implementation for using CGroups to control the number of the process in container.
+ *
+ * The process number controller is used to allow a cgroup hierarchy to stop any
+ * new tasks from being fork()'d or clone()'d after a certain limit is reached.
+ * @see PIDS
+ */
+
+@InterfaceStability.Unstable
+@InterfaceAudience.Private
+public class CGroupsPidsResourceHandlerImpl implements PidsResourceHandler {
+
+ static final Log LOG = LogFactory.getLog(CGroupsPidsResourceHandlerImpl.class);
+
+ private CGroupsHandler cGroupsHandler;
+ private static final CGroupsHandler.CGroupController PIDS = CGroupsHandler.CGroupController.PIDS;
+ private int processMaxCount;
+
+ CGroupsPidsResourceHandlerImpl(CGroupsHandler cGroupsHandler) {
+ this.cGroupsHandler = cGroupsHandler;
+ }
+
+ @Override
+ public List bootstrap(Configuration conf)
+ throws ResourceHandlerException {
+ this.cGroupsHandler.initializeCGroupController(PIDS);
+ processMaxCount =
+ conf.getInt(YarnConfiguration.NM_CONTAINER_PROCESS_MAX_LIMIT_NUM,
+ YarnConfiguration.DEFAULT_NM_CONTAINER_PROCESS_NUM_MAX_LIMIT);
+ if (processMaxCount < 0){
+ throw new ResourceHandlerException(
+ "Illegal value '" + processMaxCount + "' "
+ + YarnConfiguration.
+ NM_CONTAINER_PROCESS_MAX_LIMIT_NUM
+ + ". Value must be positive number.");
+ }
+ LOG.info("Maximum number of processes is " + processMaxCount);
+
+ return null;
+ }
+
+ @Override
+ public List preStart(Container container)
+ throws ResourceHandlerException {
+
+ String cgroupId = container.getContainerId().toString();
+ cGroupsHandler.createCGroup(PIDS, cgroupId);
+ try {
+ cGroupsHandler.updateCGroupParam(PIDS, cgroupId,
+ CGroupsHandler.CGROUP_PIDS_MAX, String.valueOf(processMaxCount));
+ } catch (ResourceHandlerException re) {
+ cGroupsHandler.deleteCGroup(PIDS, cgroupId);
+ LOG.error("Could not update cgroup for container", re);
+ throw re;
+ }
+ List ret = new ArrayList<>();
+ ret.add(new PrivilegedOperation(PrivilegedOperation.OperationType.ADD_PID_TO_CGROUP,
+ PrivilegedOperation.CGROUP_ARG_PREFIX + cGroupsHandler
+ .getPathForCGroupTasks(PIDS, cgroupId)));
+ return ret;
+ }
+
+ @Override
+ public List reacquireContainer(ContainerId containerId)
+ throws ResourceHandlerException {
+ return null;
+ }
+
+ @Override
+ public List updateContainer(Container container)
+ throws ResourceHandlerException {
+ return null;
+ }
+
+ @Override
+ public List postComplete(ContainerId containerId)
+ throws ResourceHandlerException {
+ cGroupsHandler.deleteCGroup(PIDS, containerId.toString());
+ return null;
+ }
+
+ @Override
+ public List teardown()
+ throws ResourceHandlerException {
+ return null;
+ }
+
+ public int getProcessMaxCount() {
+ return processMaxCount;
+ }
+}
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/PidsResourceHandler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/PidsResourceHandler.java
new file mode 100644
index 0000000000000..3d4fcb421a035
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/PidsResourceHandler.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Resource handler for pid resources.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public interface PidsResourceHandler extends ResourceHandler {
+
+}
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/ResourceHandlerModule.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/ResourceHandlerModule.java
index 0662668b4cf01..9c48a4e1324b1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/ResourceHandlerModule.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/ResourceHandlerModule.java
@@ -278,6 +278,15 @@ private static ResourceHandler getNumaResourceHandler(Configuration conf,
return null;
}
+ private static ResourceHandler
+ getPidsResourceHandler(Configuration conf) throws ResourceHandlerException{
+ if (conf.getBoolean(YarnConfiguration.NM_CONTAINER_PROCESS_MONITOR_ENABLED,
+ YarnConfiguration.DEFAULT_NM_CONTAINER_PROCESS_MONITOR_ENABLED)){
+ return new CGroupsPidsResourceHandlerImpl(getCGroupsHandler());
+ }
+ return null;
+ }
+
private static void addHandlerIfNotNull(List handlerList,
ResourceHandler handler) {
if (handler != null) {
@@ -299,6 +308,7 @@ private static void initializeConfiguredResourceHandlerChain(
addHandlerIfNotNull(handlerList,
initCGroupsCpuResourceHandler(conf));
addHandlerIfNotNull(handlerList, getNumaResourceHandler(conf, nmContext));
+ addHandlerIfNotNull(handlerList, getPidsResourceHandler(conf));
addHandlersFromConfiguredResourcePlugins(handlerList, conf, nmContext);
resourceHandlerChain = new ResourceHandlerChain(handlerList);
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TestPidsResourceHandlerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TestPidsResourceHandlerImpl.java
new file mode 100644
index 0000000000000..0b99f2599be79
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TestPidsResourceHandlerImpl.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperation;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.List;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.times;
+
+/**
+ * Test class for CGroupsPidsResourceHandlerImpl.
+ *
+ */
+public class TestPidsResourceHandlerImpl {
+
+ private CGroupsPidsResourceHandlerImpl pidsResourceHandler;
+ private CGroupsHandler mockCGroupsHandler;
+
+ @Before
+ public void setUp() throws IOException, ResourceHandlerException {
+ mockCGroupsHandler = mock(CGroupsHandler.class);
+ when(mockCGroupsHandler.getPathForCGroup(any(), any())).thenReturn(".");
+ pidsResourceHandler =
+ new CGroupsPidsResourceHandlerImpl(mockCGroupsHandler);
+ }
+
+ @Test
+ public void testBootstrap() throws Exception {
+ Configuration conf = new YarnConfiguration();
+ List ret =
+ pidsResourceHandler.bootstrap(conf);
+ verify(mockCGroupsHandler, times(1))
+ .initializeCGroupController(CGroupsHandler.CGroupController.PIDS);
+ Assert.assertNull(ret);
+ Assert.assertEquals("Default process number incorrect", 10000,
+ pidsResourceHandler.getProcessMaxCount());
+ }
+
+ @Test
+ public void testProcessNumbers() throws Exception {
+ Configuration conf = new YarnConfiguration();
+ conf.setInt(YarnConfiguration.NM_CONTAINER_PROCESS_MAX_LIMIT_NUM, -1);
+ try {
+ pidsResourceHandler.bootstrap(conf);
+ Assert.fail("Negative values for process number should not be allowed.");
+ } catch (ResourceHandlerException re) {
+ // do nothing
+ }
+
+ conf.setInt(YarnConfiguration.NM_CONTAINER_PROCESS_MAX_LIMIT_NUM, 1000);
+ pidsResourceHandler.bootstrap(conf);
+ Assert.assertEquals("process number value incorrect", 1000,
+ pidsResourceHandler.getProcessMaxCount());
+ }
+
+ @Test
+ public void testPreStart() throws Exception {
+ Configuration conf = new Configuration();
+ conf.setInt(YarnConfiguration.NM_CONTAINER_PROCESS_MAX_LIMIT_NUM, 1024);
+ pidsResourceHandler.bootstrap(conf);
+ String id = "container_01_01";
+ String path = "test-path/" + id;
+ ContainerId mockContainerId = mock(ContainerId.class);
+ when(mockContainerId.toString()).thenReturn(id);
+ Container mockContainer = mock(Container.class);
+ when(mockContainer.getContainerId()).thenReturn(mockContainerId);
+ when(mockCGroupsHandler
+ .getPathForCGroupTasks(CGroupsHandler.CGroupController.PIDS, id))
+ .thenReturn(path);
+ int maxProcess = 1024;
+ List ret =
+ pidsResourceHandler.preStart(mockContainer);
+ verify(mockCGroupsHandler, times(1))
+ .createCGroup(CGroupsHandler.CGroupController.PIDS, id);
+ verify(mockCGroupsHandler, times(1))
+ .updateCGroupParam(CGroupsHandler.CGroupController.PIDS, id,
+ CGroupsHandler.CGROUP_PIDS_MAX, String.valueOf(maxProcess));
+ Assert.assertNotNull(ret);
+ Assert.assertEquals(1, ret.size());
+ PrivilegedOperation op = ret.get(0);
+ Assert.assertEquals(PrivilegedOperation.OperationType.ADD_PID_TO_CGROUP,
+ op.getOperationType());
+ List args = op.getArguments();
+ Assert.assertEquals(1, args.size());
+ Assert.assertEquals(PrivilegedOperation.CGROUP_ARG_PREFIX + path,
+ args.get(0));
+ }
+
+ @Test
+ public void testReacquireContainer() throws Exception {
+ ContainerId containerIdMock = mock(ContainerId.class);
+ Assert.assertNull(
+ pidsResourceHandler.reacquireContainer(containerIdMock));
+ }
+
+ @Test
+ public void testPostComplete() throws Exception {
+ String id = "container_01_01";
+ ContainerId mockContainerId = mock(ContainerId.class);
+ when(mockContainerId.toString()).thenReturn(id);
+ Assert
+ .assertNull(pidsResourceHandler.postComplete(mockContainerId));
+ verify(mockCGroupsHandler, times(1))
+ .deleteCGroup(CGroupsHandler.CGroupController.PIDS, id);
+ }
+
+ @Test
+ public void testTeardown() throws Exception {
+ Assert.assertNull(pidsResourceHandler.teardown());
+ }
+}