diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java index 67103d187b3fe..5a462acee6ff4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java @@ -26,6 +26,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.Iterator; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -575,8 +576,15 @@ private Resource assignContainer(FSSchedulerNode node, boolean reserved) { ResourceRequest rackLocalRequest = getResourceRequest(priority, node.getRackName()); + if (rackLocalRequest != null && !getQueue().checkQueueResourceLimit(rackLocalRequest.getCapability()) ) { + rackLocalRequest = null; + } + ResourceRequest localRequest = getResourceRequest(priority, node.getNodeName()); + if (localRequest != null && !getQueue().checkQueueResourceLimit(localRequest.getCapability()) ) { + localRequest = null; + } if (localRequest != null && !localRequest.getRelaxLocality()) { LOG.warn("Relax locality off is not supported on local request: " @@ -613,8 +621,12 @@ private Resource assignContainer(FSSchedulerNode node, boolean reserved) { NodeType.RACK_LOCAL, reserved); } - ResourceRequest offSwitchRequest = - getResourceRequest(priority, ResourceRequest.ANY); + ResourceRequest offSwitchRequest = null; + Map resourceRequestList = getResourceRequests(priority); + if(resourceRequestList != null) { + offSwitchRequest = getRequestByQResourceLimit(resourceRequestList); + } + if (offSwitchRequest != null && !offSwitchRequest.getRelaxLocality()) { continue; } @@ -632,6 +644,26 @@ private Resource assignContainer(FSSchedulerNode node, boolean reserved) { return Resources.none(); } + /** + * location: resource requests in a application -> a priority;
+ * choose a resource request depending on the queue's resource limit check + * @param resourceRequestList + * @return + */ + @SuppressWarnings("rawtypes") + private ResourceRequest getRequestByQResourceLimit( + Map resourceRequestList) { + Iterator iter = resourceRequestList.entrySet().iterator(); + while( iter.hasNext() ) { + Map.Entry entry = (Map.Entry)iter.next(); + ResourceRequest rRequest = (ResourceRequest) entry.getValue(); + if ( getQueue().checkQueueResourceLimit(rRequest.getCapability()) ) { + return rRequest; + } + } + return null; + } + /** * Called when this application already has an existing reservation on the * given node. Sees whether we can turn the reservation into an allocation. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java index 349464e1ef2f3..731235457e87a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java @@ -265,6 +265,21 @@ protected boolean assignContainerPreCheck(FSSchedulerNode node) { } return true; } + + /** + * helper method to check if this resource request should be assigned
+ * this check based on a equation: resourceRequest + resourceUsed should be less than resourceMaxLimit + * @param resource + * @return + */ + public boolean checkQueueResourceLimit(Resource resource) { + Resource resourceWillUse = Resources.add(resource, getResourceUsage()); + if ( !Resources.fitsIn( resourceWillUse, + scheduler.getAllocationConfiguration().getMaxResources(getName())) ) { + return false; + } + return true; + } /** * Returns true if queue has at least one app running.