Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1575,14 +1575,25 @@ public static boolean isAclEnabled(Configuration conf) {
public static final boolean DEFAULT_NM_LINUX_CONTAINER_CGROUPS_STRICT_RESOURCE_USAGE =
false;


// Configurations for the application lifetime monitor feature.

/**
 * Configuration key for the application-timeouts monitor interval. The
 * "-ms" suffix of the key indicates the value is expressed in milliseconds.
 */
public static final String RM_APPLICATION_MONITOR_INTERVAL_MS =
RM_PREFIX + "application-timeouts.monitor.interval-ms";

/** Default monitor interval: 3000 (milliseconds, going by the key suffix). */
public static final long DEFAULT_RM_APPLICATION_MONITOR_INTERVAL_MS =
3000;

// Overallocation (= allocation based on utilization) configs.

/**
 * Configuration key for the node manager's over-allocation allocation
 * threshold.
 */
public static final String NM_OVERALLOCATION_ALLOCATION_THRESHOLD =
NM_PREFIX + "overallocation.allocation-threshold";
/**
 * Default allocation threshold. The yarn-default.xml entry for this key
 * describes a value of 0 as over-allocation being turned off.
 */
public static final float DEFAULT_NM_OVERALLOCATION_ALLOCATION_THRESHOLD
= 0f;
/**
 * Largest allocation threshold (0.95); the yarn-default.xml description
 * gives the valid range as a float between 0 and 0.95.
 * NOTE(review): where this cap is enforced is not visible here - confirm
 * against the code that reads the threshold.
 */
@Private
public static final float MAX_NM_OVERALLOCATION_ALLOCATION_THRESHOLD = 0.95f;
/**
 * Configuration key for the utilization threshold beyond which
 * OPPORTUNISTIC containers on an over-allocated node should start getting
 * preempted.
 */
public static final String NM_OVERALLOCATION_PREEMPTION_THRESHOLD =
    NM_PREFIX + "overallocation.preemption-threshold";
/**
 * Default preemption threshold. Kept equal to the value shipped in
 * yarn-default.xml (1) so that the in-code fallback and the default
 * resource file agree; a fallback of 0 would place the preemption point at
 * zero utilization.
 */
public static final float DEFAULT_NM_OVERALLOCATION_PREEMPTION_THRESHOLD
    = 1f;

/**
* Interval of time the linux container executor should try cleaning up
* cgroups entry when cleaning up a container. This is required due to what
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1562,6 +1562,27 @@
<value>false</value>
</property>

<property>
<description>The extent of over-allocation (container-allocation based on
current utilization instead of prior allocation) allowed on this node,
expressed as a float between 0 and 0.95. By default, over-allocation is
turned off (value = 0). When turned on, the node allows running
OPPORTUNISTIC containers when the aggregate utilization is under the
value specified here multiplied by the node's advertised capacity.
</description>
<name>yarn.nodemanager.overallocation.allocation-threshold</name>
<value>0f</value>
</property>

<property>
<description>When a node is over-allocated to improve utilization by
running OPPORTUNISTIC containers, this config captures the utilization
beyond which OPPORTUNISTIC containers should start getting preempted.
</description>
<name>yarn.nodemanager.overallocation.preemption-threshold</name>
<value>1</value>
</property>

<property>
<description>This configuration setting determines the capabilities
assigned to docker containers when they are launched. While these may not
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeLabel;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.api.records.OverAllocationInfo;
import org.apache.hadoop.yarn.util.Records;

public abstract class RegisterNodeManagerRequest {
Expand All @@ -42,14 +43,14 @@ public static RegisterNodeManagerRequest newInstance(NodeId nodeId,
List<NMContainerStatus> containerStatuses,
List<ApplicationId> runningApplications, Set<NodeLabel> nodeLabels) {
return newInstance(nodeId, httpPort, resource, nodeManagerVersionId,
containerStatuses, runningApplications, nodeLabels, null);
containerStatuses, runningApplications, nodeLabels, null, null);
}

public static RegisterNodeManagerRequest newInstance(NodeId nodeId,
int httpPort, Resource resource, String nodeManagerVersionId,
List<NMContainerStatus> containerStatuses,
List<ApplicationId> runningApplications, Set<NodeLabel> nodeLabels,
Resource physicalResource) {
Resource physicalResource, OverAllocationInfo overAllocationInfo) {
RegisterNodeManagerRequest request =
Records.newRecord(RegisterNodeManagerRequest.class);
request.setHttpPort(httpPort);
Expand All @@ -60,17 +61,22 @@ public static RegisterNodeManagerRequest newInstance(NodeId nodeId,
request.setRunningApplications(runningApplications);
request.setNodeLabels(nodeLabels);
request.setPhysicalResource(physicalResource);
request.setOverAllocationInfo(overAllocationInfo);
return request;
}

public abstract NodeId getNodeId();
public abstract int getHttpPort();
public abstract Resource getResource();
public abstract String getNMVersion();
public abstract List<NMContainerStatus> getNMContainerStatuses();
public abstract Set<NodeLabel> getNodeLabels();
public abstract void setNodeLabels(Set<NodeLabel> nodeLabels);


/**
 * Get the over-allocation information attached to this registration
 * request.
 *
 * @return the over-allocation info; may be {@code null}, since one
 *         {@code newInstance} overload passes {@code null} for it
 */
public abstract OverAllocationInfo getOverAllocationInfo();
/**
 * Set the over-allocation information attached to this registration
 * request.
 *
 * @param overAllocationInfo the over-allocation info to attach;
 *        {@code newInstance} may pass {@code null} here
 */
public abstract void setOverAllocationInfo(
OverAllocationInfo overAllocationInfo);

/**
* We introduce this here because currently YARN RM doesn't persist nodes info
* for application running. When RM restart happened, we cannot determinate if
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,14 @@
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NMContainerStatusProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeLabelsProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeLabelsProto.Builder;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.OverAllocationInfoProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.RegisterNodeManagerRequestProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.RegisterNodeManagerRequestProtoOrBuilder;
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;

import org.apache.hadoop.yarn.server.api.records.OverAllocationInfo;
import org.apache.hadoop.yarn.server.api.records.impl.pb.OverAllocationInfoPBImpl;

public class RegisterNodeManagerRequestPBImpl extends RegisterNodeManagerRequest {
RegisterNodeManagerRequestProto proto = RegisterNodeManagerRequestProto.getDefaultInstance();
RegisterNodeManagerRequestProto.Builder builder = null;
Expand All @@ -55,6 +58,7 @@ public class RegisterNodeManagerRequestPBImpl extends RegisterNodeManagerRequest
private List<NMContainerStatus> containerStatuses = null;
private List<ApplicationId> runningApplications = null;
private Set<NodeLabel> labels = null;
private OverAllocationInfo overAllocationInfo = null;

/** Physical resources in the node. */
private Resource physicalResource = null;
Expand Down Expand Up @@ -99,6 +103,10 @@ private synchronized void mergeLocalToBuilder() {
if (this.physicalResource != null) {
builder.setPhysicalResource(convertToProtoFormat(this.physicalResource));
}
if (this.overAllocationInfo != null) {
builder.setOverAllocationInfo(
convertToProtoFormat(this.overAllocationInfo));
}
}

private synchronized void addNMContainerStatusesToProto() {
Expand Down Expand Up @@ -340,7 +348,30 @@ public synchronized void setNodeLabels(Set<NodeLabel> nodeLabels) {
builder.clearNodeLabels();
this.labels = nodeLabels;
}


/**
 * Returns the over-allocation info for this request, converting it from
 * the backing proto/builder on first access and caching the result.
 *
 * @return the cached or freshly converted info, or {@code null} when
 *         neither the local field nor the proto/builder carries one
 */
@Override
public synchronized OverAllocationInfo getOverAllocationInfo() {
  if (this.overAllocationInfo == null) {
    // Nothing cached yet: consult whichever representation is current.
    RegisterNodeManagerRequestProtoOrBuilder source =
        viaProto ? proto : builder;
    if (source.hasOverAllocationInfo()) {
      this.overAllocationInfo =
          convertFromProtoFormat(source.getOverAllocationInfo());
    }
  }
  return this.overAllocationInfo;
}

/**
 * Sets (or clears) the over-allocation info carried by this request.
 *
 * @param overAllocationInfo the new info; {@code null} removes any value
 *        previously stored, including one held only in the builder
 */
@Override
public synchronized void setOverAllocationInfo(
    OverAllocationInfo overAllocationInfo) {
  maybeInitBuilder();
  // The decision to clear must depend on the incoming value, not on the
  // previously cached one: otherwise setting null while a value is cached
  // leaves that value in the builder, and the getter returns it again.
  if (overAllocationInfo == null) {
    builder.clearOverAllocationInfo();
  }
  this.overAllocationInfo = overAllocationInfo;
}

private synchronized void initNodeLabels() {
if (this.labels != null) {
return;
Expand Down Expand Up @@ -399,4 +430,14 @@ private static NMContainerStatusProto convertToProtoFormat(
NMContainerStatus c) {
return ((NMContainerStatusPBImpl)c).getProto();
}

/**
 * Extracts the protobuf message from an {@link OverAllocationInfo}.
 * The argument is cast to {@link OverAllocationInfoPBImpl}.
 */
private static OverAllocationInfoProto convertToProtoFormat(
    OverAllocationInfo overAllocationInfo) {
  OverAllocationInfoPBImpl pbImpl =
      (OverAllocationInfoPBImpl) overAllocationInfo;
  return pbImpl.getProto();
}

/**
 * Wraps a protobuf message in a new {@link OverAllocationInfoPBImpl}.
 */
private static OverAllocationInfo convertFromProtoFormat(
    OverAllocationInfoProto proto) {
  OverAllocationInfo wrapped = new OverAllocationInfoPBImpl(proto);
  return wrapped;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.yarn.server.api.records;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.yarn.server.api.records.impl.pb
.OverAllocationInfoPBImpl;

/**
 * Describes how aggressively the scheduler may over-allocate OPPORTUNISTIC
 * containers on a node. The information is node-specific and is transmitted
 * on the wire (per the original author's note, on each heartbeat).
 */
@InterfaceAudience.Private
@InterfaceStability.Evolving
public abstract class OverAllocationInfo {

  /**
   * Creates a new instance holding the given thresholds.
   *
   * @param overAllocationThresholds thresholds to store in the new instance
   * @return a new {@link OverAllocationInfoPBImpl} with the thresholds set
   */
  public static OverAllocationInfo newInstance(
      ResourceThresholds overAllocationThresholds) {
    OverAllocationInfo created = new OverAllocationInfoPBImpl();
    created.setOverAllocationThreshold(overAllocationThresholds);
    return created;
  }

  /**
   * @return the resource thresholds held by this object
   */
  public abstract ResourceThresholds getOverAllocationThresholds();

  /**
   * @param resourceThresholds the resource thresholds to hold
   */
  public abstract void setOverAllocationThreshold(
      ResourceThresholds resourceThresholds);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.api.records;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.yarn.server.api.records.impl.pb.ResourceThresholdsPBImpl;

/**
 * Holds the resource thresholds used for allocation and preemption when
 * over-allocation is turned on. A separate value is kept for memory and
 * for CPU.
 */
@InterfaceAudience.Private
@InterfaceStability.Evolving
public abstract class ResourceThresholds {

  /**
   * Creates thresholds that use the same value for memory and CPU.
   *
   * @param threshold value applied to both the memory and CPU threshold
   * @return a new {@link ResourceThresholdsPBImpl} with both values set
   */
  public static ResourceThresholds newInstance(float threshold) {
    ResourceThresholds created = new ResourceThresholdsPBImpl();
    created.setMemoryThreshold(threshold);
    created.setCpuThreshold(threshold);
    return created;
  }

  /** @return the memory threshold */
  public abstract float getMemoryThreshold();

  /** @return the CPU threshold */
  public abstract float getCpuThreshold();

  /** @param memoryThreshold the memory threshold to store */
  public abstract void setMemoryThreshold(float memoryThreshold);

  /** @param cpuThreshold the CPU threshold to store */
  public abstract void setCpuThreshold(float cpuThreshold);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.api.records.impl.pb;

import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.OverAllocationInfoProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.OverAllocationInfoProtoOrBuilder;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ResourceThresholdsProto;
import org.apache.hadoop.yarn.server.api.records.OverAllocationInfo;
import org.apache.hadoop.yarn.server.api.records.ResourceThresholds;

/**
 * Protocol-buffer backed implementation of {@link OverAllocationInfo}.
 *
 * State lives in one of two places: an immutable {@code proto} (when
 * {@code viaProto} is true) or a mutable {@code builder}. The thresholds are
 * additionally cached in a local field and merged back into the builder
 * when the proto is requested. All accessors are {@code synchronized}.
 */
public class OverAllocationInfoPBImpl extends OverAllocationInfo {
  private OverAllocationInfoProto proto =
      OverAllocationInfoProto.getDefaultInstance();
  private OverAllocationInfoProto.Builder builder = null;
  private boolean viaProto = false;

  /** Locally cached thresholds; {@code null} until set or first read. */
  private ResourceThresholds overAllocationThresholds = null;

  /** Creates an empty instance backed by a fresh builder. */
  public OverAllocationInfoPBImpl() {
    builder = OverAllocationInfoProto.newBuilder();
  }

  /**
   * Creates an instance backed by an existing protobuf message.
   *
   * @param proto the message to wrap
   */
  public OverAllocationInfoPBImpl(OverAllocationInfoProto proto) {
    this.proto = proto;
    viaProto = true;
  }

  /**
   * Merges any locally cached state and returns the resulting message.
   *
   * @return the protobuf message reflecting the current state
   */
  public synchronized OverAllocationInfoProto getProto() {
    mergeLocalToProto();
    proto = viaProto ? proto : builder.build();
    viaProto = true;
    return proto;
  }

  /** Pushes local state into the builder and rebuilds {@code proto}. */
  private synchronized void mergeLocalToProto() {
    if (viaProto) {
      maybeInitBuilder();
    }
    mergeLocalToBuilder();
    proto = builder.build();
    viaProto = true;
  }

  /** Copies the cached thresholds, if any, into the builder. */
  private synchronized void mergeLocalToBuilder() {
    if (overAllocationThresholds != null) {
      builder.setOverAllocationThresholds(
          convertToProtoFormat(overAllocationThresholds));
    }
  }

  /**
   * Ensures a builder seeded from the current proto exists and switches
   * this object to builder mode.
   */
  private synchronized void maybeInitBuilder() {
    if (viaProto || builder == null) {
      builder = OverAllocationInfoProto.newBuilder(proto);
    }
    viaProto = false;
  }

  /**
   * Returns the thresholds, converting them from the proto/builder on first
   * access and caching the result.
   *
   * @return the thresholds, or {@code null} if none are set
   */
  @Override
  public synchronized ResourceThresholds getOverAllocationThresholds() {
    OverAllocationInfoProtoOrBuilder p = viaProto ? proto : builder;
    if (overAllocationThresholds != null) {
      return overAllocationThresholds;
    }
    if (!p.hasOverAllocationThresholds()) {
      return null;
    }
    overAllocationThresholds =
        convertFromProtoFormat(p.getOverAllocationThresholds());
    return overAllocationThresholds;
  }

  /**
   * Sets (or clears) the thresholds.
   *
   * @param resourceThresholds the new thresholds; {@code null} removes any
   *        value, including one present only in the backing proto
   */
  @Override
  public synchronized void setOverAllocationThreshold(
      ResourceThresholds resourceThresholds) {
    maybeInitBuilder();
    // Clear based on the incoming value. Checking the old cached field
    // instead would leave a proto-only value in place when null is set,
    // and the getter would then resurrect it.
    if (resourceThresholds == null) {
      builder.clearOverAllocationThresholds();
    }
    this.overAllocationThresholds = resourceThresholds;
  }

  /** Extracts the protobuf message; the argument is cast to the PB impl. */
  private static ResourceThresholdsProto convertToProtoFormat(
      ResourceThresholds overAllocationThresholds) {
    return ((ResourceThresholdsPBImpl) overAllocationThresholds).getProto();
  }

  /** Wraps a protobuf message in a new {@link ResourceThresholdsPBImpl}. */
  private static ResourceThresholds convertFromProtoFormat(
      ResourceThresholdsProto overAllocationThresholdsProto) {
    return new ResourceThresholdsPBImpl(overAllocationThresholdsProto);
  }
}
Loading