Skip to content

Commit cd087bd

Browse files
hungj authored and Tamas Payer committed
YARN-6492. Generate queue metrics for each partition. Contributed by Manikandan R
(cherry picked from commit c30c23c) Change-Id: Id26057b8ca20b8b79393c1309a82030f408c7bc1
1 parent b2f81cb commit cd087bd

File tree

15 files changed

+1836
-305
lines changed

15 files changed

+1836
-305
lines changed

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java

Lines changed: 14 additions & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -533,7 +533,7 @@ public boolean isPlaceBlacklisted(String resourceName,
533533

534534
public ContainerRequest allocate(NodeType type,
535535
SchedulerNode node, SchedulerRequestKey schedulerKey,
536-
Container containerAllocated) {
536+
RMContainer containerAllocated) {
537537
writeLock.lock();
538538
try {
539539
if (null != containerAllocated) {
@@ -691,7 +691,7 @@ public boolean checkAllocation(NodeType type, SchedulerNode node,
691691
}
692692

693693
private void updateMetricsForAllocatedContainer(NodeType type,
694-
SchedulerNode node, Container containerAllocated) {
694+
SchedulerNode node, RMContainer containerAllocated) {
695695
QueueMetrics metrics = queue.getMetrics();
696696
if (pending) {
697697
// once an allocation is done we assume the application is
@@ -704,18 +704,19 @@ private void updateMetricsForAllocatedContainer(NodeType type,
704704
}
705705

706706
public static void updateMetrics(ApplicationId applicationId, NodeType type,
707-
SchedulerNode node, Container containerAllocated, String user,
707+
SchedulerNode node, RMContainer containerAllocated, String user,
708708
Queue queue) {
709709
if (LOG.isDebugEnabled()) {
710-
LOG.debug("allocate: applicationId=" + applicationId + " container="
711-
+ containerAllocated.getId() + " host=" + containerAllocated
712-
.getNodeId().toString() + " user=" + user + " resource="
713-
+ containerAllocated.getResource() + " type="
714-
+ type);
710+
LOG.debug("allocate: applicationId=" + applicationId + " container=" + containerAllocated
711+
.getContainer().getId() + " host=" + containerAllocated.getNodeId() + " user=" + user
712+
+ " resource=" + containerAllocated.getContainer().getResource() + " type=" + type);
715713
}
716714
if(node != null) {
717715
queue.getMetrics().allocateResources(node.getPartition(), user, 1,
718-
containerAllocated.getResource(), true);
716+
containerAllocated.getContainer().getResource(), false);
717+
queue.getMetrics().decrPendingResources(
718+
containerAllocated.getNodeLabelExpression(), user, 1,
719+
containerAllocated.getContainer().getResource());
719720
}
720721
queue.getMetrics().incrNodeTypeAggregations(user, type);
721722
}
@@ -794,4 +795,8 @@ public String getDefaultNodeLabelExpression() {
794795
public Map<String, String> getApplicationSchedulingEnvs() {
795796
return applicationSchedulingEnvs;
796797
}
798+
799+
public RMContext getRMContext() {
800+
return this.rmContext;
801+
}
797802
}

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerUpdateContext.java

Lines changed: 10 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -161,11 +161,17 @@ private void cancelPreviousRequest(SchedulerNode schedulerNode,
161161
// Decrement the pending using a dummy RR with
162162
// resource = prev update req capability
163163
if (pendingAsk != null && pendingAsk.getCount() > 0) {
164+
Container container = Container.newInstance(UNDEFINED,
165+
schedulerNode.getNodeID(), "host:port",
166+
pendingAsk.getPerAllocationResource(),
167+
schedulerKey.getPriority(), null);
164168
appSchedulingInfo.allocate(NodeType.OFF_SWITCH, schedulerNode,
165-
schedulerKey, Container.newInstance(UNDEFINED,
166-
schedulerNode.getNodeID(), "host:port",
167-
pendingAsk.getPerAllocationResource(),
168-
schedulerKey.getPriority(), null));
169+
schedulerKey,
170+
new RMContainerImpl(container, schedulerKey,
171+
appSchedulingInfo.getApplicationAttemptId(),
172+
schedulerNode.getNodeID(), appSchedulingInfo.getUser(),
173+
appSchedulingInfo.getRMContext(),
174+
appPlacementAllocator.getPrimaryRequestedNodePartition()));
169175
}
170176
}
171177
}
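hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/PartitionQueueMetrics.java

Original file line number | Diff line number | Diff line change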
@@ -0,0 +1,89 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
20+
21+
import org.apache.hadoop.conf.Configuration;
22+
import org.apache.hadoop.metrics2.MetricsSystem;
23+
import org.apache.hadoop.metrics2.annotation.Metrics;
24+
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
25+
26+
@Metrics(context = "yarn")
27+
public class PartitionQueueMetrics extends QueueMetrics {
28+
29+
private String partition;
30+
31+
protected PartitionQueueMetrics(MetricsSystem ms, String queueName,
32+
Queue parent, boolean enableUserMetrics, Configuration conf,
33+
String partition) {
34+
super(ms, queueName, parent, enableUserMetrics, conf);
35+
this.partition = partition;
36+
if (getParentQueue() != null) {
37+
String newQueueName = (getParentQueue() instanceof CSQueue)
38+
? ((CSQueue) getParentQueue()).getQueuePath()
39+
: getParentQueue().getQueueName();
40+
String parentMetricName =
41+
partition + METRIC_NAME_DELIMITER + newQueueName;
42+
setParent(getQueueMetrics().get(parentMetricName));
43+
}
44+
}
45+
46+
/**
47+
* Partition * Queue * User Metrics
48+
*
49+
* Computes Metrics at Partition (Node Label) * Queue * User Level.
50+
*
51+
* Sample JMX O/P Structure:
52+
*
53+
* PartitionQueueMetrics (labelX)
54+
* QueueMetrics (A)
55+
* usermetrics
56+
* QueueMetrics (A1)
57+
* usermetrics
58+
* QueueMetrics (A2)
59+
* usermetrics
60+
* QueueMetrics (B)
61+
* usermetrics
62+
*
63+
* @return QueueMetrics
64+
*/
65+
@Override
66+
public synchronized QueueMetrics getUserMetrics(String userName) {
67+
if (users == null) {
68+
return null;
69+
}
70+
71+
String partitionJMXStr =
72+
(partition.equals(DEFAULT_PARTITION)) ? DEFAULT_PARTITION_JMX_STR
73+
: partition;
74+
75+
QueueMetrics metrics = (PartitionQueueMetrics) users.get(userName);
76+
if (metrics == null) {
77+
metrics = new PartitionQueueMetrics(this.metricsSystem, this.queueName,
78+
null, false, this.conf, this.partition);
79+
users.put(userName, metrics);
80+
metricsSystem.register(
81+
pSourceName(partitionJMXStr).append(qSourceName(queueName))
82+
.append(",user=").append(userName).toString(),
83+
"Metrics for user '" + userName + "' in queue '" + queueName + "'",
84+
((PartitionQueueMetrics) metrics.tag(PARTITION_INFO, partitionJMXStr)
85+
.tag(QUEUE_INFO, queueName)).tag(USER_INFO, userName));
86+
}
87+
return metrics;
88+
}
89+
}

0 commit comments

Comments
 (0)