Skip to content

Commit 3d5ade1

Browse files
YARN-10043. FairOrderingPolicy Improvements. Contributed by Manikandan R
1 parent 9a297ff commit 3d5ade1

File tree

5 files changed

+181
-11
lines changed

5 files changed

+181
-11
lines changed

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@
8686
import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerContext;
8787
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
8888
import org.apache.hadoop.yarn.state.InvalidStateTransitionException;
89+
import org.apache.hadoop.yarn.util.SystemClock;
8990
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
9091
import org.apache.hadoop.yarn.util.resource.Resources;
9192

@@ -209,6 +210,8 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
209210

210211
private String nodeLabelExpression;
211212

213+
private final long startTime;
214+
212215
public SchedulerApplicationAttempt(ApplicationAttemptId applicationAttemptId,
213216
String user, Queue queue, AbstractUsersManager abstractUsersManager,
214217
RMContext rmContext) {
@@ -242,6 +245,7 @@ public SchedulerApplicationAttempt(ApplicationAttemptId applicationAttemptId,
242245
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
243246
readLock = lock.readLock();
244247
writeLock = lock.writeLock();
248+
startTime = SystemClock.getInstance().getTime();
245249
}
246250

247251
public void setOpportunisticContainerContext(
@@ -1487,4 +1491,10 @@ public Map<String, String> getApplicationSchedulingEnvs() {
14871491
public String getPartition() {
14881492
return nodeLabelExpression == null ? "" : nodeLabelExpression;
14891493
}
1494+
1495+
1496+
@Override
1497+
public long getStartTime() {
1498+
return startTime;
1499+
}
14901500
}

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/FairOrderingPolicy.java

Lines changed: 39 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -28,14 +28,21 @@
2828
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
2929

3030
/**
31-
* An OrderingPolicy which orders SchedulableEntities for fairness (see
32-
* FairScheduler
33-
* FairSharePolicy), generally, processes with lesser usage are lesser. If
34-
* sizedBasedWeight is set to true then an application with high demand
35-
* may be prioritized ahead of an application with less usage. This
36-
* is to offset the tendency to favor small apps, which could result in
37-
* starvation for large apps if many small ones enter and leave the queue
38-
* continuously (optional, default false)
31+
*
32+
* FairOrderingPolicy comparison goes through following steps.
33+
*
34+
* 1.Fairness based comparison. SchedulableEntities with lesser usage would be
35+
* preferred when compared to another. If sizedBasedWeight is set to true then
36+
* an application with high demand may be prioritized ahead of an application
37+
* with less usage. This is to offset the tendency to favor small apps, which
38+
* could result in starvation for large apps if many small ones enter and leave
39+
* the queue continuously (optional, default false)
40+
*
41+
* 2. Compare using job submit time. SchedulableEntities submitted earlier would
42+
* be preferred than later.
43+
*
44+
* 3. Compare demands. SchedulableEntities without resource demand get lower
45+
* priority than ones which have demands.
3946
*/
4047
public class FairOrderingPolicy<S extends SchedulableEntity> extends AbstractComparatorOrderingPolicy<S> {
4148

@@ -46,6 +53,30 @@ protected class FairComparator implements Comparator<SchedulableEntity> {
4653
@Override
4754
public int compare(final SchedulableEntity r1, final SchedulableEntity r2) {
4855
int res = (int) Math.signum( getMagnitude(r1) - getMagnitude(r2) );
56+
57+
if (res == 0) {
58+
res = (int) Math.signum(r1.getStartTime() - r2.getStartTime());
59+
}
60+
61+
if (res == 0) {
62+
res = compareDemand(r1, r2);
63+
}
64+
return res;
65+
}
66+
67+
private int compareDemand(SchedulableEntity s1, SchedulableEntity s2) {
68+
int res = 0;
69+
long demand1 = s1.getSchedulingResourceUsage()
70+
.getCachedDemand(CommonNodeLabelsManager.ANY).getMemorySize();
71+
long demand2 = s2.getSchedulingResourceUsage()
72+
.getCachedDemand(CommonNodeLabelsManager.ANY).getMemorySize();
73+
74+
if ((demand1 == 0) && (demand2 > 0)) {
75+
res = 1;
76+
} else if ((demand2 == 0) && (demand1 > 0)) {
77+
res = -1;
78+
}
79+
4980
return res;
5081
}
5182
}

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/SchedulableEntity.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,4 +60,10 @@ public interface SchedulableEntity {
6060
* @return partition
6161
*/
6262
String getPartition();
63+
64+
/**
65+
* Start time of the job.
66+
* @return start time
67+
*/
68+
long getStartTime();
6369
}

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/MockSchedulableEntity.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.apache.hadoop.yarn.api.records.Priority;
2222
import org.apache.hadoop.yarn.api.records.Resource;
2323
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
24+
import org.apache.hadoop.yarn.util.SystemClock;
2425
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
2526

2627

@@ -31,6 +32,7 @@ public class MockSchedulableEntity implements SchedulableEntity {
3132
private Priority priority;
3233
private boolean isRecovering;
3334
private String partition = "";
35+
private long startTime;
3436

3537
public MockSchedulableEntity() { }
3638

@@ -39,6 +41,7 @@ public MockSchedulableEntity(long serial, int priority,
3941
this.serial = serial;
4042
this.priority = Priority.newInstance(priority);
4143
this.isRecovering = isRecovering;
44+
this.startTime = SystemClock.getInstance().getTime();
4245
}
4346

4447
public void setId(String id) {
@@ -108,4 +111,13 @@ public String getPartition() {
108111
public void setPartition(String partition) {
109112
this.partition = partition;
110113
}
114+
115+
@Override
116+
public long getStartTime() {
117+
return this.startTime;
118+
}
119+
120+
public void setStartTime(long startTime) {
121+
this.startTime = startTime;
122+
}
111123
}

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/TestFairOrderingPolicy.java

Lines changed: 114 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818

1919
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy;
2020

21+
import static org.junit.Assert.assertEquals;
22+
2123
import java.util.*;
2224

2325
import org.apache.hadoop.yarn.api.records.NodeId;
@@ -47,7 +49,8 @@ public void testSimpleComparison() {
4749
MockSchedulableEntity r1 = new MockSchedulableEntity();
4850
MockSchedulableEntity r2 = new MockSchedulableEntity();
4951

50-
Assert.assertTrue(policy.getComparator().compare(r1, r2) == 0);
52+
assertEquals("Comparator Output", 0,
53+
policy.getComparator().compare(r1, r2));
5154

5255
//consumption
5356
r1.setUsed(Resources.createResource(1, 0));
@@ -65,7 +68,8 @@ public void testSizeBasedWeight() {
6568
MockSchedulableEntity r2 = new MockSchedulableEntity();
6669

6770
//No changes, equal
68-
Assert.assertTrue(policy.getComparator().compare(r1, r2) == 0);
71+
assertEquals("Comparator Output", 0,
72+
policy.getComparator().compare(r1, r2));
6973

7074
r1.setUsed(Resources.createResource(4 * GB));
7175
r2.setUsed(Resources.createResource(4 * GB));
@@ -79,7 +83,8 @@ public void testSizeBasedWeight() {
7983
r2.getSchedulingResourceUsage());
8084

8185
//Same, equal
82-
Assert.assertTrue(policy.getComparator().compare(r1, r2) == 0);
86+
assertEquals("Comparator Output", 0,
87+
policy.getComparator().compare(r1, r2));
8388

8489
r2.setUsed(Resources.createResource(5 * GB));
8590
r2.setPending(Resources.createResource(5 * GB));
@@ -235,4 +240,110 @@ public void checkIds(Iterator<MockSchedulableEntity> si,
235240
}
236241
}
237242

243+
@Test
244+
public void testOrderingUsingUsedAndPendingResources() {
245+
FairOrderingPolicy<MockSchedulableEntity> policy =
246+
new FairOrderingPolicy<>();
247+
policy.setSizeBasedWeight(true);
248+
MockSchedulableEntity r1 = new MockSchedulableEntity();
249+
MockSchedulableEntity r2 = new MockSchedulableEntity();
250+
251+
r1.setUsed(Resources.createResource(4 * GB));
252+
r2.setUsed(Resources.createResource(4 * GB));
253+
254+
r1.setPending(Resources.createResource(4 * GB));
255+
r2.setPending(Resources.createResource(4 * GB));
256+
257+
AbstractComparatorOrderingPolicy
258+
.updateSchedulingResourceUsage(r1.getSchedulingResourceUsage());
259+
AbstractComparatorOrderingPolicy
260+
.updateSchedulingResourceUsage(r2.getSchedulingResourceUsage());
261+
262+
// Same, equal
263+
assertEquals("Comparator Output", 0,
264+
policy.getComparator().compare(r1, r2));
265+
266+
r1.setUsed(Resources.createResource(4 * GB));
267+
r2.setUsed(Resources.createResource(8 * GB));
268+
269+
r1.setPending(Resources.createResource(4 * GB));
270+
r2.setPending(Resources.createResource(8 * GB));
271+
272+
AbstractComparatorOrderingPolicy
273+
.updateSchedulingResourceUsage(r1.getSchedulingResourceUsage());
274+
AbstractComparatorOrderingPolicy
275+
.updateSchedulingResourceUsage(r2.getSchedulingResourceUsage());
276+
277+
Assert.assertTrue(policy.getComparator().compare(r1, r2) < 0);
278+
}
279+
280+
@Test
281+
public void testOrderingUsingAppSubmitTime() {
282+
FairOrderingPolicy<MockSchedulableEntity> policy =
283+
new FairOrderingPolicy<>();
284+
policy.setSizeBasedWeight(true);
285+
MockSchedulableEntity r1 = new MockSchedulableEntity();
286+
MockSchedulableEntity r2 = new MockSchedulableEntity();
287+
288+
// R1, R2 has been started at same time
289+
assertEquals(r1.getStartTime(), r2.getStartTime());
290+
291+
// No changes, equal
292+
assertEquals("Comparator Output", 0,
293+
policy.getComparator().compare(r1, r2));
294+
295+
// R2 has been started after R1
296+
r1.setStartTime(5);
297+
r2.setStartTime(10);
298+
299+
Assert.assertTrue(policy.getComparator().compare(r1, r2) < 0);
300+
301+
// R1 has been started after R2
302+
r1.setStartTime(10);
303+
r2.setStartTime(5);
304+
305+
Assert.assertTrue(policy.getComparator().compare(r1, r2) > 0);
306+
}
307+
308+
@Test
309+
public void testOrderingUsingAppDemand() {
310+
FairOrderingPolicy<MockSchedulableEntity> policy =
311+
new FairOrderingPolicy<MockSchedulableEntity>();
312+
MockSchedulableEntity r1 = new MockSchedulableEntity();
313+
MockSchedulableEntity r2 = new MockSchedulableEntity();
314+
315+
r1.setUsed(Resources.createResource(0));
316+
r2.setUsed(Resources.createResource(0));
317+
318+
AbstractComparatorOrderingPolicy
319+
.updateSchedulingResourceUsage(r1.getSchedulingResourceUsage());
320+
AbstractComparatorOrderingPolicy
321+
.updateSchedulingResourceUsage(r2.getSchedulingResourceUsage());
322+
323+
// Same, equal
324+
assertEquals("Comparator Output", 0,
325+
policy.getComparator().compare(r1, r2));
326+
327+
// Compare demands ensures entity without resource demands gets lower
328+
// priority
329+
r1.setPending(Resources.createResource(0));
330+
r2.setPending(Resources.createResource(8 * GB));
331+
AbstractComparatorOrderingPolicy
332+
.updateSchedulingResourceUsage(r1.getSchedulingResourceUsage());
333+
AbstractComparatorOrderingPolicy
334+
.updateSchedulingResourceUsage(r2.getSchedulingResourceUsage());
335+
336+
Assert.assertTrue(policy.getComparator().compare(r1, r2) > 0);
337+
338+
// When both entity has certain demands, then there is no actual comparison
339+
r1.setPending(Resources.createResource(4 * GB));
340+
r2.setPending(Resources.createResource(12 * GB));
341+
AbstractComparatorOrderingPolicy
342+
.updateSchedulingResourceUsage(r1.getSchedulingResourceUsage());
343+
AbstractComparatorOrderingPolicy
344+
.updateSchedulingResourceUsage(r2.getSchedulingResourceUsage());
345+
346+
assertEquals("Comparator Output", 0,
347+
policy.getComparator().compare(r1, r2));
348+
}
238349
}

0 commit comments

Comments
 (0)