diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 5e4c826a42694..bb346264cd295 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -1575,7 +1575,6 @@ public static boolean isAclEnabled(Configuration conf) { public static final boolean DEFAULT_NM_LINUX_CONTAINER_CGROUPS_STRICT_RESOURCE_USAGE = false; - // Configurations for applicaiton life time monitor feature public static final String RM_APPLICATION_MONITOR_INTERVAL_MS = RM_PREFIX + "application-timeouts.monitor.interval-ms"; @@ -1583,6 +1582,18 @@ public static boolean isAclEnabled(Configuration conf) { public static final long DEFAULT_RM_APPLICATION_MONITOR_INTERVAL_MS = 3000; + /** Overallocation (= allocation based on utilization) configs. */ + public static final String NM_OVERALLOCATION_ALLOCATION_THRESHOLD = + NM_PREFIX + "overallocation.allocation-threshold"; + public static final float DEFAULT_NM_OVERALLOCATION_ALLOCATION_THRESHOLD + = 0f; + @Private + public static final float MAX_NM_OVERALLOCATION_ALLOCATION_THRESHOLD = 0.95f; + public static final String NM_OVERALLOCATION_PREEMPTION_THRESHOLD = + NM_PREFIX + "overallocation.preemption-threshold"; + public static final float DEFAULT_NM_OVERALLOCATION_PREEMPTION_THRESHOLD + = 0f; + /** * Interval of time the linux container executor should try cleaning up * cgroups entry when cleaning up a container. This is required due to what diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index e687eef370dec..c131eec17f12f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -1562,6 +1562,27 @@ false + + The extent of over-allocation (container-allocation based on + current utilization instead of prior allocation) allowed on this node, + expressed as a float between 0 and 0.95. By default, over-allocation is + turned off (value = 0). When turned on, the node allows running + OPPORTUNISTIC containers when the aggregate utilization is under the + value specified here multiplied by the node's advertised capacity. + + yarn.nodemanager.overallocation.allocation-threshold + 0f + + + + When a node is over-allocated to improve utilization by + running OPPORTUNISTIC containers, this config captures the utilization + beyond which OPPORTUNISTIC containers should start getting preempted. + + yarn.nodemanager.overallocation.preemption-threshold + 1 + + This configuration setting determines the capabilities assigned to docker containers when they are launched. While these may not diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java index fc30a805bf3e4..da2987f13e9b3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java @@ -25,6 +25,7 @@ import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeLabel; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.api.records.OverAllocationInfo; import org.apache.hadoop.yarn.util.Records; public abstract class RegisterNodeManagerRequest { @@ -42,14 +43,14 @@ public static RegisterNodeManagerRequest newInstance(NodeId nodeId, List containerStatuses, List runningApplications, Set nodeLabels) { return newInstance(nodeId, httpPort, resource, nodeManagerVersionId, - containerStatuses, runningApplications, nodeLabels, null); + containerStatuses, runningApplications, nodeLabels, null, null); } public static RegisterNodeManagerRequest newInstance(NodeId nodeId, int httpPort, Resource resource, String nodeManagerVersionId, List containerStatuses, List runningApplications, Set nodeLabels, - Resource physicalResource) { + Resource physicalResource, OverAllocationInfo overAllocationInfo) { RegisterNodeManagerRequest request = Records.newRecord(RegisterNodeManagerRequest.class); request.setHttpPort(httpPort); @@ -60,9 +61,10 @@ public static RegisterNodeManagerRequest newInstance(NodeId nodeId, request.setRunningApplications(runningApplications); request.setNodeLabels(nodeLabels); request.setPhysicalResource(physicalResource); + request.setOverAllocationInfo(overAllocationInfo); return request; } - + public abstract NodeId getNodeId(); public abstract int getHttpPort(); public abstract Resource getResource(); @@ -70,7 +72,11 @@ public static RegisterNodeManagerRequest newInstance(NodeId nodeId, public abstract List getNMContainerStatuses(); public abstract Set getNodeLabels(); public abstract void setNodeLabels(Set nodeLabels); - + + public abstract OverAllocationInfo getOverAllocationInfo(); + public abstract void setOverAllocationInfo( + OverAllocationInfo overAllocationInfo); + /** * We introduce this here because currently YARN RM doesn't persist nodes info * for application running. When RM restart happened, we cannot determinate if diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java index 0291e0b866539..7b2208e157c8b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java @@ -40,11 +40,14 @@ import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NMContainerStatusProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeLabelsProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeLabelsProto.Builder; +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.OverAllocationInfoProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.RegisterNodeManagerRequestProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.RegisterNodeManagerRequestProtoOrBuilder; import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest; - +import org.apache.hadoop.yarn.server.api.records.OverAllocationInfo; +import org.apache.hadoop.yarn.server.api.records.impl.pb.OverAllocationInfoPBImpl; + public class RegisterNodeManagerRequestPBImpl extends RegisterNodeManagerRequest { RegisterNodeManagerRequestProto proto = RegisterNodeManagerRequestProto.getDefaultInstance(); RegisterNodeManagerRequestProto.Builder builder = null; @@ -55,6 +58,7 @@ public class RegisterNodeManagerRequestPBImpl extends RegisterNodeManagerRequest private List containerStatuses = null; private List runningApplications = null; private Set labels = null; + private OverAllocationInfo overAllocationInfo = null; /** Physical resources in the node. */ private Resource physicalResource = null; @@ -99,6 +103,10 @@ private synchronized void mergeLocalToBuilder() { if (this.physicalResource != null) { builder.setPhysicalResource(convertToProtoFormat(this.physicalResource)); } + if (this.overAllocationInfo != null) { + builder.setOverAllocationInfo( + convertToProtoFormat(this.overAllocationInfo)); + } } private synchronized void addNMContainerStatusesToProto() { @@ -340,7 +348,30 @@ public synchronized void setNodeLabels(Set nodeLabels) { builder.clearNodeLabels(); this.labels = nodeLabels; } - + + @Override + public synchronized OverAllocationInfo getOverAllocationInfo() { + RegisterNodeManagerRequestProtoOrBuilder p = viaProto ? proto : builder; + if (this.overAllocationInfo != null) { + return this.overAllocationInfo; + } + if (!p.hasOverAllocationInfo()) { + return null; + } + this.overAllocationInfo = convertFromProtoFormat(p.getOverAllocationInfo()); + return this.overAllocationInfo; + } + + @Override + public synchronized void setOverAllocationInfo( + OverAllocationInfo overAllocationInfo) { + maybeInitBuilder(); + if (this.overAllocationInfo == null) { + builder.clearOverAllocationInfo(); + } + this.overAllocationInfo = overAllocationInfo; + } + private synchronized void initNodeLabels() { if (this.labels != null) { return; @@ -399,4 +430,14 @@ private static NMContainerStatusProto convertToProtoFormat( NMContainerStatus c) { return ((NMContainerStatusPBImpl)c).getProto(); } + + private static OverAllocationInfoProto convertToProtoFormat( + OverAllocationInfo overAllocationInfo) { + return ((OverAllocationInfoPBImpl)overAllocationInfo).getProto(); + } + + private static OverAllocationInfo convertFromProtoFormat( + OverAllocationInfoProto proto) { + return new OverAllocationInfoPBImpl(proto); + } } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/OverAllocationInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/OverAllocationInfo.java new file mode 100644 index 0000000000000..77952bfcf30ca --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/OverAllocationInfo.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.api.records; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.yarn.server.api.records.impl.pb + .OverAllocationInfoPBImpl; + +/** + * Captures information on how aggressively the scheduler can over-allocate + * OPPORTUNISTIC containers on a node. This is node-specific, and is sent on + * the wire on each heartbeat. + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public abstract class OverAllocationInfo { + public static OverAllocationInfo newInstance( + ResourceThresholds overAllocationThresholds) { + OverAllocationInfo info = new OverAllocationInfoPBImpl(); + info.setOverAllocationThreshold(overAllocationThresholds); + return info; + } + + public abstract ResourceThresholds getOverAllocationThresholds(); + + public abstract void setOverAllocationThreshold( + ResourceThresholds resourceThresholds); +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/ResourceThresholds.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/ResourceThresholds.java new file mode 100644 index 0000000000000..d57706aa8ade4 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/ResourceThresholds.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.api.records; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.yarn.server.api.records.impl.pb.ResourceThresholdsPBImpl; + +/** + * Captures resource thresholds to be used for allocation and preemption + * when over-allocation is turned on. + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public abstract class ResourceThresholds { + public static ResourceThresholds newInstance(float threshold) { + ResourceThresholds thresholds = new ResourceThresholdsPBImpl(); + thresholds.setMemoryThreshold(threshold); + thresholds.setCpuThreshold(threshold); + return thresholds; + } + + public abstract float getMemoryThreshold(); + + public abstract float getCpuThreshold(); + + public abstract void setMemoryThreshold(float memoryThreshold); + + public abstract void setCpuThreshold(float cpuThreshold); +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/OverAllocationInfoPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/OverAllocationInfoPBImpl.java new file mode 100644 index 0000000000000..758f4fbb651c9 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/OverAllocationInfoPBImpl.java @@ -0,0 +1,106 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.api.records.impl.pb; + +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.OverAllocationInfoProto; +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.OverAllocationInfoProtoOrBuilder; +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ResourceThresholdsProto; +import org.apache.hadoop.yarn.server.api.records.OverAllocationInfo; +import org.apache.hadoop.yarn.server.api.records.ResourceThresholds; + +public class OverAllocationInfoPBImpl extends OverAllocationInfo { + private OverAllocationInfoProto proto = + OverAllocationInfoProto.getDefaultInstance(); + private OverAllocationInfoProto.Builder builder = null; + private boolean viaProto = false; + + private ResourceThresholds overAllocationThresholds = null; + + public OverAllocationInfoPBImpl() { + builder = OverAllocationInfoProto.newBuilder(); + } + + public OverAllocationInfoPBImpl(OverAllocationInfoProto proto) { + this.proto = proto; + viaProto = true; + } + + public synchronized OverAllocationInfoProto getProto() { + mergeLocalToProto(); + proto = viaProto ? proto : builder.build(); + viaProto = true; + return proto; + } + + private synchronized void mergeLocalToProto() { + if (viaProto) { + maybeInitBuilder(); + } + mergeLocalToBuilder(); + proto = builder.build(); + viaProto = true; + } + + private synchronized void mergeLocalToBuilder() { + if (overAllocationThresholds != null) { + builder.setOverAllocationThresholds( + convertToProtoFormat(overAllocationThresholds)); + } + } + + private synchronized void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = OverAllocationInfoProto.newBuilder(proto); + } + viaProto = false; + } + + @Override + public synchronized ResourceThresholds getOverAllocationThresholds() { + OverAllocationInfoProtoOrBuilder p = viaProto ? proto : builder; + if (overAllocationThresholds != null) { + return overAllocationThresholds; + } + if (!p.hasOverAllocationThresholds()) { + return null; + } + overAllocationThresholds = + convertFromProtoFormat(p.getOverAllocationThresholds()); + return overAllocationThresholds; + } + + @Override + public synchronized void setOverAllocationThreshold( + ResourceThresholds resourceThresholds) { + maybeInitBuilder(); + if (this.overAllocationThresholds != null) { + builder.clearOverAllocationThresholds(); + } + this.overAllocationThresholds = resourceThresholds; + } + + private static ResourceThresholdsProto convertToProtoFormat( + ResourceThresholds overAllocationThresholds) { + return ((ResourceThresholdsPBImpl) overAllocationThresholds).getProto(); + } + + private static ResourceThresholds convertFromProtoFormat( + ResourceThresholdsProto overAllocationThresholdsProto) { + return new ResourceThresholdsPBImpl(overAllocationThresholdsProto); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/ResourceThresholdsPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/ResourceThresholdsPBImpl.java new file mode 100644 index 0000000000000..10fb284de4492 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/ResourceThresholdsPBImpl.java @@ -0,0 +1,93 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.api.records.impl.pb; + +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ResourceThresholdsProto; +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ResourceThresholdsProtoOrBuilder; +import org.apache.hadoop.yarn.server.api.records.ResourceThresholds; + +public class ResourceThresholdsPBImpl extends ResourceThresholds{ + private ResourceThresholdsProto proto = + ResourceThresholdsProto.getDefaultInstance(); + private ResourceThresholdsProto.Builder builder = null; + private boolean viaProto = false; + + public ResourceThresholdsPBImpl() { + builder = ResourceThresholdsProto.newBuilder(); + } + + public ResourceThresholdsPBImpl(ResourceThresholdsProto proto) { + this.proto = proto; + viaProto = true; + } + + public synchronized ResourceThresholdsProto getProto() { + mergeLocalToProto(); + proto = viaProto ? proto : builder.build(); + viaProto = true; + return proto; + } + + private synchronized void mergeLocalToProto() { + if (viaProto) { + maybeInitBuilder(); + } + mergeLocalToBuilder(); + proto = builder.build(); + viaProto = true; + } + + private synchronized void mergeLocalToBuilder() { + /* + * Right now, we have only memory and cpu thresholds that are floats. + * This is a no-op until we add other non-static fields to the proto. + */ + } + + private synchronized void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = ResourceThresholdsProto.newBuilder(proto); + } + viaProto = false; + } + + @Override + public synchronized float getMemoryThreshold() { + ResourceThresholdsProtoOrBuilder p = viaProto ? proto : builder; + return p.getMemory(); + } + + @Override + public synchronized float getCpuThreshold() { + ResourceThresholdsProtoOrBuilder p = viaProto ? proto : builder; + return p.getCpu(); + } + + @Override + public synchronized void setMemoryThreshold(float memoryThreshold) { + maybeInitBuilder(); + builder.setMemory(memoryThreshold); + } + + @Override + public synchronized void setCpuThreshold(float cpuThreshold) { + maybeInitBuilder(); + builder.setCpu(cpuThreshold); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto index edb2d9ccfba0e..0e71bc109857e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto @@ -64,6 +64,7 @@ message RegisterNodeManagerRequestProto { repeated ApplicationIdProto runningApplications = 7; optional NodeLabelsProto nodeLabels = 8; optional ResourceProto physicalResource = 9; + optional OverAllocationInfoProto overAllocationInfo = 10; } message RegisterNodeManagerResponseProto { @@ -185,3 +186,12 @@ message SCMUploaderCanUploadRequestProto { message SCMUploaderCanUploadResponseProto { optional bool uploadable = 1; } + +message OverAllocationInfoProto { + optional ResourceThresholdsProto over_allocation_thresholds = 1; +} + +message ResourceThresholdsProto { + optional float memory = 1 [default = 0]; + optional float cpu = 2 [default = 0]; +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java index 16a84973b690c..179148815bb18 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java @@ -30,6 +30,7 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport; import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManager; +import org.apache.hadoop.yarn.server.api.records.OverAllocationInfo; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; @@ -112,4 +113,8 @@ public interface Context { NMTimelinePublisher getNMTimelinePublisher(); ContainerExecutor getContainerExecutor(); + + boolean isOverAllocationEnabled(); + + OverAllocationInfo getOverAllocationInfo(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java index 3c0e4984efa66..1a3d7dd993cd2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java @@ -60,6 +60,7 @@ import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManager; import org.apache.hadoop.yarn.server.nodemanager.collectormanager.NMCollectorService; +import org.apache.hadoop.yarn.server.api.records.OverAllocationInfo; import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; @@ -519,6 +520,8 @@ public static class NMContext implements Context { private NMTimelinePublisher nmTimelinePublisher; + private OverAllocationInfo overAllocationInfo; + public NMContext(NMContainerTokenSecretManager containerTokenSecretManager, NMTokenSecretManagerInNM nmTokenSecretManager, LocalDirsHandlerService dirsHandler, ApplicationACLsManager aclsManager, @@ -665,6 +668,20 @@ public void setNodeStatusUpdater(NodeStatusUpdater nodeStatusUpdater) { this.nodeStatusUpdater = nodeStatusUpdater; } + @Override + public boolean isOverAllocationEnabled() { + return getOverAllocationInfo() != null; + } + + @Override + public OverAllocationInfo getOverAllocationInfo() { + return this.overAllocationInfo; + } + + public void setOverAllocationInfo(OverAllocationInfo overAllocationInfo) { + this.overAllocationInfo = overAllocationInfo; + } + public boolean isDistributedSchedulingEnabled() { return isDistSchedulingEnabled; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java index 00073d85aae2e..9c33af035b1cf 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java @@ -355,7 +355,7 @@ protected void registerWithRM() RegisterNodeManagerRequest request = RegisterNodeManagerRequest.newInstance(nodeId, httpPort, totalResource, nodeManagerVersionId, containerReports, getRunningApplications(), - nodeLabels, physicalResource); + nodeLabels, physicalResource, context.getOverAllocationInfo()); if (containerReports != null) { LOG.info("Registering with RM using containers :" + containerReports); } @@ -472,8 +472,8 @@ protected NodeStatus getNodeStatus(int responseId) throws IOException { = getIncreasedContainers(); NodeStatus nodeStatus = NodeStatus.newInstance(nodeId, responseId, containersStatuses, - createKeepAliveApplicationList(), nodeHealthStatus, - containersUtilization, nodeUtilization, increasedContainers); + createKeepAliveApplicationList(), nodeHealthStatus, + containersUtilization, nodeUtilization, increasedContainers); nodeStatus.setOpportunisticContainersStatus( getOpportunisticContainersStatus()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsCpuResourceHandlerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsCpuResourceHandlerImpl.java index d9cca8f523a15..830782d6f481d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsCpuResourceHandlerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsCpuResourceHandlerImpl.java @@ -26,8 +26,10 @@ import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ExecutionType; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperation; import org.apache.hadoop.yarn.server.nodemanager.util.NodeManagerHardwareUtils; @@ -72,6 +74,7 @@ public class CGroupsCpuResourceHandlerImpl implements CpuResourceHandler { static final int MIN_PERIOD_US = 1000; @VisibleForTesting static final int CPU_DEFAULT_WEIGHT = 1024; // set by kernel + static final int CPU_DEFAULT_WEIGHT_OPPORTUNISTIC = 2; CGroupsCpuResourceHandlerImpl(CGroupsHandler cGroupsHandler) { this.cGroupsHandler = cGroupsHandler; @@ -181,16 +184,23 @@ public static int[] getOverallLimits(float yarnProcessors) { @Override public List preStart(Container container) throws ResourceHandlerException { - String cgroupId = container.getContainerId().toString(); Resource containerResource = container.getResource(); cGroupsHandler.createCGroup(CPU, cgroupId); try { int containerVCores = containerResource.getVirtualCores(); - int cpuShares = CPU_DEFAULT_WEIGHT * containerVCores; - cGroupsHandler - .updateCGroupParam(CPU, cgroupId, CGroupsHandler.CGROUP_CPU_SHARES, - String.valueOf(cpuShares)); + ContainerTokenIdentifier id = container.getContainerTokenIdentifier(); + if (id != null && id.getExecutionType() == + ExecutionType.OPPORTUNISTIC) { + cGroupsHandler + .updateCGroupParam(CPU, cgroupId, CGroupsHandler.CGROUP_CPU_SHARES, + String.valueOf(CPU_DEFAULT_WEIGHT_OPPORTUNISTIC)); + } else { + int cpuShares = CPU_DEFAULT_WEIGHT * containerVCores; + cGroupsHandler + .updateCGroupParam(CPU, cgroupId, CGroupsHandler.CGROUP_CPU_SHARES, + String.valueOf(cpuShares)); + } if (strictResourceUsageMode) { if (nodeVCores != containerVCores) { float containerCPU = diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java index 6ee60bd17a458..55aac265d0e3f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java @@ -33,8 +33,11 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.Dispatcher; +import org.apache.hadoop.yarn.server.api.records.OverAllocationInfo; +import org.apache.hadoop.yarn.server.api.records.ResourceThresholds; import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor; import org.apache.hadoop.yarn.server.nodemanager.Context; +import org.apache.hadoop.yarn.server.nodemanager.NodeManager; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerKillEvent; @@ -98,6 +101,8 @@ public enum ContainerMetric { private ResourceUtilization containersUtilization; + private ResourceThresholds overAllocationPreemptionThresholds; + private volatile boolean stopped = false; public ContainersMonitorImpl(ContainerExecutor exec, @@ -172,6 +177,13 @@ protected void serviceInit(Configuration myConf) throws Exception { LOG.info("Physical memory check enabled: " + pmemCheckEnabled); LOG.info("Virtual memory check enabled: " + vmemCheckEnabled); + initializeOverAllocation(conf); + if (context.isOverAllocationEnabled()) { + pmemCheckEnabled = true; + LOG.info("Force enabling physical memory checks because " + + "overallocation is enabled"); + } + containersMonitorEnabled = isContainerMonitorEnabled() && monitoringInterval > 0; LOG.info("ContainersMonitor enabled: " + containersMonitorEnabled); @@ -211,6 +223,28 @@ private boolean isContainerMonitorEnabled() { YarnConfiguration.DEFAULT_NM_CONTAINER_MONITOR_ENABLED); } + private void initializeOverAllocation(Configuration conf) { + float overAllocationTreshold = conf.getFloat( + YarnConfiguration.NM_OVERALLOCATION_ALLOCATION_THRESHOLD, + YarnConfiguration.DEFAULT_NM_OVERALLOCATION_ALLOCATION_THRESHOLD); + overAllocationTreshold = Math.min(overAllocationTreshold, + YarnConfiguration.MAX_NM_OVERALLOCATION_ALLOCATION_THRESHOLD); + overAllocationTreshold = Math.max(0, overAllocationTreshold); + + if (overAllocationTreshold > 0f) { + ((NodeManager.NMContext) context).setOverAllocationInfo( + OverAllocationInfo.newInstance( + ResourceThresholds.newInstance(overAllocationTreshold))); + + float preemptionThreshold = conf.getFloat( + YarnConfiguration.NM_OVERALLOCATION_PREEMPTION_THRESHOLD, + YarnConfiguration.DEFAULT_NM_OVERALLOCATION_PREEMPTION_THRESHOLD); + + this.overAllocationPreemptionThresholds = + ResourceThresholds.newInstance(preemptionThreshold); + } + } + private boolean isResourceCalculatorAvailable() { if (resourceCalculatorPlugin == null) { LOG.info("ResourceCalculatorPlugin is unavailable on this system. " + this diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java index 6f5009eebf11f..82ce2b5a71a27 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java @@ -60,6 +60,7 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport; import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor; +import org.apache.hadoop.yarn.server.api.records.OverAllocationInfo; import org.apache.hadoop.yarn.server.nodemanager.Context; import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService; import org.apache.hadoop.yarn.server.nodemanager.NodeResourceMonitor; @@ -695,6 +696,16 @@ public ConcurrentLinkedQueue getLogAggregationStatusForApp return null; } + @Override + public boolean isOverAllocationEnabled() { + return false; + } + + @Override + public OverAllocationInfo getOverAllocationInfo() { + return null; + } + @Override public NodeResourceMonitor getNodeResourceMonitor() { return null; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TestCGroupsCpuResourceHandlerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TestCGroupsCpuResourceHandlerImpl.java index 674cd7142b8fe..006b0601edbbe 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TestCGroupsCpuResourceHandlerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TestCGroupsCpuResourceHandlerImpl.java @@ -21,8 +21,10 @@ import org.apache.commons.io.FileUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ExecutionType; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperation; import org.apache.hadoop.yarn.util.ResourceCalculatorPlugin; @@ -294,4 +296,25 @@ public void testTeardown() throws Exception { public void testStrictResourceUsage() throws Exception { Assert.assertNull(cGroupsCpuResourceHandler.teardown()); } + + @Test + public void testOpportunistic() throws Exception { + Configuration conf = new YarnConfiguration(); + + cGroupsCpuResourceHandler.bootstrap(plugin, conf); + ContainerTokenIdentifier tokenId = mock(ContainerTokenIdentifier.class); + when(tokenId.getExecutionType()).thenReturn(ExecutionType.OPPORTUNISTIC); + Container container = mock(Container.class); + String id = "container_01_01"; + ContainerId mockContainerId = mock(ContainerId.class); + when(mockContainerId.toString()).thenReturn(id); + when(container.getContainerId()).thenReturn(mockContainerId); + when(container.getContainerTokenIdentifier()).thenReturn(tokenId); + when(container.getResource()).thenReturn(Resource.newInstance(1024, 2)); + cGroupsCpuResourceHandler.preStart(container); + verify(mockCGroupsHandler, times(1)) + .updateCGroupParam(CGroupsHandler.CGroupController.CPU, id, + CGroupsHandler.CGROUP_CPU_SHARES, "2"); + } + }