Skip to content

Commit 8c0759d

Browse files
committed
YARN-9664. Improve response of scheduler/app activities for better understanding. Contributed by Tao Yang.
1 parent c749f62 commit 8c0759d

26 files changed, with 1320 additions and 612 deletions.

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesLogger.java

Lines changed: 108 additions & 76 deletions
Original file line number | Diff line number | Diff line change
@@ -30,10 +30,11 @@
3030
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
3131
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
3232

33+
import java.util.function.Supplier;
34+
3335
/**
3436
* Utility for logging scheduler activities
3537
*/
36-
// FIXME: make sure CandidateNodeSet works with this class
3738
public class ActivitiesLogger {
3839
private static final Logger LOG =
3940
LoggerFactory.getLogger(ActivitiesLogger.class);
@@ -52,9 +53,9 @@ public static void recordSkippedAppActivityWithoutAllocation(
5253
ActivitiesManager activitiesManager, SchedulerNode node,
5354
SchedulerApplicationAttempt application,
5455
SchedulerRequestKey requestKey,
55-
String diagnostic) {
56+
String diagnostic, ActivityLevel level) {
5657
recordAppActivityWithoutAllocation(activitiesManager, node, application,
57-
requestKey, diagnostic, ActivityState.SKIPPED);
58+
requestKey, diagnostic, ActivityState.SKIPPED, level);
5859
}
5960

6061
/*
@@ -72,7 +73,7 @@ public static void recordRejectedAppActivityFromLeafQueue(
7273
if (activitiesManager.shouldRecordThisNode(nodeId)) {
7374
recordActivity(activitiesManager, nodeId, application.getQueueName(),
7475
application.getApplicationId().toString(), priority,
75-
ActivityState.REJECTED, diagnostic, "app");
76+
ActivityState.REJECTED, diagnostic, ActivityLevel.APP);
7677
}
7778
finishSkippedAppAllocationRecording(activitiesManager,
7879
application.getApplicationId(), ActivityState.REJECTED, diagnostic);
@@ -87,50 +88,55 @@ public static void recordAppActivityWithoutAllocation(
8788
ActivitiesManager activitiesManager, SchedulerNode node,
8889
SchedulerApplicationAttempt application,
8990
SchedulerRequestKey schedulerKey,
90-
String diagnostic, ActivityState appState) {
91+
String diagnostic, ActivityState appState, ActivityLevel level) {
9192
if (activitiesManager == null) {
9293
return;
9394
}
9495
NodeId nodeId = getRecordingNodeId(activitiesManager, node);
9596
if (activitiesManager.shouldRecordThisNode(nodeId)) {
96-
if (schedulerKey != null) {
97-
String allocationRequestId =
98-
String.valueOf(schedulerKey.getAllocationRequestId());
99-
String priorityStr = getPriorityStr(schedulerKey);
100-
String requestName = getRequestName(priorityStr, allocationRequestId);
101-
String type = "container";
102-
// Add application-container activity into specific node allocation.
103-
activitiesManager.addSchedulingActivityForNode(nodeId,
104-
requestName, null,
105-
priorityStr, appState, diagnostic, type,
106-
null);
107-
type = "request";
108-
// Add application-container activity into specific node allocation.
109-
activitiesManager.addSchedulingActivityForNode(nodeId,
110-
application.getApplicationId().toString(), requestName,
111-
priorityStr, appState,
112-
ActivityDiagnosticConstant.EMPTY, type, allocationRequestId);
97+
String requestName = null;
98+
Integer priority = null;
99+
Long allocationRequestId = null;
100+
if (level == ActivityLevel.NODE || level == ActivityLevel.REQUEST) {
101+
if (schedulerKey == null) {
102+
LOG.warn("Request key should not be null at " + level + " level.");
103+
return;
104+
}
105+
priority = getPriority(schedulerKey);
106+
allocationRequestId = schedulerKey.getAllocationRequestId();
107+
requestName = getRequestName(priority, allocationRequestId);
108+
}
109+
switch (level) {
110+
case NODE:
111+
recordSchedulerActivityAtNodeLevel(activitiesManager, application,
112+
requestName, priority, allocationRequestId, null, nodeId,
113+
appState, diagnostic);
114+
break;
115+
case REQUEST:
116+
recordSchedulerActivityAtRequestLevel(activitiesManager, application,
117+
requestName, priority, allocationRequestId, nodeId, appState,
118+
diagnostic);
119+
break;
120+
case APP:
121+
recordSchedulerActivityAtAppLevel(activitiesManager, application,
122+
nodeId, appState, diagnostic);
123+
break;
124+
default:
125+
LOG.warn("Doesn't handle app activities at " + level + " level.");
126+
break;
113127
}
114-
// Add queue-application activity into specific node allocation.
115-
activitiesManager.addSchedulingActivityForNode(nodeId,
116-
application.getQueueName(),
117-
application.getApplicationId().toString(),
118-
application.getPriority().toString(), appState,
119-
schedulerKey != null ? ActivityDiagnosticConstant.EMPTY :
120-
diagnostic, "app", null);
121128
}
122129
// Add application-container activity into specific application allocation
123130
// Under this condition, it fails to allocate a container to this
124131
// application, so containerId is null.
125132
if (activitiesManager.shouldRecordThisApp(
126133
application.getApplicationId())) {
127-
String type = "container";
128134
activitiesManager.addSchedulingActivityForApp(
129135
application.getApplicationId(), null,
130-
getPriorityStr(schedulerKey), appState,
131-
diagnostic, type, nodeId,
136+
getPriority(schedulerKey), appState,
137+
diagnostic, level, nodeId,
132138
schedulerKey == null ?
133-
null : String.valueOf(schedulerKey.getAllocationRequestId()));
139+
null : schedulerKey.getAllocationRequestId());
134140
}
135141
}
136142

@@ -150,49 +156,68 @@ public static void recordAppActivityWithAllocation(
150156
nodeId = updatedContainer.getNodeId();
151157
}
152158
if (activitiesManager.shouldRecordThisNode(nodeId)) {
153-
String containerPriorityStr =
154-
updatedContainer.getContainer().getPriority().toString();
155-
String allocationRequestId = String
156-
.valueOf(updatedContainer.getContainer().getAllocationRequestId());
159+
Integer containerPriority =
160+
updatedContainer.getContainer().getPriority().getPriority();
161+
Long allocationRequestId =
162+
updatedContainer.getContainer().getAllocationRequestId();
157163
String requestName =
158-
getRequestName(containerPriorityStr, allocationRequestId);
159-
String type = "container";
160-
161-
// Add application-container activity into specific node allocation.
162-
activitiesManager.addSchedulingActivityForNode(nodeId,
163-
requestName,
164-
updatedContainer.getContainer().toString(),
165-
containerPriorityStr,
166-
activityState, ActivityDiagnosticConstant.EMPTY, type, null);
167-
type = "request";
168-
// Add application-container activity into specific node allocation.
169-
activitiesManager.addSchedulingActivityForNode(nodeId,
170-
application.getApplicationId().toString(),
171-
requestName, containerPriorityStr,
172-
activityState, ActivityDiagnosticConstant.EMPTY, type,
173-
allocationRequestId);
174-
type = "app";
175-
// Add queue-application activity into specific node allocation.
176-
activitiesManager.addSchedulingActivityForNode(nodeId,
177-
application.getQueueName(),
178-
application.getApplicationId().toString(),
179-
application.getPriority().toString(), ActivityState.ACCEPTED,
180-
ActivityDiagnosticConstant.EMPTY, type, null);
164+
getRequestName(containerPriority, allocationRequestId);
165+
// Add node,request,app level activities into scheduler activities.
166+
recordSchedulerActivityAtNodeLevel(activitiesManager, application,
167+
requestName, containerPriority, allocationRequestId,
168+
updatedContainer.getContainer().toString(), nodeId, activityState,
169+
ActivityDiagnosticConstant.EMPTY);
181170
}
182171
// Add application-container activity into specific application allocation
183172
if (activitiesManager.shouldRecordThisApp(
184173
application.getApplicationId())) {
185-
String type = "container";
186174
activitiesManager.addSchedulingActivityForApp(
187175
application.getApplicationId(),
188176
updatedContainer.getContainerId(),
189-
updatedContainer.getContainer().getPriority().toString(),
190-
activityState, ActivityDiagnosticConstant.EMPTY, type, nodeId,
191-
String.valueOf(
192-
updatedContainer.getContainer().getAllocationRequestId()));
177+
updatedContainer.getContainer().getPriority().getPriority(),
178+
activityState, ActivityDiagnosticConstant.EMPTY,
179+
ActivityLevel.NODE, nodeId,
180+
updatedContainer.getContainer().getAllocationRequestId());
193181
}
194182
}
195183

184+
@SuppressWarnings("parameternumber")
185+
private static void recordSchedulerActivityAtNodeLevel(
186+
ActivitiesManager activitiesManager, SchedulerApplicationAttempt app,
187+
String requestName, Integer priority, Long allocationRequestId,
188+
String containerId, NodeId nodeId, ActivityState state,
189+
String diagnostic) {
190+
activitiesManager
191+
.addSchedulingActivityForNode(nodeId, requestName, containerId, null,
192+
state, diagnostic, ActivityLevel.NODE, null);
193+
// Record request level activity additionally.
194+
recordSchedulerActivityAtRequestLevel(activitiesManager, app, requestName,
195+
priority, allocationRequestId, nodeId, state,
196+
ActivityDiagnosticConstant.EMPTY);
197+
}
198+
199+
@SuppressWarnings("parameternumber")
200+
private static void recordSchedulerActivityAtRequestLevel(
201+
ActivitiesManager activitiesManager, SchedulerApplicationAttempt app,
202+
String requestName, Integer priority, Long allocationRequestId,
203+
NodeId nodeId, ActivityState state, String diagnostic) {
204+
activitiesManager.addSchedulingActivityForNode(nodeId,
205+
app.getApplicationId().toString(), requestName, priority,
206+
state, diagnostic, ActivityLevel.REQUEST,
207+
allocationRequestId);
208+
// Record app level activity additionally.
209+
recordSchedulerActivityAtAppLevel(activitiesManager, app, nodeId, state,
210+
ActivityDiagnosticConstant.EMPTY);
211+
}
212+
213+
private static void recordSchedulerActivityAtAppLevel(
214+
ActivitiesManager activitiesManager, SchedulerApplicationAttempt app,
215+
NodeId nodeId, ActivityState state, String diagnostic) {
216+
activitiesManager.addSchedulingActivityForNode(nodeId, app.getQueueName(),
217+
app.getApplicationId().toString(), app.getPriority().getPriority(),
218+
state, diagnostic, ActivityLevel.APP, null);
219+
}
220+
196221
/*
197222
* Invoked when scheduler starts to look at this application within one node
198223
* update.
@@ -252,13 +277,20 @@ public static class QUEUE {
252277
public static void recordQueueActivity(ActivitiesManager activitiesManager,
253278
SchedulerNode node, String parentQueueName, String queueName,
254279
ActivityState state, String diagnostic) {
280+
recordQueueActivity(activitiesManager, node, parentQueueName, queueName,
281+
state, () -> diagnostic);
282+
}
283+
284+
public static void recordQueueActivity(ActivitiesManager activitiesManager,
285+
SchedulerNode node, String parentQueueName, String queueName,
286+
ActivityState state, Supplier<String> diagnosticSupplier) {
255287
if (activitiesManager == null) {
256288
return;
257289
}
258290
NodeId nodeId = getRecordingNodeId(activitiesManager, node);
259291
if (activitiesManager.shouldRecordThisNode(nodeId)) {
260292
recordActivity(activitiesManager, nodeId, parentQueueName, queueName,
261-
null, state, diagnostic, null);
293+
null, state, diagnosticSupplier.get(), ActivityLevel.QUEUE);
262294
}
263295
}
264296
}
@@ -299,11 +331,11 @@ public static void finishAllocatedNodeAllocation(
299331
* Invoked when node heartbeat finishes
300332
*/
301333
public static void finishNodeUpdateRecording(
302-
ActivitiesManager activitiesManager, NodeId nodeID) {
334+
ActivitiesManager activitiesManager, NodeId nodeID, String partition) {
303335
if (activitiesManager == null) {
304336
return;
305337
}
306-
activitiesManager.finishNodeUpdateRecording(nodeID);
338+
activitiesManager.finishNodeUpdateRecording(nodeID, partition);
307339
}
308340

309341
/*
@@ -320,11 +352,11 @@ public static void startNodeUpdateRecording(
320352

321353
// Add queue, application or container activity into specific node allocation.
322354
private static void recordActivity(ActivitiesManager activitiesManager,
323-
NodeId nodeId, String parentName, String childName,
324-
Priority priority, ActivityState state, String diagnostic, String type) {
355+
NodeId nodeId, String parentName, String childName, Priority priority,
356+
ActivityState state, String diagnostic, ActivityLevel level) {
325357
activitiesManager.addSchedulingActivityForNode(nodeId, parentName,
326-
childName, priority != null ? priority.toString() : null, state,
327-
diagnostic, type, null);
358+
childName, priority != null ? priority.getPriority() : null, state,
359+
diagnostic, level, null);
328360
}
329361

330362
private static NodeId getRecordingNodeId(ActivitiesManager activitiesManager,
@@ -333,16 +365,16 @@ private static NodeId getRecordingNodeId(ActivitiesManager activitiesManager,
333365
activitiesManager.getRecordingNodeId(node);
334366
}
335367

336-
private static String getRequestName(String priority,
337-
String allocationRequestId) {
368+
private static String getRequestName(Integer priority,
369+
Long allocationRequestId) {
338370
return "request_"
339371
+ (priority == null ? "" : priority)
340372
+ "_" + (allocationRequestId == null ? "" : allocationRequestId);
341373
}
342374

343-
private static String getPriorityStr(SchedulerRequestKey schedulerKey) {
375+
private static Integer getPriority(SchedulerRequestKey schedulerKey) {
344376
Priority priority = schedulerKey == null ?
345377
null : schedulerKey.getPriority();
346-
return priority == null ? null : priority.toString();
378+
return priority == null ? null : priority.getPriority();
347379
}
348380
}

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesManager.java

Lines changed: 19 additions & 20 deletions
Original file line number | Diff line number | Diff line change
@@ -124,7 +124,7 @@ private void setupConfForCleanup(Configuration conf) {
124124
}
125125

126126
public AppActivitiesInfo getAppActivitiesInfo(ApplicationId applicationId,
127-
Set<String> requestPriorities, Set<String> allocationRequestIds,
127+
Set<Integer> requestPriorities, Set<Long> allocationRequestIds,
128128
RMWSConsts.ActivitiesGroupBy groupBy, int limit, boolean summarize,
129129
double maxTimeInSeconds) {
130130
RMApp app = rmContext.getRMApps().get(applicationId);
@@ -186,20 +186,18 @@ private AppAllocation getSummarizedAppAllocation(
186186
}
187187
List<ActivityNode> activityNodes = appAllocation.getAllocationAttempts();
188188
for (ActivityNode an : activityNodes) {
189-
if (an.getNodeId() != null) {
190-
nodeActivities.putIfAbsent(
191-
an.getRequestPriority() + "_" + an.getAllocationRequestId() + "_"
192-
+ an.getNodeId(), an);
193-
}
189+
nodeActivities.putIfAbsent(
190+
an.getRequestPriority() + "_" + an.getAllocationRequestId() + "_"
191+
+ an.getNodeId(), an);
194192
}
195193
}
196194
AppAllocation lastAppAllocation = allocations.get(allocations.size() - 1);
197195
AppAllocation summarizedAppAllocation =
198196
new AppAllocation(lastAppAllocation.getPriority(), null,
199197
lastAppAllocation.getQueueName());
200-
summarizedAppAllocation
201-
.updateAppContainerStateAndTime(null, lastAppAllocation.getAppState(),
202-
lastAppAllocation.getTime(), lastAppAllocation.getDiagnostic());
198+
summarizedAppAllocation.updateAppContainerStateAndTime(null,
199+
lastAppAllocation.getActivityState(), lastAppAllocation.getTime(),
200+
lastAppAllocation.getDiagnostic());
203201
summarizedAppAllocation
204202
.setAllocationAttempts(new ArrayList<>(nodeActivities.values()));
205203
return summarizedAppAllocation;
@@ -282,7 +280,7 @@ public void run() {
282280
Map.Entry<NodeId, List<NodeAllocation>> nodeAllocation = ite.next();
283281
List<NodeAllocation> allocations = nodeAllocation.getValue();
284282
if (allocations.size() > 0
285-
&& curTS - allocations.get(0).getTimeStamp()
283+
&& curTS - allocations.get(0).getTimestamp()
286284
> schedulerActivitiesTTL) {
287285
ite.remove();
288286
}
@@ -383,26 +381,26 @@ void startAppAllocationRecording(NodeId nodeID, long currTS,
383381

384382
// Add queue, application or container activity into specific node allocation.
385383
void addSchedulingActivityForNode(NodeId nodeId, String parentName,
386-
String childName, String priority, ActivityState state, String diagnostic,
387-
String type, String allocationRequestId) {
384+
String childName, Integer priority, ActivityState state,
385+
String diagnostic, ActivityLevel level, Long allocationRequestId) {
388386
if (shouldRecordThisNode(nodeId)) {
389387
NodeAllocation nodeAllocation = getCurrentNodeAllocation(nodeId);
390388
nodeAllocation.addAllocationActivity(parentName, childName, priority,
391-
state, diagnostic, type, nodeId, allocationRequestId);
389+
state, diagnostic, level, nodeId, allocationRequestId);
392390
}
393391
}
394392

395393
// Add queue, application or container activity into specific application
396394
// allocation.
397395
void addSchedulingActivityForApp(ApplicationId applicationId,
398-
ContainerId containerId, String priority, ActivityState state,
399-
String diagnostic, String type, NodeId nodeId,
400-
String allocationRequestId) {
396+
ContainerId containerId, Integer priority, ActivityState state,
397+
String diagnostic, ActivityLevel level, NodeId nodeId,
398+
Long allocationRequestId) {
401399
if (shouldRecordThisApp(applicationId)) {
402400
AppAllocation appAllocation = appsAllocation.get().get(applicationId);
403401
appAllocation.addAppAllocationActivity(containerId == null ?
404402
"Container-Id-Not-Assigned" :
405-
containerId.toString(), priority, state, diagnostic, type, nodeId,
403+
containerId.toString(), priority, state, diagnostic, level, nodeId,
406404
allocationRequestId);
407405
}
408406
}
@@ -450,16 +448,17 @@ void finishAppAllocationRecording(ApplicationId applicationId,
450448
}
451449
}
452450

453-
void finishNodeUpdateRecording(NodeId nodeID) {
451+
void finishNodeUpdateRecording(NodeId nodeID, String partition) {
454452
List<NodeAllocation> value = recordingNodesAllocation.get().get(nodeID);
455-
long timeStamp = SystemClock.getInstance().getTime();
453+
long timestamp = SystemClock.getInstance().getTime();
456454

457455
if (value != null) {
458456
if (value.size() > 0) {
459457
lastAvailableNodeActivities = value;
460458
for (NodeAllocation allocation : lastAvailableNodeActivities) {
461459
allocation.transformToTree();
462-
allocation.setTimeStamp(timeStamp);
460+
allocation.setTimestamp(timestamp);
461+
allocation.setPartition(partition);
463462
}
464463
if (recordNextAvailableNode) {
465464
recordNextAvailableNode = false;

0 commit comments

Comments
 (0)