diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
index 28e889b2deb3e..a57d5aac3db3d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
@@ -24,9 +24,11 @@
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptMetrics;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -395,7 +397,23 @@ public AllocateResponse allocate(AllocateRequest request)
 
     ApplicationAttemptId appAttemptId =
         amrmTokenIdentifier.getApplicationAttemptId();
+    RMAppAttemptMetrics rmMetrics = getAppAttemptMetrics(appAttemptId);
+    // we do this here to avoid taking the internal lock in allocate()
+    if (rmMetrics != null) {
+      rmMetrics.setAllocateLatenciesTimestamps(request.getAskList());
+    }
+    AllocateResponse response = allocate(request, amrmTokenIdentifier);
+    if (rmMetrics != null) {
+      rmMetrics.updateAllocateLatencies(response.getAllocatedContainers());
+    }
+    return response;
+  }
 
+  protected AllocateResponse allocate(AllocateRequest request,
+      AMRMTokenIdentifier amrmTokenIdentifier)
+      throws YarnException, IOException {
+    ApplicationAttemptId appAttemptId =
+        amrmTokenIdentifier.getApplicationAttemptId();
     this.amLivelinessMonitor.receivedPing(appAttemptId);
 
     /* check if its in cache */
@@ -472,6 +490,33 @@ public AllocateResponse allocate(AllocateRequest request)
     }
   }
 
+  protected RMAppAttemptMetrics getAppAttemptMetrics(
+      ApplicationAttemptId appAttemptId) {
+    if (appAttemptId == null) {
+      return null;
+    }
+    ConcurrentMap<ApplicationId, RMApp> apps = this.rmContext.getRMApps();
+    ApplicationId appId = appAttemptId.getApplicationId();
+    if (appId == null) {
+      return null;
+    }
+    RMApp app = apps.get(appId);
+    if (app == null) {
+      return null;
+    }
+
+    Map<ApplicationAttemptId, RMAppAttempt> attempts = app.getAppAttempts();
+    if (attempts == null) {
+      return null;
+    } else {
+      RMAppAttempt attempt = attempts.get(appAttemptId);
+      if (attempt == null) {
+        return null;
+      }
+      return attempt.getRMAppAttemptMetrics();
+    }
+  }
+
   public void registerAppAttempt(ApplicationAttemptId attemptId) {
     AllocateResponse response =
         recordFactory.newRecordInstance(AllocateResponse.class);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClusterMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClusterMetrics.java
index c56a1e7b3d92d..15de7661ec27b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClusterMetrics.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClusterMetrics.java
@@ -26,6 +26,7 @@
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.hadoop.metrics2.lib.MutableQuantiles;
 import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.metrics2.MetricsInfo;
@@ -76,7 +77,10 @@ public class ClusterMetrics {
       MutableGaugeInt rmDispatcherEventQueueSize;
   @Metric("# of scheduler dispatcher event queue size")
       MutableGaugeInt schedulerDispatcherEventQueueSize;
-
+  @Metric("Allocation Latencies for Guaranteed containers")
+      MutableQuantiles allocateLatencyGuarQuantiles;
+  @Metric("Allocation Latencies for Opportunistic containers")
+      MutableQuantiles allocateLatencyOppQuantiles;
   private boolean rmEventProcMonitorEnable = false;
 
   private static final MetricsInfo RECORD_INFO = info("ClusterMetrics",
@@ -117,6 +121,7 @@ public static ClusterMetrics getMetrics() {
         if(INSTANCE == null){
           INSTANCE = new ClusterMetrics();
           registerMetrics();
+          INSTANCE.initialize();
           isInitialized.set(true);
         }
       }
@@ -124,6 +129,17 @@ public static ClusterMetrics getMetrics() {
     return INSTANCE;
   }
 
+  private void initialize() {
+    allocateLatencyGuarQuantiles = registry.newQuantiles(
+        "AllocateLatencyGuaranteed",
+        "Latency to fulfill an Allocate (Guaranteed) request", "ops",
+        "latency", 5);
+    allocateLatencyOppQuantiles = registry.newQuantiles(
+        "AllocateLatencyOpportunistic",
+        "Latency to fulfill an Allocate (Opportunistic) request", "ops",
+        "latency", 5);
+  }
+
   private static void registerMetrics() {
     registry = new MetricsRegistry(RECORD_INFO);
     registry.tag(RECORD_INFO, "ResourceManager");
@@ -357,6 +373,14 @@ public void incrNumContainerAssigned() {
     numContainersAssigned.incrementAndGet();
   }
 
+  public void addAllocateGuarLatencyEntry(long processingTime) {
+    allocateLatencyGuarQuantiles.add(processingTime);
+  }
+
+  public void addAllocateOppLatencyEntry(long processingTime) {
+    allocateLatencyOppQuantiles.add(processingTime);
+  }
+
   private ScheduledThreadPoolExecutor getAssignCounterExecutor(){
     return assignCounterExecutor;
   }
@@ -376,4 +400,22 @@ public int getSchedulerEventQueueSize() {
   public void setSchedulerEventQueueSize(int schedulerEventQueueSize) {
     this.schedulerDispatcherEventQueueSize.set(schedulerEventQueueSize);
   }
+
+  public MutableQuantiles getAllocateLatencyGuarQuantiles() {
+    return allocateLatencyGuarQuantiles;
+  }
+
+  public void setAllocateLatencyGuarQuantiles(
+      MutableQuantiles allocateLatencyGuarQuantiles) {
+    this.allocateLatencyGuarQuantiles = allocateLatencyGuarQuantiles;
+  }
+
+  public MutableQuantiles getAllocateLatencyOppQuantiles() {
+    return allocateLatencyOppQuantiles;
+  }
+
+  public void setAllocateLatencyOppQuantiles(
+      MutableQuantiles allocateLatencyOppQuantiles) {
+    this.allocateLatencyOppQuantiles = allocateLatencyOppQuantiles;
+  }
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptMetrics.java
index 3f3d875c4b493..667a55a5a75c2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptMetrics.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptMetrics.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt;
 
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -29,6 +30,12 @@
 import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
 
 import org.apache.commons.lang3.time.DateUtils;
+import org.apache.hadoop.util.Time;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
+import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.server.resourcemanager.ClusterMetrics;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@@ -63,6 +70,11 @@ public class RMAppAttemptMetrics {
       new int[NodeType.values().length][NodeType.values().length];
   private volatile int totalAllocatedContainers;
 
+  private ConcurrentHashMap<Long, Long> allocationGuaranteedLatencies =
+      new ConcurrentHashMap<>();
+  private ConcurrentHashMap<Long, Long> allocationOpportunisticLatencies =
+      new ConcurrentHashMap<>();
+
   public RMAppAttemptMetrics(ApplicationAttemptId attemptId,
       RMContext rmContext) {
     this.attemptId = attemptId;
@@ -242,4 +254,121 @@ public Resource getApplicationAttemptHeadroom() {
   public void setApplicationAttemptHeadRoom(Resource headRoom) {
     this.applicationHeadroom = headRoom;
   }
+
+  /**
+   * Start tracking the given allocation Id with the provided request
+   * timestamp (guaranteed). Does nothing if the Id is already tracked.
+   *
+   * @param allocId the allocation Id to track
+   * @param timestamp the timestamp to associate
+   */
+  public void addAllocationGuarLatencyIfNotExists(long allocId,
+      long timestamp) {
+    allocationGuaranteedLatencies.putIfAbsent(allocId, timestamp);
+  }
+
+  /**
+   * Start tracking the given allocation Id with the provided request
+   * timestamp (opportunistic). Does nothing if the Id is already tracked.
+   *
+   * @param allocId the allocation Id to track
+   * @param timestamp the timestamp to associate
+   */
+  public void addAllocationOppLatencyIfNotExists(long allocId, long timestamp) {
+    allocationOpportunisticLatencies.putIfAbsent(allocId, timestamp);
+  }
+
+  /**
+   * Returns the timestamp recorded when the allocation Id was added and stops
+   * tracking it (guaranteed).
+   *
+   * @param allocId the allocation Id to look up
+   * @return the timestamp associated with that allocation Id, or 0 if it was
+   *         not tracked
+   */
+  public long getAndRemoveGuaAllocationLatencies(long allocId) {
+    Long ret = allocationGuaranteedLatencies.remove(allocId);
+    return ret != null ? ret : 0L;
+  }
+
+  /**
+   * Returns the timestamp recorded when the allocation Id was added and stops
+   * tracking it (opportunistic).
+   *
+   * @param allocId the allocation Id to look up
+   * @return the timestamp associated with that allocation Id, or 0 if it was
+   *         not tracked
+   */
+  public long getAndRemoveOppAllocationLatencies(long allocId) {
+    Long ret = allocationOpportunisticLatencies.remove(allocId);
+    return ret != null ? ret : 0L;
+  }
+
+  /**
+   * Record the request timestamp for the provided ResourceRequests. The
+   * ExecutionType of each request is identified correctly, provided it has an
+   * allocationRequestId != 0 (DEFAULT). Used in conjunction with the
+   * updateAllocateLatencies method.
+   *
+   * @param requests the ResourceRequests to track.
+   */
+  public void setAllocateLatenciesTimestamps(List<ResourceRequest> requests) {
+    long now = Time.now();
+    for (ResourceRequest req : requests) {
+      if (req.getNumContainers() > 0) {
+        // we don't support tracking negative or zero allocationIds
+        long allocationRequestId = req.getAllocationRequestId();
+        if (allocationRequestId > 0) {
+          ExecutionTypeRequest execReq = req.getExecutionTypeRequest();
+          if (execReq != null) {
+            if (ExecutionType.GUARANTEED.equals(execReq.getExecutionType())) {
+              addAllocationGuarLatencyIfNotExists(allocationRequestId, now);
+            } else {
+              addAllocationOppLatencyIfNotExists(allocationRequestId, now);
+            }
+          }
+        } else {
+          LOG.warn("Can't register allocate latency for {} container with"
+              + " less than or equal to 0 allocation IDs",
+              req.getExecutionTypeRequest().getExecutionType());
+        }
+      }
+    }
+  }
+
+  /**
+   * Updates the JMX metrics class (ClusterMetrics) with the time elapsed since
+   * these containers were requested. The ExecutionType of each container is
+   * identified correctly, provided it has an allocationRequestId != 0
+   * (DEFAULT).
+   *
+   * @param response the list of newly allocated containers.
+   */
+  public void updateAllocateLatencies(List<Container> response) {
+    for (Container container : response) {
+      long allocationRequestId = container.getAllocationRequestId();
+      ExecutionType executionType = container.getExecutionType();
+      // we don't support tracking negative or zero allocationIds
+      if (allocationRequestId > 0) {
+        long now = System.currentTimeMillis();
+        long allocIdTime = (executionType == ExecutionType.GUARANTEED) ?
+            getAndRemoveGuaAllocationLatencies(allocationRequestId) :
+            getAndRemoveOppAllocationLatencies(allocationRequestId);
+        if (allocIdTime != 0) {
+          if (executionType == ExecutionType.GUARANTEED) {
+            ClusterMetrics.getMetrics()
+                .addAllocateGuarLatencyEntry(now - allocIdTime);
+          } else {
+            ClusterMetrics.getMetrics()
+                .addAllocateOppLatencyEntry(now - allocIdTime);
+          }
+        } else {
+          LOG.error("Can't register allocate latency for {} container {}; "
+              + "allocTime={}", executionType, container.getId(), allocIdTime);
+        }
+      } else {
+        LOG.warn("Can't register allocate latency for {} container {}. Either "
+            + "the allocationID is less than or equal to 0 or the container "
+            + "is lost", executionType, container.getId());
+      }
+    }
+  }
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAllocateLatenciesMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAllocateLatenciesMetrics.java
new file mode 100644
index 0000000000000..6d2d98d18f998
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAllocateLatenciesMetrics.java
@@ -0,0 +1,271 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager;
+
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
+import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.DrainDispatcher;
+import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
+import org.apache.hadoop.yarn.util.resource.Resources;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+
+public class TestAllocateLatenciesMetrics {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestAllocateLatenciesMetrics.class);
+  private static final int GB = 1024;
+
+  private MockRM rm;
+  private DrainDispatcher dispatcher;
+
+  private OpportunisticContainersStatus oppContainersStatus =
+      getOpportunisticStatus();
+
+  @Before
+  public void createAndStartRM() {
+    CapacitySchedulerConfiguration csConf =
+        new CapacitySchedulerConfiguration();
+    YarnConfiguration conf = new YarnConfiguration(csConf);
+    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+        ResourceScheduler.class);
+    conf.setBoolean(
+        YarnConfiguration.OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED, true);
+    startRM(conf);
+  }
+
+  private void startRM(final YarnConfiguration conf) {
+    dispatcher = new DrainDispatcher();
+    rm = new MockRM(conf) {
+      @Override
+      protected Dispatcher createDispatcher() {
+        return dispatcher;
+      }
+    };
+    rm.start();
+  }
+
+  @After
+  public void stopRM() {
+    if (rm != null) {
+      rm.stop();
+    }
+  }
+
+  @Test
+  public void testGuaranteedLatenciesMetrics() throws Exception {
+    HashMap<NodeId, MockNM> nodes = new HashMap<>();
+    MockNM nm1 = new MockNM("h1:1234", 4096, rm.getResourceTrackerService());
+    nodes.put(nm1.getNodeId(), nm1);
+    MockNM nm2 = new MockNM("h2:1234", 4096, rm.getResourceTrackerService());
+    nodes.put(nm2.getNodeId(), nm2);
+    nm1.registerNode();
+    nm2.registerNode();
+
+    nm1.nodeHeartbeat(oppContainersStatus, true);
+    nm2.nodeHeartbeat(oppContainersStatus, true);
+
+    MockRMAppSubmissionData data =
+        MockRMAppSubmissionData.Builder.createWithMemory(1 * GB, rm)
+            .withAppName("app").withUser("user").withAcls(null)
+            .withQueue("default").build();
+
+    RMApp app1 = MockRMAppSubmitter.submit(rm, data);
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm2);
+    ResourceScheduler scheduler = rm.getResourceScheduler();
+
+    // All nodes 1 to 2 will be applicable for scheduling.
+    nm1.nodeHeartbeat(oppContainersStatus, true);
+    nm2.nodeHeartbeat(oppContainersStatus, true);
+
+    ResourceRequest rr1 = ResourceRequest
+        .newInstance(Priority.newInstance(1), "*",
+            Resources.createResource(1 * GB), 1, true, null,
+            ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED, true));
+    ResourceRequest rr2 = ResourceRequest
+        .newInstance(Priority.newInstance(1), "*",
+            Resources.createResource(1 * GB), 1, true, null,
+            ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED, true));
+    rr1.setAllocationRequestId(1);
+    rr2.setAllocationRequestId(2);
+    AllocateResponse allocateResponse =
+        am1.allocate(Arrays.asList(rr1, rr2), null);
+
+    // make sure the containers get allocated
+    nm1.nodeHeartbeat(true);
+    List<Container> allocatedContainers =
+        am1.allocate(new ArrayList<ResourceRequest>(),
+            new ArrayList<ContainerId>()).getAllocatedContainers();
+    GenericTestUtils.waitFor(() -> {
+      try {
+        nm1.nodeHeartbeat(true);
+        allocatedContainers.addAll(
+            am1.allocate(new ArrayList<ResourceRequest>(),
+                new ArrayList<ContainerId>()).getAllocatedContainers());
+        if (allocatedContainers.size() < 2) {
+          return false;
+        }
+        return true;
+      } catch (Exception e) {
+        LOG.error("Error in allocating container " + e);
+        return false;
+      }
+    }, 100, 2000);
+
+    Container container = allocatedContainers.get(0);
+    MockNM allocNode = nodes.get(container.getNodeId());
+
+    // Start Container in NM
+    allocNode.nodeHeartbeat(Arrays.asList(ContainerStatus
+        .newInstance(container.getId(), ExecutionType.GUARANTEED,
+            ContainerState.RUNNING, "", 0)), true);
+    rm.drainEvents();
+
+    // Verify that container is actually running wrt the RM..
+    RMContainer rmContainer = ((CapacityScheduler) scheduler)
+        .getApplicationAttempt(container.getId().getApplicationAttemptId())
+        .getRMContainer(container.getId());
+    Assert.assertEquals(RMContainerState.RUNNING, rmContainer.getState());
+
+    // Container Completed in the NM
+    allocNode.nodeHeartbeat(Arrays.asList(ContainerStatus
+        .newInstance(container.getId(), ExecutionType.GUARANTEED,
+            ContainerState.COMPLETE, "", 0)), true);
+    Assert.assertTrue(
+        ClusterMetrics.getMetrics().allocateLatencyGuarQuantiles.changed());
+    rm.drainEvents();
+
+    // Verify that container has been removed..
+    rmContainer = ((CapacityScheduler) scheduler)
+        .getApplicationAttempt(container.getId().getApplicationAttemptId())
+        .getRMContainer(container.getId());
+    Assert.assertNull(rmContainer);
+  }
+
+  @Test
+  public void testOpportunisticLatenciesMetrics() throws Exception {
+    HashMap<NodeId, MockNM> nodes = new HashMap<>();
+    MockNM nm1 = new MockNM("h1:1234", 4096, rm.getResourceTrackerService());
+    nodes.put(nm1.getNodeId(), nm1);
+    MockNM nm2 = new MockNM("h2:1234", 4096, rm.getResourceTrackerService());
+    nodes.put(nm2.getNodeId(), nm2);
+    nm1.registerNode();
+    nm2.registerNode();
+
+    nm1.nodeHeartbeat(oppContainersStatus, true);
+    nm2.nodeHeartbeat(oppContainersStatus, true);
+
+    OpportunisticContainerAllocatorAMService amservice =
+        (OpportunisticContainerAllocatorAMService) rm
+            .getApplicationMasterService();
+
+    MockRMAppSubmissionData data =
+        MockRMAppSubmissionData.Builder.createWithMemory(1 * GB, rm)
+            .withAppName("app").withUser("user").withAcls(null)
+            .withQueue("default").build();
+
+    RMApp app1 = MockRMAppSubmitter.submit(rm, data);
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm2);
+    ResourceScheduler scheduler = rm.getResourceScheduler();
+
+    // All nodes 1 to 2 will be applicable for scheduling.
+    nm1.nodeHeartbeat(oppContainersStatus, true);
+    nm2.nodeHeartbeat(oppContainersStatus, true);
+
+    GenericTestUtils
+        .waitFor(() -> amservice.getLeastLoadedNodes().size() == 2, 10,
+            10 * 100);
+
+    AllocateResponse allocateResponse = am1.allocate(Arrays.asList(
+        ResourceRequest.newInstance(Priority.newInstance(1), "*",
+            Resources.createResource(1 * GB), 2, true, null,
+            ExecutionTypeRequest
+                .newInstance(ExecutionType.OPPORTUNISTIC, true))), null);
+
+    List<Container> allocatedContainers =
+        allocateResponse.getAllocatedContainers();
+    Assert.assertEquals(2, allocatedContainers.size());
+
+    Container container = allocatedContainers.get(0);
+    MockNM allocNode = nodes.get(container.getNodeId());
+
+    // Start Container in NM
+    allocNode.nodeHeartbeat(Arrays.asList(ContainerStatus
+        .newInstance(container.getId(), ExecutionType.OPPORTUNISTIC,
+            ContainerState.RUNNING, "", 0)), true);
+    rm.drainEvents();
+
+    // Verify that container is actually running wrt the RM..
+    RMContainer rmContainer = ((CapacityScheduler) scheduler)
+        .getApplicationAttempt(container.getId().getApplicationAttemptId())
+        .getRMContainer(container.getId());
+    Assert.assertEquals(RMContainerState.RUNNING, rmContainer.getState());
+
+    // Container Completed in the NM
+    allocNode.nodeHeartbeat(Arrays.asList(ContainerStatus
+        .newInstance(container.getId(), ExecutionType.OPPORTUNISTIC,
+            ContainerState.COMPLETE, "", 0)), true);
+    Assert.assertTrue(
+        ClusterMetrics.getMetrics().allocateLatencyOppQuantiles.changed());
+    rm.drainEvents();
+
+    // Verify that container has been removed..
+    rmContainer = ((CapacityScheduler) scheduler)
+        .getApplicationAttempt(container.getId().getApplicationAttemptId())
+        .getRMContainer(container.getId());
+    Assert.assertNull(rmContainer);
+  }
+
+  private OpportunisticContainersStatus getOpportunisticStatus() {
+    return getOpportunisticStatus(-1, 100, 1000);
+  }
+
+  private OpportunisticContainersStatus getOpportunisticStatus(int waitTime,
+      int queueLength, int queueCapacity) {
+    OpportunisticContainersStatus status =
+        OpportunisticContainersStatus.newInstance();
+    status.setEstimatedQueueWaitTime(waitTime);
+    status.setOpportQueueCapacity(queueCapacity);
+    status.setWaitQueueLength(queueLength);
+    return status;
+  }
+}
\ No newline at end of file