@@ -24,9 +24,11 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptMetrics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -395,7 +397,23 @@ public AllocateResponse allocate(AllocateRequest request)

ApplicationAttemptId appAttemptId =
amrmTokenIdentifier.getApplicationAttemptId();
RMAppAttemptMetrics rmMetrics = getAppAttemptMetrics(appAttemptId);
// record the ask timestamps here, outside allocate(), to avoid doing it under its internal lock
if (rmMetrics != null) {
rmMetrics.setAllocateLatenciesTimestamps(request.getAskList());
}
AllocateResponse response = allocate(request, amrmTokenIdentifier);
Member
We may want to put the two rmMetrics != null checks together.
No need to do the allocate in between, right?

Contributor Author
Not really; the second time we update the latencies based on the AllocateResponse, so the allocate call is required in between.
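Roughly, the ordering the patch relies on (an illustrative sketch using only the methods added here):

```java
// 1. Before entering allocate(): stamp the ask list so this work is not
//    done under allocate()'s internal lock.
rmMetrics.setAllocateLatenciesTimestamps(request.getAskList());

// 2. Run the actual allocation; this is what may fulfill earlier asks.
AllocateResponse response = allocate(request, amrmTokenIdentifier);

// 3. After allocate(): compute latencies from the containers returned
//    in the response.
rmMetrics.updateAllocateLatencies(response.getAllocatedContainers());
```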

if (rmMetrics != null) {
rmMetrics.updateAllocateLatencies(response.getAllocatedContainers());
}
return response;
}

protected AllocateResponse allocate(AllocateRequest request,
AMRMTokenIdentifier amrmTokenIdentifier)
throws YarnException, IOException {
ApplicationAttemptId appAttemptId =
amrmTokenIdentifier.getApplicationAttemptId();
this.amLivelinessMonitor.receivedPing(appAttemptId);

/* check if its in cache */
@@ -472,6 +490,33 @@ public AllocateResponse allocate(AllocateRequest request)
}
}

protected RMAppAttemptMetrics getAppAttemptMetrics(
ApplicationAttemptId appAttemptId) {
if (appAttemptId == null) {
return null;
}
ConcurrentMap<ApplicationId, RMApp> apps = this.rmContext.getRMApps();
ApplicationId appId = appAttemptId.getApplicationId();
if (appId == null) {
return null;
}
RMApp app = apps.get(appId);
if (app == null) {
return null;
}

Map<ApplicationAttemptId, RMAppAttempt> attempts = app.getAppAttempts();
if (attempts == null) {
return null;
}
RMAppAttempt attempt = attempts.get(appAttemptId);
if (attempt == null) {
return null;
}
return attempt.getRMAppAttemptMetrics();
}

public void registerAppAttempt(ApplicationAttemptId attemptId) {
AllocateResponse response =
recordFactory.newRecordInstance(AllocateResponse.class);
@@ -26,6 +26,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.hadoop.metrics2.lib.MutableQuantiles;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.metrics2.MetricsInfo;
@@ -76,7 +77,10 @@ public class ClusterMetrics {
MutableGaugeInt rmDispatcherEventQueueSize;
@Metric("# of scheduler dispatcher event queue size")
MutableGaugeInt schedulerDispatcherEventQueueSize;

@Metric("Allocation Latencies for Guaranteed containers")
MutableQuantiles allocateLatencyGuarQuantiles;
@Metric("Allocation Latencies for Opportunistic containers")
MutableQuantiles allocateLatencyOppQuantiles;
private boolean rmEventProcMonitorEnable = false;

private static final MetricsInfo RECORD_INFO = info("ClusterMetrics",
@@ -117,13 +121,25 @@ public static ClusterMetrics getMetrics() {
if(INSTANCE == null){
INSTANCE = new ClusterMetrics();
registerMetrics();
INSTANCE.initialize();
isInitialized.set(true);
}
}
}
return INSTANCE;
}

private void initialize() {
allocateLatencyGuarQuantiles = registry.newQuantiles(
"AllocateLatencyGuaranteed",
"Latency to fulfill an Allocate(Guaranteed) requests", "ops",
"latency", 5);
allocateLatencyOppQuantiles = registry.newQuantiles(
"AllocateLatencyOpportunistic",
"Latency to fulfill an Allocate(Opportunistic) requests", "ops",
"latency", 5);
}

private static void registerMetrics() {
registry = new MetricsRegistry(RECORD_INFO);
registry.tag(RECORD_INFO, "ResourceManager");
@@ -357,6 +373,14 @@ public void incrNumContainerAssigned() {
numContainersAssigned.incrementAndGet();
}

public void addAllocateGuarLatencyEntry(long processingTime) {
allocateLatencyGuarQuantiles.add(processingTime);
}

public void addAllocateOppLatencyEntry(long processingTime) {
allocateLatencyOppQuantiles.add(processingTime);
}

private ScheduledThreadPoolExecutor getAssignCounterExecutor(){
return assignCounterExecutor;
}
@@ -376,4 +400,22 @@ public int getSchedulerEventQueueSize() {
public void setSchedulerEventQueueSize(int schedulerEventQueueSize) {
this.schedulerDispatcherEventQueueSize.set(schedulerEventQueueSize);
}

public MutableQuantiles getAllocateLatencyGuarQuantiles() {
return allocateLatencyGuarQuantiles;
}

public void setAllocateLatencyGuarQuantiles(
MutableQuantiles allocateLatencyGuarQuantiles) {
this.allocateLatencyGuarQuantiles = allocateLatencyGuarQuantiles;
}

public MutableQuantiles getAllocateLatencyOppQuantiles() {
return allocateLatencyOppQuantiles;
}

public void setAllocateLatencyOppQuantiles(
MutableQuantiles allocateLatencyOppQuantiles) {
this.allocateLatencyOppQuantiles = allocateLatencyOppQuantiles;
}
}
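For reference, a minimal sketch of how the new ClusterMetrics hooks are meant to be fed and read (illustrative only; latencyMs is a made-up value):

```java
// Record a measured allocation latency (in milliseconds) against the
// cluster-wide quantiles added by this patch.
long latencyMs = 42L; // example value
ClusterMetrics metrics = ClusterMetrics.getMetrics();
metrics.addAllocateGuarLatencyEntry(latencyMs); // guaranteed containers
metrics.addAllocateOppLatencyEntry(latencyMs);  // opportunistic containers

// The backing MutableQuantiles roll over every 5 seconds (the interval passed
// to registry.newQuantiles above) and are published through the usual
// metrics2 sinks/JMX.
MutableQuantiles guarQuantiles = metrics.getAllocateLatencyGuarQuantiles();
```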
@@ -19,6 +19,7 @@
package org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -29,6 +30,12 @@
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;

import org.apache.commons.lang3.time.DateUtils;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.server.resourcemanager.ClusterMetrics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@@ -63,6 +70,11 @@ public class RMAppAttemptMetrics {
new int[NodeType.values().length][NodeType.values().length];
private volatile int totalAllocatedContainers;

private ConcurrentHashMap<Long, Long> allocationGuaranteedLatencies =
new ConcurrentHashMap<Long, Long>();
private ConcurrentHashMap<Long, Long> allocationOpportunisticLatencies =
new ConcurrentHashMap<Long, Long>();

public RMAppAttemptMetrics(ApplicationAttemptId attemptId,
RMContext rmContext) {
this.attemptId = attemptId;
@@ -242,4 +254,121 @@ public Resource getApplicationAttemptHeadroom() {
public void setApplicationAttemptHeadRoom(Resource headRoom) {
this.applicationHeadroom = headRoom;
}

/**
* Record the timestamp at which the given allocation request id was first
* seen (guaranteed), unless it is already being tracked.
*
* @param allocId the allocation request id
* @param timestamp the timestamp to associate with it
*/
public void addAllocationGuarLatencyIfNotExists(long allocId,
long timestamp) {
allocationGuaranteedLatencies.putIfAbsent(allocId, timestamp);
}

/**
* Record the timestamp at which the given allocation request id was first
* seen (opportunistic), unless it is already being tracked.
*
* @param allocId the allocation request id
* @param timestamp the timestamp to associate with it
*/
public void addAllocationOppLatencyIfNotExists(long allocId, long timestamp) {
allocationOpportunisticLatencies.putIfAbsent(allocId, timestamp);
}

/**
* Returns the timestamp recorded when the given allocation request id was
* added (guaranteed) and stops tracking it.
*
* @param allocId the allocation request id to look up
* @return the timestamp associated with that allocation id, or 0 if it was
* not being tracked
*/
public long getAndRemoveGuaAllocationLatencies(long allocId) {
Long ret = allocationGuaranteedLatencies.remove(allocId);
return ret != null ? ret : 0L;
}

/**
* Returns the timestamp recorded when the given allocation request id was
* added (opportunistic) and stops tracking it.
*
* @param allocId the allocation request id to look up
* @return the timestamp associated with that allocation id, or 0 if it was
* not being tracked
*/
public long getAndRemoveOppAllocationLatencies(long allocId) {
Long ret = allocationOpportunisticLatencies.remove(allocId);
return ret != null ? ret : 0L;
}

/**
* Records a timestamp for each of the provided ResourceRequests, keyed by
* ExecutionType, provided the request has an allocation request id greater
* than 0 (0 is the DEFAULT id and is not tracked). Used in conjunction with
* the updateAllocateLatencies method.
*
* @param requests the ResourceRequests to add.
*/
public void setAllocateLatenciesTimestamps(List<ResourceRequest> requests) {
long now = Time.now();
for (ResourceRequest req : requests) {
if (req.getNumContainers() > 0) {
// tracking is not supported for negative or zero allocation request ids
long allocationRequestId = req.getAllocationRequestId();
if (allocationRequestId > 0) {
ExecutionTypeRequest execReq = req.getExecutionTypeRequest();
if (execReq != null) {
if (ExecutionType.GUARANTEED.equals(execReq.getExecutionType())) {
addAllocationGuarLatencyIfNotExists(allocationRequestId, now);
} else {
addAllocationOppLatencyIfNotExists(allocationRequestId, now);
}
}
} else {
LOG.warn("Can't register allocate latency for {} container with"
+ "less than or equal to 0 allocation IDs",
req.getExecutionTypeRequest().getExecutionType());
}
}
}
}

/**
* Updates the JMX metrics class (ClusterMetrics) with the time elapsed since
* each of these containers was asked for. It will correctly identify their
* ExecutionType, provided they have an allocation request id greater than 0
* (0 is the DEFAULT id and is not tracked).
*
* @param response the containers allocated in the AllocateResponse.
*/
public void updateAllocateLatencies(List<Container> response) {
for (Container container : response) {
long allocationRequestId = container.getAllocationRequestId();
ExecutionType executionType = container.getExecutionType();
// tracking is not supported for negative or zero allocation request ids
if (allocationRequestId > 0) {
long now = System.currentTimeMillis();
long allocIdTime = (executionType == ExecutionType.GUARANTEED) ?
getAndRemoveGuaAllocationLatencies(allocationRequestId) :
getAndRemoveOppAllocationLatencies(allocationRequestId);
if (allocIdTime != 0) {
if (executionType == ExecutionType.GUARANTEED) {
ClusterMetrics.getMetrics()
.addAllocateGuarLatencyEntry(now - allocIdTime);
} else {
ClusterMetrics.getMetrics()
.addAllocateOppLatencyEntry(now - allocIdTime);
}
} else {
LOG.error("Can't register allocate latency for {} container {}; "
+ "allotTime={}", executionType, container.getId(), allocIdTime);
}
} else {
LOG.warn("Cant register promotion latency for {} container {}. Either "
+ "allocationID is less than or equal to 0 or container is lost",
executionType, container.getId());
}
}
}
}
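A minimal sketch of the per-attempt tracking these methods implement (illustrative; the allocation request id 7L and the surrounding attemptId/rmContext are assumed to come from the RM):

```java
// Per-attempt metrics object; attemptId and rmContext are supplied by the RM.
RMAppAttemptMetrics metrics = new RMAppAttemptMetrics(attemptId, rmContext);

// Ask path: remember when allocation request id 7 was first seen (guaranteed).
metrics.addAllocationGuarLatencyIfNotExists(7L, Time.now());

// Response path: once a container for request id 7 comes back, compute the
// elapsed time, feed the cluster-wide quantiles, and stop tracking the id.
long askedAt = metrics.getAndRemoveGuaAllocationLatencies(7L);
if (askedAt != 0) {
  ClusterMetrics.getMetrics()
      .addAllocateGuarLatencyEntry(System.currentTimeMillis() - askedAt);
}
```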