From 138b7fca652bdf05f5ebd05f12391f20be22ddac Mon Sep 17 00:00:00 2001 From: Minni Mittal Date: Tue, 7 Dec 2021 00:19:34 +0530 Subject: [PATCH 1/7] YARN-11028. Add metrics for container allocation latency --- .../ApplicationMasterService.java | 30 ++ .../resourcemanager/ClusterMetrics.java | 26 +- .../rmapp/attempt/RMAppAttemptMetrics.java | 156 ++++++++++ .../TestAllocateLatenciesMetrics.java | 267 ++++++++++++++++++ 4 files changed, 478 insertions(+), 1 deletion(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAllocateLatenciesMetrics.java diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java index 28e889b2deb3e..6857b054f3246 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java @@ -27,6 +27,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptMetrics; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -395,7 +396,19 @@ public AllocateResponse allocate(AllocateRequest request) ApplicationAttemptId appAttemptId = amrmTokenIdentifier.getApplicationAttemptId(); + RMAppAttemptMetrics rmMetrics = getAppAttemptMetrics(appAttemptId); + // we do this here to prevent the internal lock in allocate() + rmMetrics.setAllocateLatenciesTimestamps(request.getAskList()); + AllocateResponse response = allocate(request, amrmTokenIdentifier); + rmMetrics.updateAllocateLatencies(response.getAllocatedContainers()); + return response; + } + protected AllocateResponse allocate(AllocateRequest request, + AMRMTokenIdentifier amrmTokenIdentifier) + throws YarnException, IOException { + ApplicationAttemptId appAttemptId = + amrmTokenIdentifier.getApplicationAttemptId(); this.amLivelinessMonitor.receivedPing(appAttemptId); /* check if its in cache */ @@ -472,6 +485,23 @@ public AllocateResponse allocate(AllocateRequest request) } } + protected RMAppAttemptMetrics getAppAttemptMetrics( + ApplicationAttemptId appAttemptId) { + if (appAttemptId == null) { + return null; + } + RMApp app = this.rmContext.getRMApps().get(appAttemptId.getApplicationId()); + if (app == null) { + return null; + } + RMAppAttempt attempt = app.getAppAttempts().get(appAttemptId); + if (attempt == null) { + return null; + } + + return attempt.getRMAppAttemptMetrics(); + } + public void registerAppAttempt(ApplicationAttemptId attemptId) { AllocateResponse response = recordFactory.newRecordInstance(AllocateResponse.class); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClusterMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClusterMetrics.java index c56a1e7b3d92d..afc28c9aa1ab6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClusterMetrics.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClusterMetrics.java @@ -26,6 +26,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.hadoop.metrics2.lib.MutableQuantiles; import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.metrics2.MetricsInfo; @@ -76,7 +77,10 @@ public class ClusterMetrics { MutableGaugeInt rmDispatcherEventQueueSize; @Metric("# of scheduler dispatcher event queue size") MutableGaugeInt schedulerDispatcherEventQueueSize; - + @Metric("Allocation Latencies for Guarantee containers") + MutableQuantiles allocateLatencyGuarQuantiles; + @Metric("Allocation Latencies for Opportunistic containers") + MutableQuantiles allocateLatencyOppQuantiles; private boolean rmEventProcMonitorEnable = false; private static final MetricsInfo RECORD_INFO = info("ClusterMetrics", @@ -117,6 +121,7 @@ public static ClusterMetrics getMetrics() { if(INSTANCE == null){ INSTANCE = new ClusterMetrics(); registerMetrics(); + INSTANCE.initialize(); isInitialized.set(true); } } @@ -124,6 +129,17 @@ public static ClusterMetrics getMetrics() { return INSTANCE; } + private void initialize() { + allocateLatencyGuarQuantiles = registry + .newQuantiles("AllocateLatencyGuaranteed", + "Latency to fulfill an Allocate(Guaranteed) requests", "ops", + "latency", 5); + allocateLatencyOppQuantiles = registry + .newQuantiles("AllocateLatencyOpportunistic", + "Latency to fulfill an Allocate(Opportunistic) requests", "ops", + "latency", 5); + } + private static void registerMetrics() { registry = new MetricsRegistry(RECORD_INFO); registry.tag(RECORD_INFO, "ResourceManager"); @@ -357,6 +373,14 @@ public void incrNumContainerAssigned() { numContainersAssigned.incrementAndGet(); } + public void addAllocateGuarLatencyEntry(long processingTime) { + allocateLatencyGuarQuantiles.add(processingTime); + } + + public void addAllocateOppLatencyEntry(long processingTime) { + allocateLatencyOppQuantiles.add(processingTime); + } + private ScheduledThreadPoolExecutor getAssignCounterExecutor(){ return assignCounterExecutor; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptMetrics.java index 3f3d875c4b493..3a9dbb428ce4d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptMetrics.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptMetrics.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; @@ -29,6 +30,11 @@ import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; import org.apache.commons.lang3.time.DateUtils; +import org.apache.hadoop.util.Time; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ExecutionType; +import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.server.resourcemanager.ClusterMetrics; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; @@ -63,6 +69,9 @@ public class RMAppAttemptMetrics { new int[NodeType.values().length][NodeType.values().length]; private volatile int totalAllocatedContainers; + private ConcurrentHashMap allocationGuaranteedLatencies = null; + private ConcurrentHashMap allocationOpportunisticLatencies = null; + public RMAppAttemptMetrics(ApplicationAttemptId attemptId, RMContext rmContext) { this.attemptId = attemptId; @@ -70,6 +79,8 @@ public RMAppAttemptMetrics(ApplicationAttemptId attemptId, this.readLock = lock.readLock(); this.writeLock = lock.writeLock(); this.rmContext = rmContext; + this.allocationGuaranteedLatencies = new ConcurrentHashMap(); + this.allocationOpportunisticLatencies = new ConcurrentHashMap(); } public void updatePreemptionInfo(Resource resource, RMContainer container) { @@ -242,4 +253,149 @@ public Resource getApplicationAttemptHeadroom() { public void setApplicationAttemptHeadRoom(Resource headRoom) { this.applicationHeadroom = headRoom; } + + /** + * Add allocationID latency to the application ID with a timestap = + * CurrentTime (guaranteed) + * + * @param allocId the allocation Id to add If the allocation ID is already + * present (which shouldn't happen) it ignores the entry + */ + public void addAllocationGuarLatencyIfNotExists(long allocId) { + addAllocationGuarLatencyIfNotExists(allocId, System.currentTimeMillis()); + } + + /** + * Add allocationID latency to the application ID with a specific timestamp + * (guaranteed) + * + * @param allocId allocationId + * @param timestamp the timestamp to associate + */ + public void addAllocationGuarLatencyIfNotExists(long allocId, + long timestamp) { + allocationGuaranteedLatencies.putIfAbsent(allocId, timestamp); + } + + /** + * Add allocationID latency to the application ID with a timestap = + * CurrentTime (opportunistic) + * + * @param allocId the allocation Id to add If the allocation ID is already + * present (which shouldn't happen) it ignores the entry + */ + public void addAllocationOppLatencyIfNotExists(long allocId) { + this.addAllocationOppLatencyIfNotExists(allocId, + System.currentTimeMillis()); + } + + /** + * Add allocationID latency to the application ID with a specific timestamp + * (opportunistic) + * + * @param allocId allocationId + * @param timestamp the timestamp to associate + */ + public void addAllocationOppLatencyIfNotExists(long allocId, long timestamp) { + allocationOpportunisticLatencies.putIfAbsent(allocId, timestamp); + } + + /** + * Returns the time associated when the allocation Id was added This method + * removes the allocation Id from the class (guaranteed) + * + * @param allocId the allocation ID to get the associated time + * @return the timestamp associated with that allocation id as well as stop + * tracking it + */ + public long getAndRemoveGuaAllocationLatencies(long allocId) { + Long ret = allocationGuaranteedLatencies.remove(new Long(allocId)); + return ret != null ? ret : 0l; + } + + /** + * Returns the time associated when the allocation Id was added This method + * removes the allocation Id from the class (opportunistic) + * + * @param allocId the allocation ID to get the associated time + * @return the timestamp associated with that allocation id as well as stop + * tracking it + */ + public long getAndRemoveOppAllocationLatencies(long allocId) { + Long ret = allocationOpportunisticLatencies.remove(new Long(allocId)); + return ret != null ? ret : 0l; + } + + /** + * Set timestamp for the provided ResourceRequest. It will correctly identify + * their ExecutionType, provided they have they have allocateId != 0 (DEFAULT) + * This is used in conjunction with This is used in conjunction with + * updatePromoteLatencies method method + * + * @param requests the ResourceRequests to add. + */ + public void setAllocateLatenciesTimestamps(List requests) { + long now = Time.now(); + for (ResourceRequest req : requests) { + if (req.getNumContainers() > 0) { + // we dont support tracking with negative or zero allocationIds + long allocationRequestId = req.getAllocationRequestId(); + if (allocationRequestId > 0) { + if (req.getExecutionTypeRequest() != null) { + if (ExecutionType.GUARANTEED + .equals(req.getExecutionTypeRequest().getExecutionType())) { + addAllocationGuarLatencyIfNotExists(allocationRequestId, now); + } else { + addAllocationOppLatencyIfNotExists(allocationRequestId, now); + } + } + } else { + LOG.warn(String.format( + "Can't register allocate latency for %s container " + + "with negative or zero allocation IDs", + req.getExecutionTypeRequest().getExecutionType())); + } + } + } + } + + /** + * Updated the JMX metrics class (ClusterMetrics) with the delta time when + * these containers where added. It will correctly identify their + * ExecutionType, provided they have they have allocateId != 0 (DEFAULT) + * + * @param response the list of the containers to allocate. + */ + public void updateAllocateLatencies(List response) { + + for (Container container : response) { + long allocationRequestId = container.getAllocationRequestId(); + // we dont support tracking with negative or zero allocationIds + if (allocationRequestId > 0) { + long now = System.currentTimeMillis(); + long allocIdTime = + (container.getExecutionType() == ExecutionType.GUARANTEED) ? + getAndRemoveGuaAllocationLatencies(allocationRequestId) : + getAndRemoveOppAllocationLatencies(allocationRequestId); + if (allocIdTime != 0) { + if (container.getExecutionType() == ExecutionType.GUARANTEED) { + ClusterMetrics.getMetrics() + .addAllocateGuarLatencyEntry(now - allocIdTime); + } else { + ClusterMetrics.getMetrics() + .addAllocateOppLatencyEntry(now - allocIdTime); + } + } else { + LOG.error(String.format( + "Can't register allocate latency for %s container %s; allotTime=%d ", + container.getExecutionType(), container.getId(), allocIdTime)); + } + } else { + LOG.warn(String.format("Cant register promotion latency " + + "for container %s. Either allocationID is less than equal to 0 or " + + "lost the container ID", container.getExecutionType().name(), + container.getId())); + } + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAllocateLatenciesMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAllocateLatenciesMetrics.java new file mode 100644 index 0000000000000..5deec56a2b795 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAllocateLatenciesMetrics.java @@ -0,0 +1,267 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager; + +import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerState; +import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.ExecutionType; +import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.event.Dispatcher; +import org.apache.hadoop.yarn.event.DrainDispatcher; +import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; +import org.apache.hadoop.yarn.util.resource.Resources; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; + +public class TestAllocateLatenciesMetrics { + + private static final int GB = 1024; + + private MockRM rm; + private DrainDispatcher dispatcher; + + private OpportunisticContainersStatus oppContainersStatus = + getOpportunisticStatus(); + + @Before + public void createAndStartRM() { + CapacitySchedulerConfiguration csConf = + new CapacitySchedulerConfiguration(); + YarnConfiguration conf = new YarnConfiguration(csConf); + conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, + ResourceScheduler.class); + conf.setBoolean( + YarnConfiguration.OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED, true); + startRM(conf); + } + + private void startRM(final YarnConfiguration conf) { + dispatcher = new DrainDispatcher(); + rm = new MockRM(conf) { + @Override protected Dispatcher createDispatcher() { + return dispatcher; + } + }; + rm.start(); + } + + @After + public void stopRM() { + if (rm != null) { + rm.stop(); + } + } + + @Test + public void testGuaranteedLatenciesMetrics() throws Exception { + HashMap nodes = new HashMap<>(); + MockNM nm1 = new MockNM("h1:1234", 4096, rm.getResourceTrackerService()); + nodes.put(nm1.getNodeId(), nm1); + MockNM nm2 = new MockNM("h2:1234", 4096, rm.getResourceTrackerService()); + nodes.put(nm2.getNodeId(), nm2); + nm1.registerNode(); + nm2.registerNode(); + + nm1.nodeHeartbeat(oppContainersStatus, true); + nm2.nodeHeartbeat(oppContainersStatus, true); + + OpportunisticContainerAllocatorAMService amservice = + (OpportunisticContainerAllocatorAMService) rm + .getApplicationMasterService(); + MockRMAppSubmissionData data = + MockRMAppSubmissionData.Builder.createWithMemory(1 * GB, rm) + .withAppName("app").withUser("user").withAcls(null) + .withQueue("default").build(); + + RMApp app1 = MockRMAppSubmitter.submit(rm, data); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm2); + ResourceScheduler scheduler = rm.getResourceScheduler(); + + // All nodes 1 to 2 will be applicable for scheduling. + nm1.nodeHeartbeat(oppContainersStatus, true); + nm2.nodeHeartbeat(oppContainersStatus, true); + + ResourceRequest rr1 = ResourceRequest + .newInstance(Priority.newInstance(1), "*", + Resources.createResource(1 * GB), 1, true, null, + ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED, true)); + ResourceRequest rr2 = ResourceRequest + .newInstance(Priority.newInstance(1), "*", + Resources.createResource(1 * GB), 1, true, null, + ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED, true)); + rr1.setAllocationRequestId(1); + rr2.setAllocationRequestId(2); + AllocateResponse allocateResponse = + am1.allocate(Arrays.asList(rr1, rr2), null); + + // make sure the containers get allocated + nm1.nodeHeartbeat(true); + Thread.sleep(500); + + for (int i = 0; i < 2; i++) { + allocateResponse.getAllocatedContainers().addAll( + am1.allocate(new ArrayList(), + new ArrayList()).getAllocatedContainers()); + nm1.nodeHeartbeat(true); + Thread.sleep(500); + } + + List allocatedContainers = + allocateResponse.getAllocatedContainers(); + Assert.assertEquals(2, allocatedContainers.size()); + + Container container = allocatedContainers.get(0); + MockNM allocNode = nodes.get(container.getNodeId()); + + // Start Container in NM + allocNode.nodeHeartbeat(Arrays.asList(ContainerStatus + .newInstance(container.getId(), ExecutionType.GUARANTEED, + ContainerState.RUNNING, "", 0)), true); + rm.drainEvents(); + + // Verify that container is actually running wrt the RM.. + RMContainer rmContainer = ((CapacityScheduler) scheduler) + .getApplicationAttempt(container.getId().getApplicationAttemptId()) + .getRMContainer(container.getId()); + Assert.assertEquals(RMContainerState.RUNNING, rmContainer.getState()); + + // Container Completed in the NM + allocNode.nodeHeartbeat(Arrays.asList(ContainerStatus + .newInstance(container.getId(), ExecutionType.GUARANTEED, + ContainerState.COMPLETE, "", 0)), true); + Assert.assertTrue( + ClusterMetrics.getMetrics().allocateLatencyGuarQuantiles.changed()); + rm.drainEvents(); + + // Verify that container has been removed.. + rmContainer = ((CapacityScheduler) scheduler) + .getApplicationAttempt(container.getId().getApplicationAttemptId()) + .getRMContainer(container.getId()); + Assert.assertNull(rmContainer); + } + + @Test + public void testOpportunisticLatenciesMetrics() throws Exception { + HashMap nodes = new HashMap<>(); + MockNM nm1 = new MockNM("h1:1234", 4096, rm.getResourceTrackerService()); + nodes.put(nm1.getNodeId(), nm1); + MockNM nm2 = new MockNM("h2:1234", 4096, rm.getResourceTrackerService()); + nodes.put(nm2.getNodeId(), nm2); + nm1.registerNode(); + nm2.registerNode(); + + nm1.nodeHeartbeat(oppContainersStatus, true); + nm2.nodeHeartbeat(oppContainersStatus, true); + + OpportunisticContainerAllocatorAMService amservice = + (OpportunisticContainerAllocatorAMService) rm + .getApplicationMasterService(); + + MockRMAppSubmissionData data = + MockRMAppSubmissionData.Builder.createWithMemory(1 * GB, rm) + .withAppName("app").withUser("user").withAcls(null) + .withQueue("default").build(); + + RMApp app1 = MockRMAppSubmitter.submit(rm, data); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm2); + ResourceScheduler scheduler = rm.getResourceScheduler(); + + // All nodes 1 to 2 will be applicable for scheduling. + nm1.nodeHeartbeat(oppContainersStatus, true); + nm2.nodeHeartbeat(oppContainersStatus, true); + + GenericTestUtils + .waitFor(() -> amservice.getLeastLoadedNodes().size() == 2, 10, + 10 * 100); + + AllocateResponse allocateResponse = am1.allocate(Arrays.asList( + ResourceRequest.newInstance(Priority.newInstance(1), "*", + Resources.createResource(1 * GB), 2, true, null, + ExecutionTypeRequest + .newInstance(ExecutionType.OPPORTUNISTIC, true))), null); + + List allocatedContainers = + allocateResponse.getAllocatedContainers(); + Assert.assertEquals(2, allocatedContainers.size()); + + Container container = allocatedContainers.get(0); + MockNM allocNode = nodes.get(container.getNodeId()); + + // Start Container in NM + allocNode.nodeHeartbeat(Arrays.asList(ContainerStatus + .newInstance(container.getId(), ExecutionType.OPPORTUNISTIC, + ContainerState.RUNNING, "", 0)), true); + rm.drainEvents(); + + // Verify that container is actually running wrt the RM.. + RMContainer rmContainer = ((CapacityScheduler) scheduler) + .getApplicationAttempt(container.getId().getApplicationAttemptId()) + .getRMContainer(container.getId()); + Assert.assertEquals(RMContainerState.RUNNING, rmContainer.getState()); + + // Container Completed in the NM + allocNode.nodeHeartbeat(Arrays.asList(ContainerStatus + .newInstance(container.getId(), ExecutionType.OPPORTUNISTIC, + ContainerState.COMPLETE, "", 0)), true); + Assert.assertTrue( + ClusterMetrics.getMetrics().allocateLatencyOppQuantiles.changed()); + rm.drainEvents(); + + // Verify that container has been removed.. + rmContainer = ((CapacityScheduler) scheduler) + .getApplicationAttempt(container.getId().getApplicationAttemptId()) + .getRMContainer(container.getId()); + Assert.assertNull(rmContainer); + } + + private OpportunisticContainersStatus getOpportunisticStatus() { + return getOpportunisticStatus(-1, 100, 1000); + } + + private OpportunisticContainersStatus getOpportunisticStatus(int waitTime, + int queueLength, int queueCapacity) { + OpportunisticContainersStatus status = + OpportunisticContainersStatus.newInstance(); + status.setEstimatedQueueWaitTime(waitTime); + status.setOpportQueueCapacity(queueCapacity); + status.setWaitQueueLength(queueLength); + return status; + } +} \ No newline at end of file From 6421ca49fe7e5ac962bd22010318c06c18b0d97d Mon Sep 17 00:00:00 2001 From: Minni Mittal Date: Fri, 10 Dec 2021 12:04:41 +0530 Subject: [PATCH 2/7] Addressed comments --- .../ApplicationMasterService.java | 8 +- .../resourcemanager/ClusterMetrics.java | 40 +++++--- .../rmapp/attempt/RMAppAttemptMetrics.java | 93 +++++++------------ 3 files changed, 68 insertions(+), 73 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java index 6857b054f3246..3ce96218b5b64 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java @@ -398,9 +398,13 @@ public AllocateResponse allocate(AllocateRequest request) amrmTokenIdentifier.getApplicationAttemptId(); RMAppAttemptMetrics rmMetrics = getAppAttemptMetrics(appAttemptId); // we do this here to prevent the internal lock in allocate() - rmMetrics.setAllocateLatenciesTimestamps(request.getAskList()); + if (rmMetrics != null) { + rmMetrics.setAllocateLatenciesTimestamps(request.getAskList()); + } AllocateResponse response = allocate(request, amrmTokenIdentifier); - rmMetrics.updateAllocateLatencies(response.getAllocatedContainers()); + if (rmMetrics != null) { + rmMetrics.updateAllocateLatencies(response.getAllocatedContainers()); + } return response; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClusterMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClusterMetrics.java index afc28c9aa1ab6..15de7661ec27b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClusterMetrics.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClusterMetrics.java @@ -77,10 +77,10 @@ public class ClusterMetrics { MutableGaugeInt rmDispatcherEventQueueSize; @Metric("# of scheduler dispatcher event queue size") MutableGaugeInt schedulerDispatcherEventQueueSize; - @Metric("Allocation Latencies for Guarantee containers") - MutableQuantiles allocateLatencyGuarQuantiles; + @Metric("Allocation Latencies for Guaranteed containers") + MutableQuantiles allocateLatencyGuarQuantiles; @Metric("Allocation Latencies for Opportunistic containers") - MutableQuantiles allocateLatencyOppQuantiles; + MutableQuantiles allocateLatencyOppQuantiles; private boolean rmEventProcMonitorEnable = false; private static final MetricsInfo RECORD_INFO = info("ClusterMetrics", @@ -130,14 +130,14 @@ public static ClusterMetrics getMetrics() { } private void initialize() { - allocateLatencyGuarQuantiles = registry - .newQuantiles("AllocateLatencyGuaranteed", - "Latency to fulfill an Allocate(Guaranteed) requests", "ops", - "latency", 5); - allocateLatencyOppQuantiles = registry - .newQuantiles("AllocateLatencyOpportunistic", - "Latency to fulfill an Allocate(Opportunistic) requests", "ops", - "latency", 5); + allocateLatencyGuarQuantiles = registry.newQuantiles( + "AllocateLatencyGuaranteed", + "Latency to fulfill an Allocate(Guaranteed) requests", "ops", + "latency", 5); + allocateLatencyOppQuantiles = registry.newQuantiles( + "AllocateLatencyOpportunistic", + "Latency to fulfill an Allocate(Opportunistic) requests", "ops", + "latency", 5); } private static void registerMetrics() { @@ -400,4 +400,22 @@ public int getSchedulerEventQueueSize() { public void setSchedulerEventQueueSize(int schedulerEventQueueSize) { this.schedulerDispatcherEventQueueSize.set(schedulerEventQueueSize); } + + public MutableQuantiles getAllocateLatencyGuarQuantiles() { + return allocateLatencyGuarQuantiles; + } + + public void setAllocateLatencyGuarQuantiles( + MutableQuantiles allocateLatencyGuarQuantiles) { + this.allocateLatencyGuarQuantiles = allocateLatencyGuarQuantiles; + } + + public MutableQuantiles getAllocateLatencyOppQuantiles() { + return allocateLatencyOppQuantiles; + } + + public void setAllocateLatencyOppQuantiles( + MutableQuantiles allocateLatencyOppQuantiles) { + this.allocateLatencyOppQuantiles = allocateLatencyOppQuantiles; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptMetrics.java index 3a9dbb428ce4d..667a55a5a75c2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptMetrics.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptMetrics.java @@ -33,6 +33,7 @@ import org.apache.hadoop.util.Time; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ExecutionType; +import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.server.resourcemanager.ClusterMetrics; import org.slf4j.Logger; @@ -69,8 +70,10 @@ public class RMAppAttemptMetrics { new int[NodeType.values().length][NodeType.values().length]; private volatile int totalAllocatedContainers; - private ConcurrentHashMap allocationGuaranteedLatencies = null; - private ConcurrentHashMap allocationOpportunisticLatencies = null; + private ConcurrentHashMap allocationGuaranteedLatencies = + new ConcurrentHashMap(); + private ConcurrentHashMap allocationOpportunisticLatencies = + new ConcurrentHashMap(); public RMAppAttemptMetrics(ApplicationAttemptId attemptId, RMContext rmContext) { @@ -79,8 +82,6 @@ public RMAppAttemptMetrics(ApplicationAttemptId attemptId, this.readLock = lock.readLock(); this.writeLock = lock.writeLock(); this.rmContext = rmContext; - this.allocationGuaranteedLatencies = new ConcurrentHashMap(); - this.allocationOpportunisticLatencies = new ConcurrentHashMap(); } public void updatePreemptionInfo(Resource resource, RMContainer container) { @@ -254,20 +255,9 @@ public void setApplicationAttemptHeadRoom(Resource headRoom) { this.applicationHeadroom = headRoom; } - /** - * Add allocationID latency to the application ID with a timestap = - * CurrentTime (guaranteed) - * - * @param allocId the allocation Id to add If the allocation ID is already - * present (which shouldn't happen) it ignores the entry - */ - public void addAllocationGuarLatencyIfNotExists(long allocId) { - addAllocationGuarLatencyIfNotExists(allocId, System.currentTimeMillis()); - } - /** * Add allocationID latency to the application ID with a specific timestamp - * (guaranteed) + * (guaranteed). * * @param allocId allocationId * @param timestamp the timestamp to associate @@ -277,21 +267,9 @@ public void addAllocationGuarLatencyIfNotExists(long allocId, allocationGuaranteedLatencies.putIfAbsent(allocId, timestamp); } - /** - * Add allocationID latency to the application ID with a timestap = - * CurrentTime (opportunistic) - * - * @param allocId the allocation Id to add If the allocation ID is already - * present (which shouldn't happen) it ignores the entry - */ - public void addAllocationOppLatencyIfNotExists(long allocId) { - this.addAllocationOppLatencyIfNotExists(allocId, - System.currentTimeMillis()); - } - /** * Add allocationID latency to the application ID with a specific timestamp - * (opportunistic) + * (opportunistic). * * @param allocId allocationId * @param timestamp the timestamp to associate @@ -301,36 +279,35 @@ public void addAllocationOppLatencyIfNotExists(long allocId, long timestamp) { } /** - * Returns the time associated when the allocation Id was added This method - * removes the allocation Id from the class (guaranteed) + * Returns the time associated when the allocation Id was added. This method + * removes the allocation Id from the class (guaranteed). * * @param allocId the allocation ID to get the associated time * @return the timestamp associated with that allocation id as well as stop * tracking it */ public long getAndRemoveGuaAllocationLatencies(long allocId) { - Long ret = allocationGuaranteedLatencies.remove(new Long(allocId)); - return ret != null ? ret : 0l; + Long ret = allocationGuaranteedLatencies.remove(allocId); + return ret != null ? ret : 0L; } /** - * Returns the time associated when the allocation Id was added This method - * removes the allocation Id from the class (opportunistic) + * Returns the time associated when the allocation Id was added. This method + * removes the allocation Id from the class (opportunistic). * * @param allocId the allocation ID to get the associated time * @return the timestamp associated with that allocation id as well as stop * tracking it */ public long getAndRemoveOppAllocationLatencies(long allocId) { - Long ret = allocationOpportunisticLatencies.remove(new Long(allocId)); - return ret != null ? ret : 0l; + Long ret = allocationOpportunisticLatencies.remove(allocId); + return ret != null ? ret : 0L; } /** * Set timestamp for the provided ResourceRequest. It will correctly identify * their ExecutionType, provided they have they have allocateId != 0 (DEFAULT) - * This is used in conjunction with This is used in conjunction with - * updatePromoteLatencies method method + * This is used in conjunction with updatePromoteLatencies method. * * @param requests the ResourceRequests to add. */ @@ -341,19 +318,18 @@ public void setAllocateLatenciesTimestamps(List requests) { // we dont support tracking with negative or zero allocationIds long allocationRequestId = req.getAllocationRequestId(); if (allocationRequestId > 0) { - if (req.getExecutionTypeRequest() != null) { - if (ExecutionType.GUARANTEED - .equals(req.getExecutionTypeRequest().getExecutionType())) { + ExecutionTypeRequest execReq = req.getExecutionTypeRequest(); + if (execReq != null) { + if (ExecutionType.GUARANTEED.equals(execReq.getExecutionType())) { addAllocationGuarLatencyIfNotExists(allocationRequestId, now); } else { addAllocationOppLatencyIfNotExists(allocationRequestId, now); } } } else { - LOG.warn(String.format( - "Can't register allocate latency for %s container " - + "with negative or zero allocation IDs", - req.getExecutionTypeRequest().getExecutionType())); + LOG.warn("Can't register allocate latency for {} container with" + + "less than or equal to 0 allocation IDs", + req.getExecutionTypeRequest().getExecutionType()); } } } @@ -362,23 +338,22 @@ public void setAllocateLatenciesTimestamps(List requests) { /** * Updated the JMX metrics class (ClusterMetrics) with the delta time when * these containers where added. It will correctly identify their - * ExecutionType, provided they have they have allocateId != 0 (DEFAULT) + * ExecutionType, provided they have they have allocateId != 0 (DEFAULT). * * @param response the list of the containers to allocate. */ public void updateAllocateLatencies(List response) { - for (Container container : response) { long allocationRequestId = container.getAllocationRequestId(); + ExecutionType executionType = container.getExecutionType(); // we dont support tracking with negative or zero allocationIds if (allocationRequestId > 0) { long now = System.currentTimeMillis(); - long allocIdTime = - (container.getExecutionType() == ExecutionType.GUARANTEED) ? - getAndRemoveGuaAllocationLatencies(allocationRequestId) : - getAndRemoveOppAllocationLatencies(allocationRequestId); + long allocIdTime = (executionType == ExecutionType.GUARANTEED) ? + getAndRemoveGuaAllocationLatencies(allocationRequestId) : + getAndRemoveOppAllocationLatencies(allocationRequestId); if (allocIdTime != 0) { - if (container.getExecutionType() == ExecutionType.GUARANTEED) { + if (executionType == ExecutionType.GUARANTEED) { ClusterMetrics.getMetrics() .addAllocateGuarLatencyEntry(now - allocIdTime); } else { @@ -386,15 +361,13 @@ public void updateAllocateLatencies(List response) { .addAllocateOppLatencyEntry(now - allocIdTime); } } else { - LOG.error(String.format( - "Can't register allocate latency for %s container %s; allotTime=%d ", - container.getExecutionType(), container.getId(), allocIdTime)); + LOG.error("Can't register allocate latency for {} container {}; " + + "allotTime={}", executionType, container.getId(), allocIdTime); } } else { - LOG.warn(String.format("Cant register promotion latency " - + "for container %s. Either allocationID is less than equal to 0 or " - + "lost the container ID", container.getExecutionType().name(), - container.getId())); + LOG.warn("Cant register promotion latency for {} container {}. Either " + + "allocationID is less than or equal to 0 or container is lost", + executionType, container.getId()); } } } From 15e130381c882a85270bab603227b357ed7cd9ee Mon Sep 17 00:00:00 2001 From: Minni Mittal Date: Mon, 13 Dec 2021 14:31:40 +0530 Subject: [PATCH 3/7] null checks --- .../resourcemanager/ApplicationMasterService.java | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java index 3ce96218b5b64..02deb6b617c4d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java @@ -24,6 +24,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -498,12 +499,17 @@ protected RMAppAttemptMetrics getAppAttemptMetrics( if (app == null) { return null; } - RMAppAttempt attempt = app.getAppAttempts().get(appAttemptId); - if (attempt == null) { + + Map attempts = app.getAppAttempts(); + if (attempts == null) { return null; + } else { + RMAppAttempt attempt = attempts.get(appAttemptId); + if (attempt == null) { + return null; + } + return attempt.getRMAppAttemptMetrics(); } - - return attempt.getRMAppAttemptMetrics(); } public void registerAppAttempt(ApplicationAttemptId attemptId) { From a2408fae5e4d7150e554a257f8605b3d44ef69b8 Mon Sep 17 00:00:00 2001 From: Minni Mittal Date: Wed, 15 Dec 2021 23:44:45 +0530 Subject: [PATCH 4/7] optimize thread sleep time in UT --- .../TestAllocateLatenciesMetrics.java | 21 +++++++------------ 1 file changed, 7 insertions(+), 14 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAllocateLatenciesMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAllocateLatenciesMetrics.java index 5deec56a2b795..e9f5c594042e1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAllocateLatenciesMetrics.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAllocateLatenciesMetrics.java @@ -102,9 +102,6 @@ public void testGuaranteedLatenciesMetrics() throws Exception { nm1.nodeHeartbeat(oppContainersStatus, true); nm2.nodeHeartbeat(oppContainersStatus, true); - OpportunisticContainerAllocatorAMService amservice = - (OpportunisticContainerAllocatorAMService) rm - .getApplicationMasterService(); MockRMAppSubmissionData data = MockRMAppSubmissionData.Builder.createWithMemory(1 * GB, rm) .withAppName("app").withUser("user").withAcls(null) @@ -133,20 +130,16 @@ public void testGuaranteedLatenciesMetrics() throws Exception { // make sure the containers get allocated nm1.nodeHeartbeat(true); - Thread.sleep(500); - - for (int i = 0; i < 2; i++) { - allocateResponse.getAllocatedContainers().addAll( - am1.allocate(new ArrayList(), - new ArrayList()).getAllocatedContainers()); + List allocatedContainers = + am1.allocate(new ArrayList(), + new ArrayList()).getAllocatedContainers(); + while (allocatedContainers.size() != 2) { nm1.nodeHeartbeat(true); - Thread.sleep(500); + allocatedContainers.addAll(am1.allocate(new ArrayList(), + new ArrayList()).getAllocatedContainers()); + Thread.sleep(100); } - List allocatedContainers = - allocateResponse.getAllocatedContainers(); - Assert.assertEquals(2, allocatedContainers.size()); - Container container = allocatedContainers.get(0); MockNM allocNode = nodes.get(container.getNodeId()); From 3ec05b771c1a922c9e8fec2b56f6cfa6d9d01c7f Mon Sep 17 00:00:00 2001 From: Minni Mittal Date: Tue, 11 Jan 2022 19:51:42 +0530 Subject: [PATCH 5/7] Fix UT for potential infinite loop --- .../server/resourcemanager/ApplicationMasterService.java | 7 ++++++- .../resourcemanager/TestAllocateLatenciesMetrics.java | 3 ++- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java index 02deb6b617c4d..a57d5aac3db3d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java @@ -495,7 +495,12 @@ protected RMAppAttemptMetrics getAppAttemptMetrics( if (appAttemptId == null) { return null; } - RMApp app = this.rmContext.getRMApps().get(appAttemptId.getApplicationId()); + ConcurrentMap apps = this.rmContext.getRMApps(); + ApplicationId appId = appAttemptId.getApplicationId(); + if (appId == null) { + return null; + } + RMApp app = apps.get(appId); if (app == null) { return null; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAllocateLatenciesMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAllocateLatenciesMetrics.java index e9f5c594042e1..70c22cc12a9b5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAllocateLatenciesMetrics.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAllocateLatenciesMetrics.java @@ -133,7 +133,8 @@ public void testGuaranteedLatenciesMetrics() throws Exception { List allocatedContainers = am1.allocate(new ArrayList(), new ArrayList()).getAllocatedContainers(); - while (allocatedContainers.size() != 2) { + int maxRetries = 20; + while (allocatedContainers.size() != 2 && maxRetries-- >= 0) { nm1.nodeHeartbeat(true); allocatedContainers.addAll(am1.allocate(new ArrayList(), new ArrayList()).getAllocatedContainers()); From 5956c47e4b0270612ccfb7311697c3affdb1e243 Mon Sep 17 00:00:00 2001 From: Minni Mittal Date: Tue, 11 Jan 2022 20:21:43 +0530 Subject: [PATCH 6/7] Using waitFor in UT to avoid infinite loop --- .../TestAllocateLatenciesMetrics.java | 26 +++++++++++++------ 1 file changed, 18 insertions(+), 8 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAllocateLatenciesMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAllocateLatenciesMetrics.java index 70c22cc12a9b5..55afdbf29ae50 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAllocateLatenciesMetrics.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAllocateLatenciesMetrics.java @@ -44,6 +44,8 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.Arrays; @@ -51,7 +53,8 @@ import java.util.List; public class TestAllocateLatenciesMetrics { - + private static final Logger LOG = + LoggerFactory.getLogger(TestAllocateLatenciesMetrics.class); private static final int GB = 1024; private MockRM rm; @@ -133,13 +136,20 @@ public void testGuaranteedLatenciesMetrics() throws Exception { List allocatedContainers = am1.allocate(new ArrayList(), new ArrayList()).getAllocatedContainers(); - int maxRetries = 20; - while (allocatedContainers.size() != 2 && maxRetries-- >= 0) { - nm1.nodeHeartbeat(true); - allocatedContainers.addAll(am1.allocate(new ArrayList(), - new ArrayList()).getAllocatedContainers()); - Thread.sleep(100); - } + GenericTestUtils.waitFor(() -> { + try { + nm1.nodeHeartbeat(true); + allocatedContainers.addAll(am1.allocate(new ArrayList(), + new ArrayList()).getAllocatedContainers()); + if (allocatedContainers.size() < 2) { + return false; + } + return true; + } catch (Exception e) { + LOG.error("Error in allocating container " + e); + } + return false; + }, 100, 2000); Container container = allocatedContainers.get(0); MockNM allocNode = nodes.get(container.getNodeId()); From 5cc0337e345f78101b4849af4d6da94b78f8538d Mon Sep 17 00:00:00 2001 From: Minni Mittal Date: Tue, 11 Jan 2022 20:23:29 +0530 Subject: [PATCH 7/7] Using waitFor in UT to avoid infinite loop --- .../server/resourcemanager/TestAllocateLatenciesMetrics.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAllocateLatenciesMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAllocateLatenciesMetrics.java index 55afdbf29ae50..6d2d98d18f998 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAllocateLatenciesMetrics.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAllocateLatenciesMetrics.java @@ -147,8 +147,8 @@ public void testGuaranteedLatenciesMetrics() throws Exception { return true; } catch (Exception e) { LOG.error("Error in allocating container " + e); + return false; } - return false; }, 100, 2000); Container container = allocatedContainers.get(0);