| 
22 | 22 | import java.util.ArrayList;  | 
23 | 23 | import java.util.Arrays;  | 
24 | 24 | import java.util.Collection;  | 
 | 25 | +import java.util.Collections;  | 
25 | 26 | import java.util.EnumSet;  | 
 | 27 | + | 
 | 28 | +import java.util.HashMap;  | 
26 | 29 | import java.util.List;  | 
27 | 30 | import java.util.Map;  | 
28 | 31 | import java.util.Set;  | 
 | 
33 | 36 | 
 
  | 
34 | 37 | import com.google.gson.Gson;  | 
35 | 38 | import com.google.gson.reflect.TypeToken;  | 
 | 39 | + | 
 | 40 | +import org.apache.commons.lang3.builder.EqualsBuilder;  | 
 | 41 | +import org.apache.commons.lang3.builder.HashCodeBuilder;  | 
 | 42 | +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;  | 
 | 43 | +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;  | 
36 | 44 | import org.slf4j.Logger;  | 
37 | 45 | import org.slf4j.LoggerFactory;  | 
38 | 46 | import org.apache.hadoop.classification.InterfaceAudience.Private;  | 
@@ -142,6 +150,7 @@ public abstract class AbstractYarnScheduler  | 
142 | 150 |   Thread updateThread;  | 
143 | 151 |   private final Object updateThreadMonitor = new Object();  | 
144 | 152 |   private Timer releaseCache;  | 
 | 153 | +  private boolean autoCorrectContainerAllocation;  | 
145 | 154 | 
 
  | 
146 | 155 |   /*  | 
147 | 156 |    * All schedulers which are inheriting AbstractYarnScheduler should use  | 
@@ -196,6 +205,10 @@ public void serviceInit(Configuration conf) throws Exception {  | 
196 | 205 |     nmHeartbeatInterval =  | 
197 | 206 |         conf.getLong(YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS,  | 
198 | 207 |             YarnConfiguration.DEFAULT_RM_NM_HEARTBEAT_INTERVAL_MS);  | 
 | 208 | +    skipNodeInterval = YarnConfiguration.getSkipNodeInterval(conf);  | 
 | 209 | +    autoCorrectContainerAllocation =  | 
 | 210 | +        conf.getBoolean(YarnConfiguration.RM_SCHEDULER_AUTOCORRECT_CONTAINER_ALLOCATION,  | 
 | 211 | +            YarnConfiguration.DEFAULT_RM_SCHEDULER_AUTOCORRECT_CONTAINER_ALLOCATION);  | 
199 | 212 |     long configuredMaximumAllocationWaitTime =  | 
200 | 213 |         conf.getLong(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS,  | 
201 | 214 |           YarnConfiguration.DEFAULT_RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS);  | 
@@ -589,6 +602,106 @@ public void recoverContainersOnNode(List<NMContainerStatus> containerReports,  | 
589 | 602 |     }  | 
590 | 603 |   }  | 
591 | 604 | 
 
  | 
 | 605 | +  /**  | 
 | 606 | +   * Autocorrect container resourceRequests by decrementing the number of newly allocated containers  | 
 | 607 | +   * from the current container request. This also updates the newlyAllocatedContainers to be within  | 
 | 608 | +   * the limits of the current container resourceRequests.  | 
 | 609 | +   * ResourceRequests locality/resourceName is not considered while autocorrecting the container  | 
 | 610 | +   * request, hence when there are two types of resourceRequest which is same except for the  | 
 | 611 | +   * locality/resourceName, it is counted as same {@link ContainerObjectType} and the container  | 
 | 612 | +   * ask and number of newly allocated container is decremented accordingly.  | 
 | 613 | +   * For example when a client requests for 4 containers with locality/resourceName  | 
 | 614 | +   * as "node1", AMRMClientaugments the resourceRequest into two  | 
 | 615 | +   * where R1(numContainer=4,locality=*) and R2(numContainer=4,locality=node1),  | 
 | 616 | +   * if Yarn allocated 6 containers previously, it will release 2 containers as well as  | 
 | 617 | +   * update the container ask to 0.  | 
 | 618 | +   *  | 
 | 619 | +   * If there is a client which directly calls Yarn (without AMRMClient) with  | 
 | 620 | +   * two where R1(numContainer=4,locality=*) and R2(numContainer=4,locality=node1)  | 
 | 621 | +   * the autocorrection may not work as expected. The use case of such client is very rare.  | 
 | 622 | +   *  | 
 | 623 | +   * <p>  | 
 | 624 | +   * This method is called from {@link AbstractYarnScheduler#allocate} method. It is package private  | 
 | 625 | +   * to be used within the scheduler package only.  | 
 | 626 | +   * @param resourceRequests List of resources to be allocated  | 
 | 627 | +   * @param application ApplicationAttempt  | 
 | 628 | +   */  | 
 | 629 | +  @VisibleForTesting  | 
 | 630 | +  protected void autoCorrectContainerAllocation(List<ResourceRequest> resourceRequests,  | 
 | 631 | +      SchedulerApplicationAttempt application) {  | 
 | 632 | + | 
 | 633 | +    // if there is no resourceRequests for containers or no newly allocated container from  | 
 | 634 | +    // the previous request there is nothing to do.  | 
 | 635 | +    if (!autoCorrectContainerAllocation || resourceRequests.isEmpty() ||  | 
 | 636 | +        application.newlyAllocatedContainers.isEmpty()) {  | 
 | 637 | +      return;  | 
 | 638 | +    }  | 
 | 639 | + | 
 | 640 | +    // iterate newlyAllocatedContainers and form a mapping of container type  | 
 | 641 | +    // and number of its occurrence.  | 
 | 642 | +    Map<ContainerObjectType, List<RMContainer>> allocatedContainerMap = new HashMap<>();  | 
 | 643 | +    for (RMContainer rmContainer : application.newlyAllocatedContainers) {  | 
 | 644 | +      Container container = rmContainer.getContainer();  | 
 | 645 | +      ContainerObjectType containerObjectType = new ContainerObjectType(  | 
 | 646 | +          container.getAllocationRequestId(), container.getPriority(),  | 
 | 647 | +          container.getExecutionType(), container.getResource());  | 
 | 648 | +      allocatedContainerMap.computeIfAbsent(containerObjectType,  | 
 | 649 | +          k -> new ArrayList<>()).add(rmContainer);  | 
 | 650 | +    }  | 
 | 651 | + | 
 | 652 | +    Map<ContainerObjectType, Integer> extraContainerAllocatedMap = new HashMap<>();  | 
 | 653 | +    // iterate through resourceRequests and update the request by  | 
 | 654 | +    // decrementing the already allocated containers.  | 
 | 655 | +    for (ResourceRequest request : resourceRequests) {  | 
 | 656 | +      ContainerObjectType containerObjectType =  | 
 | 657 | +          new ContainerObjectType(request.getAllocationRequestId(),  | 
 | 658 | +              request.getPriority(), request.getExecutionTypeRequest().getExecutionType(),  | 
 | 659 | +              request.getCapability());  | 
 | 660 | +      int numContainerAllocated = allocatedContainerMap.getOrDefault(containerObjectType,  | 
 | 661 | +          Collections.emptyList()).size();  | 
 | 662 | +      if (numContainerAllocated > 0) {  | 
 | 663 | +        int numContainerAsk = request.getNumContainers();  | 
 | 664 | +        int updatedContainerRequest = numContainerAsk - numContainerAllocated;  | 
 | 665 | +        if (updatedContainerRequest < 0) {  | 
 | 666 | +          // add an entry to extra allocated map  | 
 | 667 | +          extraContainerAllocatedMap.put(containerObjectType, Math.abs(updatedContainerRequest));  | 
 | 668 | +          LOG.debug("{} container of the resource type: {} will be released",  | 
 | 669 | +              Math.abs(updatedContainerRequest), request);  | 
 | 670 | +          // if newlyAllocatedContainer count is more than the current container  | 
 | 671 | +          // resourceRequests, reset it to 0.  | 
 | 672 | +          updatedContainerRequest = 0;  | 
 | 673 | +        }  | 
 | 674 | + | 
 | 675 | +        // update the request  | 
 | 676 | +        LOG.debug("Updating container resourceRequests from {} to {} for the resource type: {}",  | 
 | 677 | +            numContainerAsk, updatedContainerRequest, request);  | 
 | 678 | +        request.setNumContainers(updatedContainerRequest);  | 
 | 679 | +      }  | 
 | 680 | +    }  | 
 | 681 | + | 
 | 682 | +    // Iterate over the entries in extraContainerAllocatedMap  | 
 | 683 | +    for (Map.Entry<ContainerObjectType, Integer> entry : extraContainerAllocatedMap.entrySet()) {  | 
 | 684 | +      ContainerObjectType containerObjectType = entry.getKey();  | 
 | 685 | +      int extraContainers = entry.getValue();  | 
 | 686 | + | 
 | 687 | +      // Get the list of allocated containers for the current ContainerObjectType  | 
 | 688 | +      List<RMContainer> allocatedContainers = allocatedContainerMap.get(containerObjectType);  | 
 | 689 | +      if (allocatedContainers != null) {  | 
 | 690 | +        for (RMContainer rmContainer : allocatedContainers) {  | 
 | 691 | +          if (extraContainers > 0) {  | 
 | 692 | +            // Change the state of the container from ALLOCATED to EXPIRED since it is not required.  | 
 | 693 | +            LOG.debug("Removing extra container:{}", rmContainer.getContainer());  | 
 | 694 | +            completedContainer(rmContainer, SchedulerUtils.createAbnormalContainerStatus(  | 
 | 695 | +                rmContainer.getContainerId(), SchedulerUtils.EXPIRED_CONTAINER),  | 
 | 696 | +                RMContainerEventType.EXPIRE);  | 
 | 697 | +            application.newlyAllocatedContainers.remove(rmContainer);  | 
 | 698 | +            extraContainers--;  | 
 | 699 | +          }  | 
 | 700 | +        }  | 
 | 701 | +      }  | 
 | 702 | +    }  | 
 | 703 | +  }  | 
 | 704 | + | 
592 | 705 |   private RMContainer recoverAndCreateContainer(NMContainerStatus status,  | 
593 | 706 |       RMNode node, String queueName) {  | 
594 | 707 |     Container container =  | 
@@ -623,6 +736,14 @@ private void recoverResourceRequestForContainer(RMContainer rmContainer) {  | 
623 | 736 |       return;  | 
624 | 737 |     }  | 
625 | 738 | 
 
  | 
 | 739 | +    // when auto correct container allocation is enabled, there can be a case when extra containers  | 
 | 740 | +    // go to expired state from allocated state. When such scenario happens do not re-attempt the  | 
 | 741 | +    // container request since this is expected.  | 
 | 742 | +    if (autoCorrectContainerAllocation &&  | 
 | 743 | +        RMContainerState.EXPIRED.equals(rmContainer.getState())) {  | 
 | 744 | +      return;  | 
 | 745 | +    }  | 
 | 746 | + | 
626 | 747 |     // Add resource request back to Scheduler ApplicationAttempt.  | 
627 | 748 | 
 
  | 
628 | 749 |     // We lookup the application-attempt here again using  | 
@@ -1514,4 +1635,101 @@ public boolean attemptAllocationOnNode(SchedulerApplicationAttempt appAttempt,  | 
1514 | 1635 |   public void resetSchedulerMetrics() {  | 
1515 | 1636 |     // reset scheduler metrics  | 
1516 | 1637 |   }  | 
 | 1638 | + | 
 | 1639 | +  /**  | 
 | 1640 | +   * Gets the apps from a given queue.  | 
 | 1641 | +   *  | 
 | 1642 | +   * Mechanics:  | 
 | 1643 | +   * 1. Get all {@link ApplicationAttemptId}s in the given queue by  | 
 | 1644 | +   * {@link #getAppsInQueue(String)} method.  | 
 | 1645 | +   * 2. Always need to check validity for the given queue by the returned  | 
 | 1646 | +   * values.  | 
 | 1647 | +   *  | 
 | 1648 | +   * @param queueName queue name  | 
 | 1649 | +   * @return a collection of app attempt ids in the given queue, it maybe empty.  | 
 | 1650 | +   * @throws YarnException if {@link #getAppsInQueue(String)} return null, will  | 
 | 1651 | +   * throw this exception.  | 
 | 1652 | +   */  | 
 | 1653 | +  private List<ApplicationAttemptId> getAppsFromQueue(String queueName)  | 
 | 1654 | +      throws YarnException {  | 
 | 1655 | +    List<ApplicationAttemptId> apps = getAppsInQueue(queueName);  | 
 | 1656 | +    if (apps == null) {  | 
 | 1657 | +      throw new YarnException("The specified queue: " + queueName  | 
 | 1658 | +          + " doesn't exist");  | 
 | 1659 | +    }  | 
 | 1660 | +    return apps;  | 
 | 1661 | +  }  | 
 | 1662 | + | 
 | 1663 | +  /**  | 
 | 1664 | +   * ContainerObjectType is a container object with the following properties.  | 
 | 1665 | +   * Namely allocationId, priority, executionType and resourceType.  | 
 | 1666 | +   */  | 
 | 1667 | +  protected class ContainerObjectType extends Object {  | 
 | 1668 | +    private final long allocationId;  | 
 | 1669 | +    private final Priority priority;  | 
 | 1670 | +    private final ExecutionType executionType;  | 
 | 1671 | +    private final Resource resource;  | 
 | 1672 | + | 
 | 1673 | +    public ContainerObjectType(long allocationId, Priority priority,  | 
 | 1674 | +        ExecutionType executionType, Resource resource) {  | 
 | 1675 | +      this.allocationId = allocationId;  | 
 | 1676 | +      this.priority = priority;  | 
 | 1677 | +      this.executionType = executionType;  | 
 | 1678 | +      this.resource = resource;  | 
 | 1679 | +    }  | 
 | 1680 | + | 
 | 1681 | +    public long getAllocationId() {  | 
 | 1682 | +      return allocationId;  | 
 | 1683 | +    }  | 
 | 1684 | + | 
 | 1685 | +    public Priority getPriority() {  | 
 | 1686 | +      return priority;  | 
 | 1687 | +    }  | 
 | 1688 | + | 
 | 1689 | +    public ExecutionType getExecutionType() {  | 
 | 1690 | +      return executionType;  | 
 | 1691 | +    }  | 
 | 1692 | + | 
 | 1693 | +    public Resource getResource() {  | 
 | 1694 | +      return resource;  | 
 | 1695 | +    }  | 
 | 1696 | + | 
 | 1697 | +    @Override  | 
 | 1698 | +    public int hashCode() {  | 
 | 1699 | +      return new HashCodeBuilder(17, 37)  | 
 | 1700 | +          .append(allocationId)  | 
 | 1701 | +          .append(priority)  | 
 | 1702 | +          .append(executionType)  | 
 | 1703 | +          .append(resource)  | 
 | 1704 | +          .toHashCode();  | 
 | 1705 | +    }  | 
 | 1706 | + | 
 | 1707 | +    @Override  | 
 | 1708 | +    public boolean equals(Object obj) {  | 
 | 1709 | +      if (obj == null) {  | 
 | 1710 | +        return false;  | 
 | 1711 | +      }  | 
 | 1712 | +      if (obj.getClass() != this.getClass()) {  | 
 | 1713 | +        return false;  | 
 | 1714 | +      }  | 
 | 1715 | + | 
 | 1716 | +      ContainerObjectType other = (ContainerObjectType) obj;  | 
 | 1717 | +      return new EqualsBuilder()  | 
 | 1718 | +          .append(allocationId, other.getAllocationId())  | 
 | 1719 | +          .append(priority, other.getPriority())  | 
 | 1720 | +          .append(executionType, other.getExecutionType())  | 
 | 1721 | +          .append(resource, other.getResource())  | 
 | 1722 | +          .isEquals();  | 
 | 1723 | +    }  | 
 | 1724 | + | 
 | 1725 | +    @Override  | 
 | 1726 | +    public String toString() {  | 
 | 1727 | +      return "{ContainerObjectType: "  | 
 | 1728 | +          + ", Priority: " + getPriority()  | 
 | 1729 | +          + ", Allocation Id: " + getAllocationId()  | 
 | 1730 | +          + ", Execution Type: " + getExecutionType()  | 
 | 1731 | +          + ", Resource: " + getResource()  | 
 | 1732 | +          + "}";  | 
 | 1733 | +    }  | 
 | 1734 | +  }  | 
1517 | 1735 | }  | 
0 commit comments