Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2018,6 +2018,18 @@ public static boolean isAclEnabled(Configuration conf) {
public static final long DEFAULT_NM_CONTAINER_LOG_TOTAL_SIZE_LIMIT_BYTES =
10000000000L;

/** Enable switch for container process number monitoring. */
public static final String NM_CONTAINER_PROCESS_MONITOR_ENABLED =
NM_PREFIX + "container-process-monitor.enable";
public static final boolean
DEFAULT_NM_CONTAINER_PROCESS_MONITOR_ENABLED= false;

/** The max process number limit for a single container. */
public static final String NM_CONTAINER_PROCESS_MAX_LIMIT_NUM =
NM_PREFIX + "container-process-monitor.max-limit-num";
public static final int DEFAULT_NM_CONTAINER_PROCESS_NUM_MAX_LIMIT =
10000;

/** Enable/disable container metrics. */
@Private
public static final String NM_CONTAINER_METRICS_ENABLE =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1813,6 +1813,19 @@
<value>1000000000</value>
</property>

<property>
<description>Flag to enable the container process monitor which enforces
container process number limits.</description>
<name>yarn.nodemanager.container-process-monitor.enable</name>
<value>false</value>
</property>

<property>
<description>The max process number limit for a single container.</description>
<name>yarn.nodemanager.container-process-monitor.max-limit-num</name>
<value>10000</value>
</property>

<property>
<description>The disk space limit, in bytes, for all of a container's
logs</description>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ enum CGroupController {
CPUACCT("cpuacct"),
CPUSET("cpuset"),
FREEZER("freezer"),
DEVICES("devices");
DEVICES("devices"),
PIDS("pids");

private final String name;

Expand Down Expand Up @@ -84,7 +85,7 @@ public static Set<String> getValidCGroups() {
String CGROUP_PARAM_MEMORY_MEMSW_USAGE_BYTES = "memsw.usage_in_bytes";
String CGROUP_NO_LIMIT = "-1";
String UNDER_OOM = "under_oom 1";

String CGROUP_PIDS_MAX = "max";

String CGROUP_CPU_PERIOD_US = "cfs_period_us";
String CGROUP_CPU_QUOTA_US = "cfs_quota_us";
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperation;

import java.util.ArrayList;
import java.util.List;

/**
* An implementation for using CGroups to control the number of the process in container.
*
* The process number controller is used to allow a cgroup hierarchy to stop any
* new tasks from being fork()'d or clone()'d after a certain limit is reached.
* @see <a href="https://www.kernel.org/doc/Documentation/cgroup-v1/pids.txt">PIDS</a>
*/

@InterfaceStability.Unstable
@InterfaceAudience.Private
public class CGroupsPidsResourceHandlerImpl implements PidsResourceHandler {

static final Log LOG = LogFactory.getLog(CGroupsPidsResourceHandlerImpl.class);

private CGroupsHandler cGroupsHandler;
private static final CGroupsHandler.CGroupController PIDS = CGroupsHandler.CGroupController.PIDS;
private int processMaxCount;

CGroupsPidsResourceHandlerImpl(CGroupsHandler cGroupsHandler) {
this.cGroupsHandler = cGroupsHandler;
}

@Override
public List<PrivilegedOperation> bootstrap(Configuration conf)
throws ResourceHandlerException {
this.cGroupsHandler.initializeCGroupController(PIDS);
processMaxCount =
conf.getInt(YarnConfiguration.NM_CONTAINER_PROCESS_MAX_LIMIT_NUM,
YarnConfiguration.DEFAULT_NM_CONTAINER_PROCESS_NUM_MAX_LIMIT);
if (processMaxCount < 0){
throw new ResourceHandlerException(
"Illegal value '" + processMaxCount + "' "
+ YarnConfiguration.
NM_CONTAINER_PROCESS_MAX_LIMIT_NUM
+ ". Value must be positive number.");
}
LOG.info("Maximum number of processes is " + processMaxCount);

return null;
}

@Override
public List<PrivilegedOperation> preStart(Container container)
throws ResourceHandlerException {

String cgroupId = container.getContainerId().toString();
cGroupsHandler.createCGroup(PIDS, cgroupId);
try {
cGroupsHandler.updateCGroupParam(PIDS, cgroupId,
CGroupsHandler.CGROUP_PIDS_MAX, String.valueOf(processMaxCount));
} catch (ResourceHandlerException re) {
cGroupsHandler.deleteCGroup(PIDS, cgroupId);
LOG.error("Could not update cgroup for container", re);
throw re;
}
List<PrivilegedOperation> ret = new ArrayList<>();
ret.add(new PrivilegedOperation(PrivilegedOperation.OperationType.ADD_PID_TO_CGROUP,
PrivilegedOperation.CGROUP_ARG_PREFIX + cGroupsHandler
.getPathForCGroupTasks(PIDS, cgroupId)));
return ret;
}

@Override
public List<PrivilegedOperation> reacquireContainer(ContainerId containerId)
throws ResourceHandlerException {
return null;
}

@Override
public List<PrivilegedOperation> updateContainer(Container container)
throws ResourceHandlerException {
return null;
}

@Override
public List<PrivilegedOperation> postComplete(ContainerId containerId)
throws ResourceHandlerException {
cGroupsHandler.deleteCGroup(PIDS, containerId.toString());
return null;
}

@Override
public List<PrivilegedOperation> teardown()
throws ResourceHandlerException {
return null;
}

public int getProcessMaxCount() {
return processMaxCount;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/


package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;

/**
* Resource handler for pid resources.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public interface PidsResourceHandler extends ResourceHandler {

}
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,15 @@ private static ResourceHandler getNumaResourceHandler(Configuration conf,
return null;
}

private static ResourceHandler
getPidsResourceHandler(Configuration conf) throws ResourceHandlerException{
if (conf.getBoolean(YarnConfiguration.NM_CONTAINER_PROCESS_MONITOR_ENABLED,
YarnConfiguration.DEFAULT_NM_CONTAINER_PROCESS_MONITOR_ENABLED)){
return new CGroupsPidsResourceHandlerImpl(getCGroupsHandler());
}
return null;
}

private static void addHandlerIfNotNull(List<ResourceHandler> handlerList,
ResourceHandler handler) {
if (handler != null) {
Expand All @@ -299,6 +308,7 @@ private static void initializeConfiguredResourceHandlerChain(
addHandlerIfNotNull(handlerList,
initCGroupsCpuResourceHandler(conf));
addHandlerIfNotNull(handlerList, getNumaResourceHandler(conf, nmContext));
addHandlerIfNotNull(handlerList, getPidsResourceHandler(conf));
addHandlersFromConfiguredResourcePlugins(handlerList, conf, nmContext);
resourceHandlerChain = new ResourceHandlerChain(handlerList);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperation;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

import java.io.IOException;
import java.util.List;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.times;

/**
* Test class for CGroupsPidsResourceHandlerImpl.
*
*/
public class TestPidsResourceHandlerImpl {

private CGroupsPidsResourceHandlerImpl pidsResourceHandler;
private CGroupsHandler mockCGroupsHandler;

@Before
public void setUp() throws IOException, ResourceHandlerException {
mockCGroupsHandler = mock(CGroupsHandler.class);
when(mockCGroupsHandler.getPathForCGroup(any(), any())).thenReturn(".");
pidsResourceHandler =
new CGroupsPidsResourceHandlerImpl(mockCGroupsHandler);
}

@Test
public void testBootstrap() throws Exception {
Configuration conf = new YarnConfiguration();
List<PrivilegedOperation> ret =
pidsResourceHandler.bootstrap(conf);
verify(mockCGroupsHandler, times(1))
.initializeCGroupController(CGroupsHandler.CGroupController.PIDS);
Assert.assertNull(ret);
Assert.assertEquals("Default process number incorrect", 10000,
pidsResourceHandler.getProcessMaxCount());
}

@Test
public void testProcessNumbers() throws Exception {
Configuration conf = new YarnConfiguration();
conf.setInt(YarnConfiguration.NM_CONTAINER_PROCESS_MAX_LIMIT_NUM, -1);
try {
pidsResourceHandler.bootstrap(conf);
Assert.fail("Negative values for process number should not be allowed.");
} catch (ResourceHandlerException re) {
// do nothing
}

conf.setInt(YarnConfiguration.NM_CONTAINER_PROCESS_MAX_LIMIT_NUM, 1000);
pidsResourceHandler.bootstrap(conf);
Assert.assertEquals("process number value incorrect", 1000,
pidsResourceHandler.getProcessMaxCount());
}

@Test
public void testPreStart() throws Exception {
Configuration conf = new Configuration();
conf.setInt(YarnConfiguration.NM_CONTAINER_PROCESS_MAX_LIMIT_NUM, 1024);
pidsResourceHandler.bootstrap(conf);
String id = "container_01_01";
String path = "test-path/" + id;
ContainerId mockContainerId = mock(ContainerId.class);
when(mockContainerId.toString()).thenReturn(id);
Container mockContainer = mock(Container.class);
when(mockContainer.getContainerId()).thenReturn(mockContainerId);
when(mockCGroupsHandler
.getPathForCGroupTasks(CGroupsHandler.CGroupController.PIDS, id))
.thenReturn(path);
int maxProcess = 1024;
List<PrivilegedOperation> ret =
pidsResourceHandler.preStart(mockContainer);
verify(mockCGroupsHandler, times(1))
.createCGroup(CGroupsHandler.CGroupController.PIDS, id);
verify(mockCGroupsHandler, times(1))
.updateCGroupParam(CGroupsHandler.CGroupController.PIDS, id,
CGroupsHandler.CGROUP_PIDS_MAX, String.valueOf(maxProcess));
Assert.assertNotNull(ret);
Assert.assertEquals(1, ret.size());
PrivilegedOperation op = ret.get(0);
Assert.assertEquals(PrivilegedOperation.OperationType.ADD_PID_TO_CGROUP,
op.getOperationType());
List<String> args = op.getArguments();
Assert.assertEquals(1, args.size());
Assert.assertEquals(PrivilegedOperation.CGROUP_ARG_PREFIX + path,
args.get(0));
}

@Test
public void testReacquireContainer() throws Exception {
ContainerId containerIdMock = mock(ContainerId.class);
Assert.assertNull(
pidsResourceHandler.reacquireContainer(containerIdMock));
}

@Test
public void testPostComplete() throws Exception {
String id = "container_01_01";
ContainerId mockContainerId = mock(ContainerId.class);
when(mockContainerId.toString()).thenReturn(id);
Assert
.assertNull(pidsResourceHandler.postComplete(mockContainerId));
verify(mockCGroupsHandler, times(1))
.deleteCGroup(CGroupsHandler.CGroupController.PIDS, id);
}

@Test
public void testTeardown() throws Exception {
Assert.assertNull(pidsResourceHandler.teardown());
}
}