Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,8 @@ private static void addDeprecatedKeys() {

public static final String YARN_PREFIX = "yarn.";

public static final String SCHEDULER_PREFIX = "yarn.scheduler.";

/////////////////////////////
// Resource types configs
////////////////////////////
Expand Down Expand Up @@ -421,6 +423,22 @@ public static boolean isAclEnabled(Configuration conf) {
public static final boolean
DEFAULT_OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED = false;

/** Whether to assign containers on an overload node. */
public static final String NODE_LOAD_BASED_ASSIGN_ENABLE = SCHEDULER_PREFIX + "node-load-based-assign-enabled";
public static final boolean DEFAULT_NODE_LOAD_BASED_ASSIGN_ENABLE = false;

/** The max ratio limit of (used memory / total memory). */
public static final String NODE_LOAD_MEMORY_LIMIT = SCHEDULER_PREFIX + "node-load-memory-limit";
public static final float DEFAULT_NODE_LOAD_MEMORY_LIMIT = -1.0f;

/** The max ratio limit of (used cpu / total cpu). */
public static final String NODE_LOAD_CPU_LIMIT = SCHEDULER_PREFIX + "node-load-cpu-limit";
public static final float DEFAULT_NODE_LOAD_CPU_LIMIT = -1.0f;

/** The max ratio limit of disk io load. */
public static final String NODE_LOAD_DISK_IO_LIMIT = SCHEDULER_PREFIX + "node-load-disk-io-limit";
public static final float DEFAULT_NODE_LOAD_DISK_IO_LIMIT = -1.0f;

/**
* Maximum number of opportunistic containers to be allocated in
* AM heartbeat.
Expand Down Expand Up @@ -1974,6 +1992,11 @@ public static boolean isAclEnabled(Configuration conf) {
@Deprecated
public final static int DEFAULT_NM_CONTAINER_MON_INTERVAL_MS = 3000;

/** How long is the NM resource sample period.*/
public final static String NM_RESOURCE_SAMPLE_PERIOD_MS =
NM_PREFIX + "resource.sample.period-ms";
public final static long DEFAULT_NM_RESOURCE_SAMPLE_PERIOD_MS = 10000;

/** Class that calculates current resource utilization.*/
public static final String NM_MON_RESOURCE_CALCULATOR =
NM_PREFIX + "resource-calculator.class";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.hadoop.yarn.util;

import org.apache.hadoop.util.NodeResource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience;
Expand Down Expand Up @@ -129,6 +130,24 @@ public float getCpuUsagePercentage() {
return sys.getCpuUsagePercentage();
}

/**
* Obtain the IO usage % of the machine. Return -1 if it is unavailable
*
* @return IO usage in %
*/
public float getIoUsagePercentage(String[] paths) {
return sys.getIoUsagePercentage(paths);
}

/**
* Obtain the node resource of the machine. Return null if it is unavailable
*
* @return cpu & io & memory usage in %
*/
public NodeResource getNodeResourceLastPeriod(String[] localDirs, long millis) {
return sys.getNodeResourceLastPeriod(localDirs, millis);
}

/**
* Obtain the number of VCores used. Return -1 if it is unavailable.
*
Expand Down Expand Up @@ -159,17 +178,17 @@ public long getNetworkBytesWritten() {
*
* @return total number of bytes read.
*/
public long getStorageBytesRead() {
return sys.getStorageBytesRead();
public long getStorageBytesRead(String[] paths) {
return sys.getStorageBytesRead(paths);
}

/**
* Obtain the aggregated number of bytes written to disks.
*
* @return total number of bytes written.
*/
public long getStorageBytesWritten() {
return sys.getStorageBytesWritten();
public long getStorageBytesWritten(String[] paths) {
return sys.getStorageBytesWritten(paths);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ public static NodeStatus newInstance(NodeId nodeId, int responseId,
NodeHealthStatus nodeHealthStatus,
ResourceUtilization containersUtilization,
ResourceUtilization nodeUtilization,
List<Container> increasedContainers) {
List<Container> increasedContainers,
float cpuUsage, float ioUsage, float memUsage) {
NodeStatus nodeStatus = Records.newRecord(NodeStatus.class);
nodeStatus.setResponseId(responseId);
nodeStatus.setNodeId(nodeId);
Expand All @@ -69,6 +70,9 @@ public static NodeStatus newInstance(NodeId nodeId, int responseId,
nodeStatus.setContainersUtilization(containersUtilization);
nodeStatus.setNodeUtilization(nodeUtilization);
nodeStatus.setIncreasedContainers(increasedContainers);
nodeStatus.setCpuUsage(cpuUsage);
nodeStatus.setIoUsage(ioUsage);
nodeStatus.setMemUsage(memUsage);
return nodeStatus;
}

Expand Down Expand Up @@ -132,4 +136,28 @@ public abstract void setIncreasedContainers(
@Unstable
public abstract void setOpportunisticContainersStatus(
OpportunisticContainersStatus opportunisticContainersStatus);

@Private
@Unstable
public abstract void setCpuUsage(float cpuUsage);

@Private
@Unstable
public abstract void setIoUsage(float ioUsage);

@Private
@Unstable
public abstract void setMemUsage(float memUsage);

@Private
@Unstable
public abstract float getCpuUsage();

@Private
@Unstable
public abstract float getIoUsage();

@Private
@Unstable
public abstract float getMemUsage();
}
Original file line number Diff line number Diff line change
Expand Up @@ -487,4 +487,38 @@ private ContainerProto convertToProtoFormat(
Container c) {
return ((ContainerPBImpl)c).getProto();
}

@Override
public synchronized float getCpuUsage() {
NodeStatusProtoOrBuilder p = viaProto ? proto : builder;
return p.getCpuUsage();
}
@Override
public synchronized void setCpuUsage(float cpuUsage) {
maybeInitBuilder();
builder.setCpuUsage(cpuUsage);
}

@Override
public synchronized float getIoUsage() {
NodeStatusProtoOrBuilder p = viaProto ? proto : builder;
return p.getIoUsage();
}
@Override
public synchronized void setIoUsage(float ioUsage) {
maybeInitBuilder();
builder.setIoUsage(ioUsage);
}

@Override
public synchronized float getMemUsage() {
NodeStatusProtoOrBuilder p = viaProto ? proto : builder;
return p.getMemUsage();
}

@Override
public synchronized void setMemUsage(float memUsage) {
maybeInitBuilder();
builder.setMemUsage(memUsage);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ message NodeStatusProto {
optional ResourceUtilizationProto node_utilization = 7;
repeated ContainerProto increased_containers = 8;
optional OpportunisticContainersStatusProto opportunistic_containers_status = 9;
optional float cpu_usage = 10;
optional float io_usage = 11;
optional float mem_usage = 12;
}

message OpportunisticContainersStatusProto {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.util.NodeResource;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.VersionUtil;
import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
Expand Down Expand Up @@ -93,6 +94,7 @@
import org.apache.hadoop.yarn.server.nodemanager.timelineservice.NMTimelinePublisher;
import org.apache.hadoop.yarn.server.nodemanager.util.NodeManagerHardwareUtils;
import org.apache.hadoop.yarn.server.utils.YarnServerBuilderUtils;
import org.apache.hadoop.yarn.util.LinuxResourceCalculatorPlugin;
import org.apache.hadoop.yarn.util.ResourceCalculatorPlugin;
import org.apache.hadoop.yarn.util.YarnVersionInfo;
import org.apache.hadoop.yarn.util.resource.Resources;
Expand Down Expand Up @@ -149,13 +151,18 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
private final NodeHealthCheckerService healthChecker;
private final NodeManagerMetrics metrics;

private Resource lastCapability;

private Runnable statusUpdaterRunnable;
private Thread statusUpdater;
private boolean failedToConnect = false;
private long rmIdentifier = ResourceManagerConstants.RM_INVALID_IDENTIFIER;
private boolean registeredWithRM = false;
Set<ContainerId> pendingContainersToRemove = new HashSet<ContainerId>();

private final ResourceCalculatorPlugin resourceCalculatorPlugin =
ResourceCalculatorPlugin.getResourceCalculatorPlugin(null, null);

private NMNodeLabelsHandler nodeLabelsHandler;
private NMNodeAttributesHandler nodeAttributesHandler;
private NodeLabelsProvider nodeLabelsProvider;
Expand Down Expand Up @@ -512,6 +519,27 @@ private List<ApplicationId> createKeepAliveApplicationList() {
@VisibleForTesting
protected NodeStatus getNodeStatus(int responseId) throws IOException {

float cpuUsage = 0, ioUsage = 0, memUsage = 0;
Configuration conf = getConfig();
long millis = conf.getLong(
YarnConfiguration.NM_RESOURCE_SAMPLE_PERIOD_MS,
YarnConfiguration.DEFAULT_NM_RESOURCE_SAMPLE_PERIOD_MS);

if (resourceCalculatorPlugin != null) {
NodeResource nodeResource = resourceCalculatorPlugin.getNodeResourceLastPeriod(
conf.getTrimmedStrings(YarnConfiguration.NM_LOCAL_DIRS), millis);
if (nodeResource != null) {
cpuUsage = nodeResource.getCpuUsage() / 100F;
ioUsage = nodeResource.getIoUsage();
memUsage = nodeResource.getMemoryUsage();
}
}

if (LOG.isDebugEnabled()) {
LOG.debug("Node " + nodeId + ", cpu usage is " + cpuUsage
+ ", disk io usage is " + ioUsage + ", memory usage is " + memUsage);
}

NodeHealthStatus nodeHealthStatus = this.context.getNodeHealthStatus();
nodeHealthStatus.setHealthReport(healthChecker.getHealthReport());
nodeHealthStatus.setIsNodeHealthy(healthChecker.isHealthy());
Expand All @@ -528,7 +556,8 @@ protected NodeStatus getNodeStatus(int responseId) throws IOException {
NodeStatus nodeStatus =
NodeStatus.newInstance(nodeId, responseId, containersStatuses,
createKeepAliveApplicationList(), nodeHealthStatus,
containersUtilization, nodeUtilization, increasedContainers);
containersUtilization, nodeUtilization, increasedContainers,
cpuUsage, ioUsage, memUsage);

nodeStatus.setOpportunisticContainersStatus(
getOpportunisticContainersStatus());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor;

import org.apache.hadoop.util.NodeResource;
import org.apache.hadoop.yarn.util.ResourceCalculatorPlugin;

public class MockResourceCalculatorPlugin extends ResourceCalculatorPlugin {
Expand Down Expand Up @@ -71,6 +72,16 @@ public float getCpuUsagePercentage() {
return 0;
}

@Override
public float getIoUsagePercentage(String[] paths) {
return 0;
}

@Override
public NodeResource getNodeResourceLastPeriod(String[] localDirs, long millis) {
return new NodeResource(0, 0, 0);
}

@Override
public float getNumVCoresUsed() {
return 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.hadoop.yarn.server.nodemanager.util;

import org.apache.hadoop.util.NodeResource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.util.ResourceCalculatorPlugin;
import org.junit.Assert;
Expand Down Expand Up @@ -77,6 +78,16 @@ public float getCpuUsagePercentage() {
return 0;
}

@Override
public float getIoUsagePercentage(String[] paths) {
return 0;
}

@Override
public NodeResource getNodeResourceLastPeriod(String[] localDirs, long millis) {
return new NodeResource(0, 0, 0);
}

@Override
public int getNumCores() {
return 4;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.hadoop.yarn.api.records.ResourceUtilization;
import org.apache.hadoop.yarn.api.records.NodeAttribute;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.util.resource.Resources;
Expand Down Expand Up @@ -87,6 +88,12 @@ public interface RMNode {
*/
public String getHealthReport();

/**
* the latest node status report received from this node.
* @return the latest node status report received from this node.
*/
public NodeStatus getNodeStatus();

/**
* the time of the latest health report received from this node.
* @return the time of the latest health report received from this node.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,8 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
/** Physical resources in the node. */
private volatile Resource physicalResource;

private NodeStatus nodeStatus;

/* Container Queue Information for the node.. Used by Distributed Scheduler */
private OpportunisticContainersStatus opportunisticContainersStatus;

Expand Down Expand Up @@ -511,6 +513,27 @@ public void setHealthReport(String healthReport) {
this.writeLock.unlock();
}
}

@Override
public NodeStatus getNodeStatus() {
this.readLock.lock();

try {
return this.nodeStatus;
} finally {
this.readLock.unlock();
}
}

public void setNodeStatus(NodeStatus nodeStatus) {
this.writeLock.lock();

try {
this.nodeStatus = nodeStatus;
} finally {
this.writeLock.unlock();
}
}

public void setLastHealthReportTime(long lastHealthReportTime) {
this.writeLock.lock();
Expand Down Expand Up @@ -945,6 +968,8 @@ public NodeState transition(RMNodeImpl rmNode, RMNodeEvent event) {
RMNodeStartedEvent startEvent = (RMNodeStartedEvent) event;
List<NMContainerStatus> containers = null;

rmNode.setNodeStatus(startEvent.getNodeStatus());

NodeId nodeId = rmNode.nodeId;
RMNode previousRMNode =
rmNode.context.getInactiveRMNodes().remove(nodeId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ public RMNodeStatusEvent(NodeId nodeId, NodeStatus nodeStatus,
this.logAggregationReportsForApps = logAggregationReportsForApps;
}

public NodeStatus getNodeStatus() {
return this.nodeStatus;
}

public NodeHealthStatus getNodeHealthStatus() {
return this.nodeStatus.getNodeHealthStatus();
}
Expand Down
Loading