Skip to content

Commit ba2313d

Browse files
committed
YARN-3983. Refactored CapacityScheduleri#FiCaSchedulerApp to easier extend container allocation logic. Contributed by Wangda Tan
1 parent f271d37 commit ba2313d

File tree

11 files changed

+1011
-692
lines changed

11 files changed

+1011
-692
lines changed

hadoop-yarn-project/CHANGES.txt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -391,6 +391,9 @@ Release 2.8.0 - UNRELEASED
391391
YARN-2768. Avoid cloning Resource in FSAppAttempt#updateDemand.
392392
(Hong Zhiguo via kasha)
393393

394+
YARN-3983. Refactored CapacityScheduleri#FiCaSchedulerApp to easier extend
395+
container allocation logic. (Wangda Tan via jianhe)
396+
394397
BUG FIXES
395398

396399
YARN-3197. Confusing log generated by CapacityScheduler. (Varun Saxena

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -53,13 +53,7 @@
5353
import com.google.common.collect.Sets;
5454

5555
public abstract class AbstractCSQueue implements CSQueue {
56-
private static final Log LOG = LogFactory.getLog(AbstractCSQueue.class);
57-
58-
static final CSAssignment NULL_ASSIGNMENT =
59-
new CSAssignment(Resources.createResource(0, 0), NodeType.NODE_LOCAL);
60-
61-
static final CSAssignment SKIP_ASSIGNMENT = new CSAssignment(true);
62-
56+
private static final Log LOG = LogFactory.getLog(AbstractCSQueue.class);
6357
CSQueue parent;
6458
final String queueName;
6559
volatile int numContainers;

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSAssignment.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,12 +24,17 @@
2424
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
2525
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.AssignmentInformation;
2626
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
27+
import org.apache.hadoop.yarn.util.resource.Resources;
2728

2829
@Private
2930
@Unstable
3031
public class CSAssignment {
32+
public static final CSAssignment NULL_ASSIGNMENT =
33+
new CSAssignment(Resources.createResource(0, 0), NodeType.NODE_LOCAL);
3134

32-
final private Resource resource;
35+
public static final CSAssignment SKIP_ASSIGNMENT = new CSAssignment(true);
36+
37+
private Resource resource;
3338
private NodeType type;
3439
private RMContainer excessReservation;
3540
private FiCaSchedulerApp application;
@@ -67,6 +72,10 @@ public CSAssignment(Resource resource, NodeType type,
6772
public Resource getResource() {
6873
return resource;
6974
}
75+
76+
public void setResource(Resource resource) {
77+
this.resource = resource;
78+
}
7079

7180
public NodeType getType() {
7281
return type;

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -777,7 +777,7 @@ public synchronized CSAssignment assignContainers(Resource clusterResource,
777777
// if our queue cannot access this node, just return
778778
if (schedulingMode == SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY
779779
&& !accessibleToPartition(node.getPartition())) {
780-
return NULL_ASSIGNMENT;
780+
return CSAssignment.NULL_ASSIGNMENT;
781781
}
782782

783783
// Check if this queue need more resource, simply skip allocation if this
@@ -789,7 +789,7 @@ public synchronized CSAssignment assignContainers(Resource clusterResource,
789789
+ ", because it doesn't need more resource, schedulingMode="
790790
+ schedulingMode.name() + " node-partition=" + node.getPartition());
791791
}
792-
return NULL_ASSIGNMENT;
792+
return CSAssignment.NULL_ASSIGNMENT;
793793
}
794794

795795
for (Iterator<FiCaSchedulerApp> assignmentIterator =
@@ -800,7 +800,7 @@ public synchronized CSAssignment assignContainers(Resource clusterResource,
800800
if (!super.canAssignToThisQueue(clusterResource, node.getPartition(),
801801
currentResourceLimits, application.getCurrentReservation(),
802802
schedulingMode)) {
803-
return NULL_ASSIGNMENT;
803+
return CSAssignment.NULL_ASSIGNMENT;
804804
}
805805

806806
Resource userLimit =
@@ -846,11 +846,11 @@ public synchronized CSAssignment assignContainers(Resource clusterResource,
846846
} else if (!assignment.getSkipped()) {
847847
// If we don't allocate anything, and it is not skipped by application,
848848
// we will return to respect FIFO of applications
849-
return NULL_ASSIGNMENT;
849+
return CSAssignment.NULL_ASSIGNMENT;
850850
}
851851
}
852852

853-
return NULL_ASSIGNMENT;
853+
return CSAssignment.NULL_ASSIGNMENT;
854854
}
855855

856856
protected Resource getHeadroom(User user, Resource queueCurrentLimit,

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -384,7 +384,7 @@ public synchronized CSAssignment assignContainers(Resource clusterResource,
384384
// if our queue cannot access this node, just return
385385
if (schedulingMode == SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY
386386
&& !accessibleToPartition(node.getPartition())) {
387-
return NULL_ASSIGNMENT;
387+
return CSAssignment.NULL_ASSIGNMENT;
388388
}
389389

390390
// Check if this queue need more resource, simply skip allocation if this
@@ -396,7 +396,7 @@ public synchronized CSAssignment assignContainers(Resource clusterResource,
396396
+ ", because it doesn't need more resource, schedulingMode="
397397
+ schedulingMode.name() + " node-partition=" + node.getPartition());
398398
}
399-
return NULL_ASSIGNMENT;
399+
return CSAssignment.NULL_ASSIGNMENT;
400400
}
401401

402402
CSAssignment assignment =
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator;
20+
21+
public enum AllocationState {
22+
APP_SKIPPED,
23+
PRIORITY_SKIPPED,
24+
LOCALITY_SKIPPED,
25+
QUEUE_SKIPPED,
26+
ALLOCATED,
27+
RESERVED
28+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator;
20+
21+
import org.apache.hadoop.yarn.api.records.Container;
22+
import org.apache.hadoop.yarn.api.records.Resource;
23+
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
24+
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
25+
import org.apache.hadoop.yarn.util.resource.Resources;
26+
27+
public class ContainerAllocation {
28+
public static final ContainerAllocation PRIORITY_SKIPPED =
29+
new ContainerAllocation(null, null, AllocationState.PRIORITY_SKIPPED);
30+
31+
public static final ContainerAllocation APP_SKIPPED =
32+
new ContainerAllocation(null, null, AllocationState.APP_SKIPPED);
33+
34+
public static final ContainerAllocation QUEUE_SKIPPED =
35+
new ContainerAllocation(null, null, AllocationState.QUEUE_SKIPPED);
36+
37+
public static final ContainerAllocation LOCALITY_SKIPPED =
38+
new ContainerAllocation(null, null, AllocationState.LOCALITY_SKIPPED);
39+
40+
RMContainer containerToBeUnreserved;
41+
private Resource resourceToBeAllocated = Resources.none();
42+
AllocationState state;
43+
NodeType containerNodeType = NodeType.NODE_LOCAL;
44+
NodeType requestNodeType = NodeType.NODE_LOCAL;
45+
Container updatedContainer;
46+
47+
public ContainerAllocation(RMContainer containerToBeUnreserved,
48+
Resource resourceToBeAllocated, AllocationState state) {
49+
this.containerToBeUnreserved = containerToBeUnreserved;
50+
this.resourceToBeAllocated = resourceToBeAllocated;
51+
this.state = state;
52+
}
53+
54+
public RMContainer getContainerToBeUnreserved() {
55+
return containerToBeUnreserved;
56+
}
57+
58+
public Resource getResourceToBeAllocated() {
59+
if (resourceToBeAllocated == null) {
60+
return Resources.none();
61+
}
62+
return resourceToBeAllocated;
63+
}
64+
65+
public AllocationState getAllocationState() {
66+
return state;
67+
}
68+
69+
public NodeType getContainerNodeType() {
70+
return containerNodeType;
71+
}
72+
73+
public Container getUpdatedContainer() {
74+
return updatedContainer;
75+
}
76+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator;
20+
21+
import org.apache.hadoop.yarn.api.records.Priority;
22+
import org.apache.hadoop.yarn.api.records.Resource;
23+
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
24+
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
25+
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
26+
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
27+
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
28+
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
29+
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
30+
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
31+
import org.apache.hadoop.yarn.util.resource.Resources;
32+
33+
/**
34+
* For an application, resource limits and resource requests, decide how to
35+
* allocate container. This is to make application resource allocation logic
36+
* extensible.
37+
*/
38+
public abstract class ContainerAllocator {
39+
FiCaSchedulerApp application;
40+
final ResourceCalculator rc;
41+
final RMContext rmContext;
42+
43+
public ContainerAllocator(FiCaSchedulerApp application,
44+
ResourceCalculator rc, RMContext rmContext) {
45+
this.application = application;
46+
this.rc = rc;
47+
this.rmContext = rmContext;
48+
}
49+
50+
/**
51+
* preAllocation is to perform checks, etc. to see if we can/cannot allocate
52+
* container. It will put necessary information to returned
53+
* {@link ContainerAllocation}.
54+
*/
55+
abstract ContainerAllocation preAllocation(
56+
Resource clusterResource, FiCaSchedulerNode node,
57+
SchedulingMode schedulingMode, ResourceLimits resourceLimits,
58+
Priority priority, RMContainer reservedContainer);
59+
60+
/**
61+
* doAllocation is to update application metrics, create containers, etc.
62+
* According to allocating conclusion decided by preAllocation.
63+
*/
64+
abstract ContainerAllocation doAllocation(
65+
ContainerAllocation allocationResult, Resource clusterResource,
66+
FiCaSchedulerNode node, SchedulingMode schedulingMode, Priority priority,
67+
RMContainer reservedContainer);
68+
69+
boolean checkHeadroom(Resource clusterResource,
70+
ResourceLimits currentResourceLimits, Resource required,
71+
FiCaSchedulerNode node) {
72+
// If headroom + currentReservation < required, we cannot allocate this
73+
// require
74+
Resource resourceCouldBeUnReserved = application.getCurrentReservation();
75+
if (!application.getCSLeafQueue().getReservationContinueLooking()
76+
|| !node.getPartition().equals(RMNodeLabelsManager.NO_LABEL)) {
77+
// If we don't allow reservation continuous looking, OR we're looking at
78+
// non-default node partition, we won't allow to unreserve before
79+
// allocation.
80+
resourceCouldBeUnReserved = Resources.none();
81+
}
82+
return Resources.greaterThanOrEqual(rc, clusterResource, Resources.add(
83+
currentResourceLimits.getHeadroom(), resourceCouldBeUnReserved),
84+
required);
85+
}
86+
87+
/**
88+
* allocate needs to handle following stuffs:
89+
*
90+
* <ul>
91+
* <li>Select request: Select a request to allocate. E.g. select a resource
92+
* request based on requirement/priority/locality.</li>
93+
* <li>Check if a given resource can be allocated based on resource
94+
* availability</li>
95+
* <li>Do allocation: this will decide/create allocated/reserved
96+
* container, this will also update metrics</li>
97+
* </ul>
98+
*/
99+
public ContainerAllocation allocate(Resource clusterResource,
100+
FiCaSchedulerNode node, SchedulingMode schedulingMode,
101+
ResourceLimits resourceLimits, Priority priority,
102+
RMContainer reservedContainer) {
103+
ContainerAllocation result =
104+
preAllocation(clusterResource, node, schedulingMode,
105+
resourceLimits, priority, reservedContainer);
106+
107+
if (AllocationState.ALLOCATED == result.state
108+
|| AllocationState.RESERVED == result.state) {
109+
result = doAllocation(result, clusterResource, node,
110+
schedulingMode, priority, reservedContainer);
111+
}
112+
113+
return result;
114+
}
115+
}

0 commit comments

Comments
 (0)