Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -83,4 +83,9 @@ public class ContainerExitStatus {
*/
public static final int KILLED_FOR_EXCESS_LOGS = -109;

/**
* Container was terminated since exceeds CPU limit.
*/
public static final int KILLED_EXCEEDED_PCORE = -110;

}
Original file line number Diff line number Diff line change
Expand Up @@ -1652,6 +1652,21 @@ public static boolean isAclEnabled(Configuration conf) {
+ "elastic-memory-control.enabled";
public static final boolean DEFAULT_NM_ELASTIC_MEMORY_CONTROL_ENABLED = false;

/** Specifies whether physical core check is enabled. */
public static final String NM_PCORE_CHECK_ENABLED = NM_PREFIX
+ "pcore-check-enabled";
public static final boolean DEFAULT_NM_PCORE_CHECK_ENABLED = true;

/** Specifies the max pcore ratio before killed. */
public static final String NM_PCORE_LIMIT_RATIO = NM_PREFIX
+ "pcore-limit-ratio";
public static final float DEFAULT_NM_PCORE_LIMIT_RATIO = 1.0f;

/** Specifies the times of exceed pcore ratio limit before killed. */
public static final String NM_PCORE_LIMIT_TIMES = NM_PREFIX
+ "pcore-limit-times";
public static final int DEFAULT_NM_PCORE_LIMIT_TIMES = 3;

/** Specifies the OOM handler code. */
public static final String NM_ELASTIC_MEMORY_CONTROL_OOM_HANDLER = NM_PREFIX
+ "elastic-memory-control.oom-handler";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,5 +36,7 @@ public interface ResourceView {

boolean isPmemCheckEnabled();

boolean isPcoreCheckEnabled();

long getVCoresAllocatedForContainers();
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@

import java.util.Arrays;
import java.io.File;
import java.util.HashMap;
import java.util.Map;
import java.util.List;
import java.util.Map.Entry;
Expand Down Expand Up @@ -109,6 +110,11 @@ public class ContainersMonitorImpl extends AbstractService implements
private static final long UNKNOWN_MEMORY_LIMIT = -1L;
private int nodeCpuPercentageForYARN;

private boolean pcoreCheckEnabled;
private float cpuLimitRatio;
private int cpuLimitTimes;
private Map<ContainerId,Integer> cpuExceedTimesMap = new HashMap<>();

/**
* Type of container metric.
*/
Expand Down Expand Up @@ -210,6 +216,16 @@ protected void serviceInit(Configuration myConf) throws Exception {
LOG.info("Elastic memory control enabled: {}", elasticMemoryEnforcement);
LOG.info("Strict memory control enabled: {}", strictMemoryEnforcement);

pcoreCheckEnabled = conf.getBoolean(YarnConfiguration.NM_PCORE_CHECK_ENABLED,
YarnConfiguration.DEFAULT_NM_PCORE_CHECK_ENABLED);

// if cpuLimitRatio is 3.0, it means currentPcoreUsagePercentage shouldn't exceed 300
cpuLimitRatio = conf.getFloat(YarnConfiguration.NM_PCORE_LIMIT_RATIO,
YarnConfiguration.DEFAULT_NM_PCORE_LIMIT_RATIO);
cpuLimitTimes = conf.getInt(YarnConfiguration.NM_PCORE_LIMIT_TIMES,
YarnConfiguration.DEFAULT_NM_PCORE_LIMIT_TIMES);
LOG.info("Physical core check enabled: " + pcoreCheckEnabled);

if (elasticMemoryEnforcement) {
if (!CGroupElasticMemoryController.isAvailable()) {
// Test for availability outside the constructor
Expand Down Expand Up @@ -463,20 +479,51 @@ private boolean isProcessTreeOverLimit(String containerId,

if (currentMemUsage > (2 * memLimit)) {
LOG.warn("Process tree for container: {} running over twice "
+ "the configured limit. Limit={}, current usage = {}",
+ "the configured memory limit. Limit={}, current memory usage = {}",
containerId, memLimit, currentMemUsage);
isOverLimit = true;
} else if (curMemUsageOfAgedProcesses > memLimit) {
LOG.warn("Process tree for container: {} has processes older than 1 "
+ "iteration running over the configured limit. "
+ "Limit={}, current usage = {}",
+ "iteration running over the configured memory limit. "
+ "Limit={}, current memory usage = {}",
containerId, memLimit, curMemUsageOfAgedProcesses);
isOverLimit = true;
}

return isOverLimit;
}

/**
* Container exceeding cpu limit `cpuLimitRatio` for more than
* yarn.nodemanager.pcore-limit-times * yarn.nodemanager.container-monitor.interval-ms,
* will be killed automatically.
*/
boolean isProcessTreeOverLimit(
ContainerId containerId, float currentPcoreUsagePercentage) {
boolean isOverLimit = false;
if (currentPcoreUsagePercentage > 100 * cpuLimitRatio) {
int cpuExceedTimes =
cpuExceedTimesMap.getOrDefault(containerId, 0);
cpuExceedTimes++;
LOG.warn("Process tree for container: " + containerId
+ " running over " + "the configured CPU limit. Limit="
+ 100 * cpuLimitRatio + ", current usage = "
+ currentPcoreUsagePercentage + ", cpuExceedTimes ="
+ cpuExceedTimes);
if (cpuExceedTimes >= cpuLimitTimes) {
isOverLimit = true;
cpuExceedTimesMap.remove(containerId);
LOG.warn("Container " + containerId +
" meets the max cpu limit times " + cpuLimitTimes);
} else {
cpuExceedTimesMap.put(containerId, cpuExceedTimes);
}
} else {
cpuExceedTimesMap.remove(containerId);
}
return isOverLimit;
}

// method provided just for easy testing purposes
boolean isProcessTreeOverLimit(ResourceCalculatorProcessTree pTree,
String containerId, long limit) {
Expand Down Expand Up @@ -537,6 +584,8 @@ public void run() {
pTree.updateProcessTree(); // update process-tree
long currentVmemUsage = pTree.getVirtualMemorySize();
long currentPmemUsage = pTree.getRssMemorySize();
float currentPcoreUsagePercentage =
pTree.getCpuUsagePercent() / ptInfo.getCpuVcores();
if (currentVmemUsage < 0 || currentPmemUsage < 0) {
// YARN-6862/YARN-5021 If the container just exited or for
// another reason the physical/virtual memory is UNAVAILABLE (-1)
Expand All @@ -556,6 +605,12 @@ public void run() {
LOG.info("Skipping monitoring container {} since "
+ "CPU usage is not yet available.", containerId);
continue;
} else {
LOG.info(String.format(
"CPU usage of ProcessTree %s for container-id %s: ",
pId, containerId.toString()) +
String.format("%s of %s per physical core used; ",
currentPcoreUsagePercentage, 100 * cpuLimitRatio));
}

recordUsage(containerId, pId, pTree, ptInfo, currentVmemUsage,
Expand Down Expand Up @@ -599,6 +654,10 @@ public void run() {
trackedContainersUtilization.getCPU());
}

// Remove the outdated key
cpuExceedTimesMap.entrySet().removeIf(
e -> !trackingContainers.containsKey(e.getKey()));

try {
Thread.sleep(monitoringInterval);
} catch (InterruptedException e) {
Expand Down Expand Up @@ -752,6 +811,7 @@ private void checkLimit(ContainerId containerId, String pId,
return;
}
boolean isMemoryOverLimit = false;
boolean isCpuOverLimit = false;
String msg = "";
int containerExitStatus = ContainerExitStatus.INVALID;

Expand All @@ -761,6 +821,9 @@ private void checkLimit(ContainerId containerId, String pId,
// are processes more than 1 iteration old.
long curMemUsageOfAgedProcesses = pTree.getVirtualMemorySize(1);
long curRssMemUsageOfAgedProcesses = pTree.getRssMemorySize(1);
float currentPcoreUsagePercentage =
pTree.getCpuUsagePercent() / ptInfo.getCpuVcores() *
maxVCoresAllottedForContainers / resourceCalculatorPlugin.getNumProcessors();
if (isVmemCheckEnabled()
&& isProcessTreeOverLimit(containerId.toString(),
currentVmemUsage, curMemUsageOfAgedProcesses, vmemLimit)) {
Expand All @@ -771,7 +834,7 @@ && isProcessTreeOverLimit(containerId.toString(),
// Container (the root process) is still alive and overflowing
// memory.
// Dump the process-tree and then clean it up.
msg = formatErrorMessage("virtual",
msg = formatMemoryErrorMessage("virtual",
formatUsageString(currentVmemUsage, vmemLimit,
currentPmemUsage, pmemLimit),
pId, containerId, pTree, delta);
Expand All @@ -788,19 +851,28 @@ && isProcessTreeOverLimit(containerId.toString(),
// Container (the root process) is still alive and overflowing
// memory.
// Dump the process-tree and then clean it up.
msg = formatErrorMessage("physical",
msg = formatMemoryErrorMessage("physical",
formatUsageString(currentVmemUsage, vmemLimit,
currentPmemUsage, pmemLimit),
pId, containerId, pTree, delta);
isMemoryOverLimit = true;
containerExitStatus = ContainerExitStatus.KILLED_EXCEEDED_PMEM;
} else if (isPcoreCheckEnabled()
&& isProcessTreeOverLimit(containerId,
currentPcoreUsagePercentage)) {
// Container (the root process) is still alive and exceed cpu limit.
// Dump the process-tree and then clean it up.
msg = formatCpuErrorMessage(currentPcoreUsagePercentage, cpuLimitRatio,
pId, containerId, pTree);
isCpuOverLimit = true;
containerExitStatus = ContainerExitStatus.KILLED_EXCEEDED_PCORE;
}

if (isMemoryOverLimit

if ((isMemoryOverLimit || isCpuOverLimit)
&& trackingContainers.remove(containerId) != null) {
// Virtual or physical memory over limit. Fail the container and
// remove
// the corresponding process tree
// Virtual or physical memory or physical core over limit.
// Fail the container and remove the corresponding process tree.
LOG.warn(msg);
// warn if not a leader
if (!pTree.checkPidPgrpidForMatch()) {
Expand Down Expand Up @@ -846,7 +918,7 @@ private void reportResourceUsage(ContainerId containerId,
* @param pTree process tree to dump full resource utilization graph
* @return formatted resource usage information
*/
private String formatErrorMessage(String memTypeExceeded,
private String formatMemoryErrorMessage(String memTypeExceeded,
String usageString, String pId, ContainerId containerId,
ResourceCalculatorProcessTree pTree, long delta) {
return
Expand All @@ -859,6 +931,28 @@ private String formatErrorMessage(String memTypeExceeded,
pTree.getProcessTreeDump();
}

/**
* Format string when memory limit has been exceeded.
* @param currentPcoreUsagePercentage current pcore usage
* @param pcoreLimitRatio pcore limit threshold
* @param pId process id
* @param containerId container id
* @param pTree process tree to dump full resource utilization graph
* @return formatted resource usage information
*/
private String formatCpuErrorMessage(
float currentPcoreUsagePercentage, float pcoreLimitRatio,
String pId, ContainerId containerId, ResourceCalculatorProcessTree pTree) {
return
String.format("Container [pid=%s,containerID=%s] is running beyond %s cpu limits. ",
pId, containerId, pcoreLimitRatio * 100) +
"Current usage: " + currentPcoreUsagePercentage +
". Killing container.\n" +
"Dump of the process-tree for " + containerId + " :\n" +
pTree.getProcessTreeDump();
}


/**
* Format memory usage string for reporting.
* @param currentVmemUsage virtual memory usage
Expand Down Expand Up @@ -1013,6 +1107,16 @@ public boolean isPmemCheckEnabled() {
return this.pmemCheckEnabled;
}

/**
* Is the total physical core check enabled?
*
* @return true if total physical core check is enabled.
*/
@Override
public boolean isPcoreCheckEnabled() {
return this.pcoreCheckEnabled;
}

@Override
public long getPmemAllocatedForContainers() {
return this.maxPmemAllottedForContainers;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,11 @@ public boolean isVmemCheckEnabled() {
public boolean isPmemCheckEnabled() {
return true;
}

@Override
public boolean isPcoreCheckEnabled() {
return false;
}
};
conf.set(YarnConfiguration.NM_LOCAL_DIRS, TESTROOTDIR.getAbsolutePath());
conf.set(YarnConfiguration.NM_LOG_DIRS, testLogDir.getAbsolutePath());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,11 @@ public boolean isVmemCheckEnabled() {
public boolean isPmemCheckEnabled() {
return true;
}

@Override
public boolean isPcoreCheckEnabled() {
return false;
}
};
conf.set(YarnConfiguration.NM_LOCAL_DIRS, testRootDir.getAbsolutePath());
conf.set(YarnConfiguration.NM_LOG_DIRS, testLogDir.getAbsolutePath());
Expand Down Expand Up @@ -171,6 +176,11 @@ public boolean isVmemCheckEnabled() {
public boolean isPmemCheckEnabled() {
return true;
}

@Override
public boolean isPcoreCheckEnabled() {
return false;
}
};
conf.set(YarnConfiguration.NM_LOCAL_DIRS, testRootDir.getAbsolutePath());
conf.set(YarnConfiguration.NM_LOG_DIRS, testLogDir.getAbsolutePath());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,11 @@ public boolean isVmemCheckEnabled() {
public boolean isPmemCheckEnabled() {
return true;
}

@Override
public boolean isPcoreCheckEnabled() {
return false;
}
};
nmWebApp = new NMWebApp(resourceView, aclsManager, dirsHandler);
bind(JAXBContextResolver.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,11 @@ public boolean isVmemCheckEnabled() {
public boolean isPmemCheckEnabled() {
return true;
}

@Override
public boolean isPcoreCheckEnabled() {
return false;
}
};
nmWebApp = new NMWebApp(resourceView, aclsManager, dirsHandler);
bind(JAXBContextResolver.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,11 @@ public boolean isVmemCheckEnabled() {
public boolean isPmemCheckEnabled() {
return true;
}

@Override
public boolean isPcoreCheckEnabled() {
return false;
}
};
conf.set(YarnConfiguration.NM_LOCAL_DIRS, testRootDir.getAbsolutePath());
conf.set(YarnConfiguration.NM_LOG_DIRS, testLogDir.getAbsolutePath());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,11 @@ public boolean isVmemCheckEnabled() {
public boolean isPmemCheckEnabled() {
return true;
}

@Override
public boolean isPcoreCheckEnabled() {
return false;
}
};
conf.set(YarnConfiguration.NM_LOCAL_DIRS, testRootDir.getAbsolutePath());
conf.set(YarnConfiguration.NM_LOG_DIRS, testLogDir.getAbsolutePath());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,11 @@ public boolean isVmemCheckEnabled() {
public boolean isPmemCheckEnabled() {
return true;
}

@Override
public boolean isPcoreCheckEnabled() {
return false;
}
};
conf.set(YarnConfiguration.NM_LOCAL_DIRS, TESTROOTDIR.getAbsolutePath());
conf.set(YarnConfiguration.NM_LOG_DIRS, TESTLOGDIR.getAbsolutePath());
Expand Down