Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Iterator;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
Expand Down Expand Up @@ -575,8 +576,15 @@ private Resource assignContainer(FSSchedulerNode node, boolean reserved) {

ResourceRequest rackLocalRequest = getResourceRequest(priority,
node.getRackName());
if (rackLocalRequest != null && !getQueue().checkQueueResourceLimit(rackLocalRequest.getCapability()) ) {
rackLocalRequest = null;
}

ResourceRequest localRequest = getResourceRequest(priority,
node.getNodeName());
if (localRequest != null && !getQueue().checkQueueResourceLimit(localRequest.getCapability()) ) {
localRequest = null;
}

if (localRequest != null && !localRequest.getRelaxLocality()) {
LOG.warn("Relax locality off is not supported on local request: "
Expand Down Expand Up @@ -613,8 +621,12 @@ private Resource assignContainer(FSSchedulerNode node, boolean reserved) {
NodeType.RACK_LOCAL, reserved);
}

ResourceRequest offSwitchRequest =
getResourceRequest(priority, ResourceRequest.ANY);
ResourceRequest offSwitchRequest = null;
Map<String, ResourceRequest> resourceRequestList = getResourceRequests(priority);
if(resourceRequestList != null) {
offSwitchRequest = getRequestByQResourceLimit(resourceRequestList);
}

if (offSwitchRequest != null && !offSwitchRequest.getRelaxLocality()) {
continue;
}
Expand All @@ -632,6 +644,26 @@ private Resource assignContainer(FSSchedulerNode node, boolean reserved) {
return Resources.none();
}

/**
* location: resource requests in a application -> a priority; <br/>
* choose a resource request depending on the queue's resource limit check
* @param resourceRequestList
* @return
*/
@SuppressWarnings("rawtypes")
private ResourceRequest getRequestByQResourceLimit(
Map<String, ResourceRequest> resourceRequestList) {
Iterator iter = resourceRequestList.entrySet().iterator();
while( iter.hasNext() ) {
Map.Entry entry = (Map.Entry)iter.next();
ResourceRequest rRequest = (ResourceRequest) entry.getValue();
if ( getQueue().checkQueueResourceLimit(rRequest.getCapability()) ) {
return rRequest;
}
}
return null;
}

/**
* Called when this application already has an existing reservation on the
* given node. Sees whether we can turn the reservation into an allocation.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,21 @@ protected boolean assignContainerPreCheck(FSSchedulerNode node) {
}
return true;
}

/**
* helper method to check if this resource request should be assigned <br/>
* this check based on a equation: resourceRequest + resourceUsed should be less than resourceMaxLimit
* @param resource
* @return
*/
public boolean checkQueueResourceLimit(Resource resource) {
Resource resourceWillUse = Resources.add(resource, getResourceUsage());
if ( !Resources.fitsIn( resourceWillUse,
scheduler.getAllocationConfiguration().getMaxResources(getName())) ) {
return false;
}
return true;
}

/**
* Returns true if queue has at least one app running.
Expand Down