Commit 7c4de59

Author: Tao Yang (committed)
YARN-10293. Reserved Containers not allocated from available space of other nodes in CandidateNodeSet for MultiNodePlacement. Contributed by Prabhu Joseph.
1 parent fed6fec commit 7c4de59

File tree

3 files changed: +276 -30 lines changed

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java

Lines changed: 5 additions & 23 deletions
@@ -1721,31 +1721,13 @@ private CSAssignment allocateOrReserveNewContainers(
    */
   private CSAssignment allocateContainersOnMultiNodes(
       CandidateNodeSet<FiCaSchedulerNode> candidates) {
-    // When this time look at multiple nodes, try schedule if the
-    // partition has any available resource or killable resource
-    if (getRootQueue().getQueueCapacities().getUsedCapacity(
-        candidates.getPartition()) >= 1.0f
-        && preemptionManager.getKillableResource(
-        CapacitySchedulerConfiguration.ROOT, candidates.getPartition())
-        == Resources.none()) {
-      // Try to allocate from reserved containers
-      for (FiCaSchedulerNode node : candidates.getAllNodes().values()) {
-        RMContainer reservedContainer = node.getReservedContainer();
-        if (reservedContainer != null) {
-          allocateFromReservedContainer(node, false, reservedContainer);
-        }
+    // Try to allocate from reserved containers
+    for (FiCaSchedulerNode node : candidates.getAllNodes().values()) {
+      RMContainer reservedContainer = node.getReservedContainer();
+      if (reservedContainer != null) {
+        allocateFromReservedContainer(node, false, reservedContainer);
       }
-      LOG.debug("This partition '{}' doesn't have available or "
-          + "killable resource", candidates.getPartition());
-      ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, null,
-          "", getRootQueue().getQueuePath(), ActivityState.REJECTED,
-          ActivityDiagnosticConstant.
-              INIT_CHECK_PARTITION_RESOURCE_INSUFFICIENT);
-      ActivitiesLogger.NODE
-          .finishSkippedNodeAllocation(activitiesManager, null);
-      return null;
     }
-
     return allocateOrReserveNewContainers(candidates, false);
   }
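For readability, here is the post-patch body of allocateContainersOnMultiNodes, assembled from the context and added lines above (no new logic, just the diff re-rendered without markers). The net effect: the reserved-container pass no longer sits behind the "used capacity >= 1.0 and nothing killable" guard, so a container reserved on one node can be satisfied from available space on any node in the CandidateNodeSet on every multi-node scheduling pass.

  private CSAssignment allocateContainersOnMultiNodes(
      CandidateNodeSet<FiCaSchedulerNode> candidates) {
    // Try to allocate from reserved containers on every pass, not only
    // when the partition is fully used with no killable resource.
    for (FiCaSchedulerNode node : candidates.getAllNodes().values()) {
      RMContainer reservedContainer = node.getReservedContainer();
      if (reservedContainer != null) {
        allocateFromReservedContainer(node, false, reservedContainer);
      }
    }
    return allocateOrReserveNewContainers(candidates, false);
  }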

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerMultiNodes.java

Lines changed: 0 additions & 7 deletions
@@ -258,13 +258,6 @@ public void testExcessReservationWillBeUnreserved() throws Exception {
     Assert.assertNull(cs.getNode(nm1.getNodeId()).getReservedContainer());
     Assert.assertEquals(1, schedulerApp1.getLiveContainers().size());
     Assert.assertEquals(0, schedulerApp1.getReservedContainers().size());
-    Assert.assertEquals(1, schedulerApp2.getLiveContainers().size());
-
-    // Trigger scheduling to allocate a container on nm1 for app2.
-    cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
-    Assert.assertNull(cs.getNode(nm1.getNodeId()).getReservedContainer());
-    Assert.assertEquals(1, schedulerApp1.getLiveContainers().size());
-    Assert.assertEquals(0, schedulerApp1.getReservedContainers().size());
     Assert.assertEquals(2, schedulerApp2.getLiveContainers().size());
     Assert.assertEquals(7 * GB,
         cs.getNode(nm1.getNodeId()).getAllocatedResource().getMemorySize());
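Read together with the context lines, the assertion tail of testExcessReservationWillBeUnreserved now reads as below: app2 reaches two live containers without the test firing a second NodeUpdateSchedulerEvent. A plausible reading (not stated in the commit message) is that the now-unconditional reserved-container pass resolves the excess reservation within the regular multi-node scheduling cycle.

    Assert.assertNull(cs.getNode(nm1.getNodeId()).getReservedContainer());
    Assert.assertEquals(1, schedulerApp1.getLiveContainers().size());
    Assert.assertEquals(0, schedulerApp1.getReservedContainers().size());
    // app2's second container now arrives without an extra explicit
    // heartbeat round driven by the test.
    Assert.assertEquals(2, schedulerApp2.getLiveContainers().size());
    Assert.assertEquals(7 * GB,
        cs.getNode(nm1.getNodeId()).getAllocatedResource().getMemorySize());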
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerMultiNodesWithPreemption.java

Lines changed: 271 additions & 0 deletions (new file)
@@ -0,0 +1,271 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.MockRMAppSubmissionData;
import org.apache.hadoop.yarn.server.resourcemanager.MockRMAppSubmitter;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.MultiNodeSorter;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.MultiNodeSortingManager;
import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy;
import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

public class TestCapacitySchedulerMultiNodesWithPreemption
    extends CapacitySchedulerTestBase {

  private static final Log LOG = LogFactory
      .getLog(TestCapacitySchedulerMultiNodesWithPreemption.class);
  private CapacitySchedulerConfiguration conf;
  private static final String POLICY_CLASS_NAME =
      "org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement."
          + "ResourceUsageMultiNodeLookupPolicy";

  @Before
  public void setUp() {
    CapacitySchedulerConfiguration config =
        new CapacitySchedulerConfiguration();
    config.set(CapacitySchedulerConfiguration.RESOURCE_CALCULATOR_CLASS,
        DominantResourceCalculator.class.getName());
    conf = new CapacitySchedulerConfiguration(config);
    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
        ResourceScheduler.class);
    conf.set(CapacitySchedulerConfiguration.MULTI_NODE_SORTING_POLICIES,
        "resource-based");
    conf.set(CapacitySchedulerConfiguration.MULTI_NODE_SORTING_POLICY_NAME,
        "resource-based");
    String policyName =
        CapacitySchedulerConfiguration.MULTI_NODE_SORTING_POLICY_NAME
            + ".resource-based" + ".class";
    conf.set(policyName, POLICY_CLASS_NAME);
    conf.setBoolean(CapacitySchedulerConfiguration.MULTI_NODE_PLACEMENT_ENABLED,
        true);
    // Set this to avoid the AM pending issue
    conf.set(CapacitySchedulerConfiguration
        .MAXIMUM_APPLICATION_MASTERS_RESOURCE_PERCENT, "1");
    conf.setInt("yarn.scheduler.minimum-allocation-mb", 512);
    conf.setInt("yarn.scheduler.minimum-allocation-vcores", 1);
    conf.setInt("yarn.scheduler.maximum-allocation-mb", 102400);

    // Configure two queues to test Preemption
    conf.set("yarn.scheduler.capacity.root.queues", "A, default");
    conf.set("yarn.scheduler.capacity.root.A.capacity", "50");
    conf.set("yarn.scheduler.capacity.root.default.capacity", "50");
    conf.set("yarn.scheduler.capacity.root.A.maximum-capacity", "100");
    conf.set("yarn.scheduler.capacity.root.default.maximum-capacity", "100");
    conf.set("yarn.scheduler.capacity.root.A.user-limit-factor", "10");
    conf.set("yarn.scheduler.capacity.root.default.user-limit-factor", "10");

    // Configure Preemption
    conf.setLong(
        CapacitySchedulerConfiguration.PREEMPTION_WAIT_TIME_BEFORE_KILL, 10000);
    conf.setLong(CapacitySchedulerConfiguration.PREEMPTION_MONITORING_INTERVAL,
        1500);
    conf.setFloat(CapacitySchedulerConfiguration.TOTAL_PREEMPTION_PER_ROUND,
        1.0f);
    conf.setFloat(
        CapacitySchedulerConfiguration.PREEMPTION_NATURAL_TERMINATION_FACTOR,
        1.0f);

    conf.set(YarnConfiguration.RM_SCHEDULER_MONITOR_POLICIES,
        ProportionalCapacityPreemptionPolicy.class.getCanonicalName());
    conf.setBoolean(YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS, true);
  }

  @Test(timeout=60000)
  public void testAllocateReservationFromOtherNode() throws Exception {
    MockRM rm = new MockRM(conf);
    rm.start();
    MockNM[] nms = new MockNM[3];
    MockNM nm1 = rm.registerNode("127.0.0.1:1234", 1 * GB, 2);
    nms[0] = nm1;
    MockNM nm2 = rm.registerNode("127.0.0.2:1234", 2 * GB, 2);
    nms[1] = nm2;
    MockNM nm3 = rm.registerNode("127.0.0.3:1234", 3 * GB, 2);
    nms[2] = nm3;

    MultiNodeSortingManager<SchedulerNode> mns = rm.getRMContext()
        .getMultiNodeSortingManager();
    MultiNodeSorter<SchedulerNode> sorter = mns
        .getMultiNodePolicy(POLICY_CLASS_NAME);
    sorter.reSortClusterNodes();

    // Step 1: Launch an App in Default Queue which utilizes the entire cluster
    RMApp app1 = MockRMAppSubmitter.submit(rm,
        MockRMAppSubmissionData.Builder.createWithMemory(3 * GB, rm)
            .withAppName("app-1")
            .withUser("user1")
            .withAcls(null)
            .withQueue("default")
            .build());
    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1);
    am1.allocateAndWaitForContainers("*", 1, 2 * GB, nm2);
    am1.allocateAndWaitForContainers("*", 1, 1 * GB, nm3);

    // Step 2: Wait till the nodes' utilization is full
    GenericTestUtils.waitFor(() -> {
      SchedulerNodeReport reportNM1 =
          rm.getResourceScheduler().getNodeReport(nms[0].getNodeId());
      SchedulerNodeReport reportNM2 =
          rm.getResourceScheduler().getNodeReport(nms[1].getNodeId());
      return (reportNM1.getAvailableResource().getMemorySize() == 0 * GB)
          && (reportNM2.getAvailableResource().getMemorySize() == 0 * GB);
    }, 10, 10000);

    // Step 3: Launch another App in Queue A which will be Reserved
    // after Preemption
    final AtomicBoolean result = new AtomicBoolean(false);
    RMApp app2 = MockRMAppSubmitter.submit(rm,
        MockRMAppSubmissionData.Builder.createWithMemory(1 * GB, rm)
            .withAppName("app-2")
            .withUser("user2")
            .withAcls(null)
            .withQueue("A")
            .build());

    // Launch AM in a thread and in parallel free the preempted node's
    // unallocated resources in main thread
    Thread t1 = new Thread() {
      public void run() {
        try {
          MockAM am2 = MockRM.launchAM(app2, rm, nm1);
          result.set(true);
        } catch (Exception e) {
          Assert.fail("Failed to launch app-2");
        }
      }
    };
    t1.start();

    // Step 4: Wait for Preemption to happen. It will preempt Node1 (1GB)
    // Get the node where preemption happened which has the available space
    final AtomicReference<MockNM> preemptedNode = new AtomicReference<>();
    GenericTestUtils.waitFor(() -> {
      for (int i = 0; i < nms.length; i++) {
        SchedulerNodeReport reportNM =
            rm.getResourceScheduler().getNodeReport(nms[i].getNodeId());
        if (reportNM.getAvailableResource().getMemorySize() == 1 * GB) {
          preemptedNode.set(nms[i]);
          return true;
        }
      }
      return false;
    }, 10, 30000);
    LOG.info("Preempted node is: " + preemptedNode.get().getNodeId());

    // Step 5: Don't release the container from NodeManager so that Reservation
    // happens. Used Capacity will be < 1.0f but nodes won't have available
    // containers so Reservation will happen.
    FiCaSchedulerNode schedulerNode =
        ((CapacityScheduler) rm.getResourceScheduler())
            .getNodeTracker().getNode(preemptedNode.get().getNodeId());
    Resource curResource = schedulerNode.getUnallocatedResource();
    schedulerNode.deductUnallocatedResource(Resource.newInstance(curResource));

    ((CapacityScheduler) rm.getResourceScheduler()).getNodeTracker()
        .removeNode(preemptedNode.get().getNodeId());
    ((CapacityScheduler) rm.getResourceScheduler()).getNodeTracker()
        .addNode(schedulerNode);

    // Send a heartbeat to kick the tires on the Scheduler
    // The container will be reserved for app-2
    RMNode preemptedRMNode = rm.getRMContext().getRMNodes().get(
        preemptedNode.get().getNodeId());
    NodeUpdateSchedulerEvent nodeUpdate = new NodeUpdateSchedulerEvent(
        preemptedRMNode);
    rm.getResourceScheduler().handle(nodeUpdate);

    // Validate if Reservation happened
    // Reservation will happen on last node in the iterator - Node3
    CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
    ApplicationAttemptId app2AttemptId = app2.getCurrentAppAttempt()
        .getAppAttemptId();
    FiCaSchedulerApp schedulerApp = cs.getApplicationAttempt(app2AttemptId);

    assertEquals("App2 failed to get reserved container", 1,
        schedulerApp.getReservedContainers().size());
    LOG.info("Reserved node is: " +
        schedulerApp.getReservedContainers().get(0).getReservedNode());
    assertNotEquals("Failed to reserve as per the Multi Node Iterator",
        schedulerApp.getReservedContainers().get(0).getReservedNode(),
        preemptedNode.get().getNodeId());

    // Step 6: Okay, now preempted node is Node1 and reserved node is Node3
    // Validate if the Reserved Container gets allocated
    // after updating the released container.
    schedulerNode = ((CapacityScheduler) rm.getResourceScheduler())
        .getNodeTracker().getNode(preemptedNode.get().getNodeId());
    curResource = schedulerNode.getAllocatedResource();
    schedulerNode.updateTotalResource(
        Resources.add(schedulerNode.getTotalResource(), curResource));

    ((CapacityScheduler) rm.getResourceScheduler()).getNodeTracker()
        .removeNode(preemptedNode.get().getNodeId());
    ((CapacityScheduler) rm.getResourceScheduler()).getNodeTracker()
        .addNode(schedulerNode);

    preemptedRMNode = rm.getRMContext().getRMNodes().get(
        preemptedNode.get().getNodeId());
    nodeUpdate = new NodeUpdateSchedulerEvent(preemptedRMNode);
    rm.getResourceScheduler().handle(nodeUpdate);

    // Step 7: Wait for app-2 to get ALLOCATED
    GenericTestUtils.waitFor(() -> {
      return result.get();
    }, 10, 20000);

    // Step 8: Validate if app-2 has got 1 live container and
    // released the reserved container
    schedulerApp = cs.getApplicationAttempt(app2AttemptId);
    assertEquals("App2 failed to get Allocated", 1,
        schedulerApp.getLiveContainers().size());
    assertEquals("App2 failed to Unreserve", 0,
        schedulerApp.getReservedContainers().size());

    rm.stop();
  }
}
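For anyone reproducing the scenario, the essential wiring the new test relies on, condensed from setUp() above: a sorting policy named "resource-based" backed by ResourceUsageMultiNodeLookupPolicy, multi-node placement enabled, and the preemption monitor turned on. This is a condensed sketch using the same CapacitySchedulerConfiguration and YarnConfiguration constants as the test; the equivalent capacity-scheduler.xml string keys should be checked against your Hadoop version.

    CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
    // Register a sorting policy named "resource-based" and make it the
    // policy used for multi-node lookups.
    conf.set(CapacitySchedulerConfiguration.MULTI_NODE_SORTING_POLICIES,
        "resource-based");
    conf.set(CapacitySchedulerConfiguration.MULTI_NODE_SORTING_POLICY_NAME,
        "resource-based");
    conf.set(CapacitySchedulerConfiguration.MULTI_NODE_SORTING_POLICY_NAME
            + ".resource-based.class",
        "org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement."
            + "ResourceUsageMultiNodeLookupPolicy");
    // Turn on multi-node placement itself, plus the preemption monitor that
    // frees up the space the reservation is later allocated from.
    conf.setBoolean(
        CapacitySchedulerConfiguration.MULTI_NODE_PLACEMENT_ENABLED, true);
    conf.setBoolean(YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS, true);
    conf.set(YarnConfiguration.RM_SCHEDULER_MONITOR_POLICIES,
        ProportionalCapacityPreemptionPolicy.class.getCanonicalName());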
