From 76b8b7f6dcd88cf8665314cf317e01a47786430b Mon Sep 17 00:00:00 2001 From: zhengchenyu Date: Tue, 14 Jun 2022 15:50:49 +0800 Subject: [PATCH 1/5] YARN-11127. Potential deadlock in AsyncDispatcher caused by RMNodeImpl, SchedulerApplicationAttempt and RMAppImpl's lock contention. --- .../api/impl/TestRMDeadLockTriggerByApp.java | 356 ++++++++++++++++++ .../hadoop/yarn/event/AsyncDispatcher.java | 10 + .../resourcemanager/ResourceManager.java | 2 +- .../resourcemanager/rmapp/RMAppEventType.java | 1 + .../resourcemanager/rmapp/RMAppImpl.java | 35 ++ .../rmapp/RMAppLogAggregationStatusEvent.java | 44 +++ .../resourcemanager/rmnode/RMNodeImpl.java | 5 +- .../SchedulerApplicationAttempt.java | 8 +- .../common/fica/FiCaSchedulerApp.java | 7 +- .../TestRMAppLogAggregationStatus.java | 27 +- 10 files changed, 477 insertions(+), 18 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestRMDeadLockTriggerByApp.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppLogAggregationStatusEvent.java diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestRMDeadLockTriggerByApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestRMDeadLockTriggerByApp.java new file mode 100644 index 0000000000000..338f33610f393 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestRMDeadLockTriggerByApp.java @@ -0,0 +1,356 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.client.api.impl; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.service.Service.STATE; +import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; +import org.apache.hadoop.yarn.api.records.ApplicationAccessType; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationReport; +import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.ExecutionType; +import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest; +import org.apache.hadoop.yarn.api.records.LocalResource; +import org.apache.hadoop.yarn.api.records.LogAggregationStatus; +import org.apache.hadoop.yarn.api.records.NodeReport; +import org.apache.hadoop.yarn.api.records.NodeState; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.YarnApplicationState; +import org.apache.hadoop.yarn.client.ClientRMProxy; +import org.apache.hadoop.yarn.client.api.AMRMClient; +import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest; +import org.apache.hadoop.yarn.client.api.YarnClient; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.event.AsyncDispatcher; +import org.apache.hadoop.yarn.event.Event; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.MiniYARNCluster; +import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport; +import org.apache.hadoop.yarn.server.nodemanager.NodeManager; +import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; +import org.apache.hadoop.yarn.server.utils.BuilderUtils; +import org.apache.hadoop.yarn.util.Records; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class TestRMDeadLockTriggerByApp { + + private static final Logger LOG = LoggerFactory + .getLogger(TestRMDeadLockTriggerByApp.class); + + private static final int INTERVAL = 1; + private static final int LOOP = 5000; + private static final float CHECK_DEAD_LOCK_RATIO = 2.0f; + private static final int NODE_COUNT = 1; + + private boolean deadLock = false; + private String errString = null; + + private Configuration conf = null; + private MiniYARNCluster yarnCluster = null; + + private List nodeReports = null; + private ApplicationId appId = null; + private ApplicationAttemptId attemptId = null; + + private YarnClient yarnClient = null; + private AMRMClient amClient = null; + + private ResourceManager rm; + private NodeManager nm; + + // thread for allocate container + private Thread allocateThread = new AllocateTread(); + + // thread for add log aggregation report + private Thread addLogAggReportThread = new AddLogAggregationReportThread(); + + // thread for get application report + private Thread getAppReportThread = new GetApplicationReportThread(); + + @Before + public void setup() throws Exception { + createClusterAndStartApplication(); + } + + void createClusterAndStartApplication() + throws Exception { + this.conf = new YarnConfiguration(); + conf.set(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class.getName()); + conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 512); + conf.setLong(YarnConfiguration.NM_LOG_RETAIN_SECONDS, 1); + conf.setInt( + YarnConfiguration.NM_OPPORTUNISTIC_CONTAINERS_MAX_QUEUE_LENGTH, 10); + conf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true); + conf.setInt(YarnConfiguration.NM_VCORES, LOOP); + conf.setInt(YarnConfiguration.YARN_MINICLUSTER_NM_PMEM_MB, 512 * LOOP); + conf.setInt(YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS, this.INTERVAL); + + this.yarnCluster = new MiniYARNCluster( + TestAMRMClient.class.getName(), NODE_COUNT, 1, 1); + yarnCluster.init(conf); + yarnCluster.start(); + + // start rm client + this.yarnClient = YarnClient.createYarnClient(); + yarnClient.init(conf); + yarnClient.start(); + + // get node info + assertTrue("All node managers did not connect to the RM within the " + + "allotted 5-second timeout", + yarnCluster.waitForNodeManagersToConnect(5000L)); + this.nodeReports = yarnClient.getNodeReports(NodeState.RUNNING); + assertEquals("Not all node managers were reported running", + NODE_COUNT, nodeReports.size()); + + // get rm and nm info + this.rm = yarnCluster.getResourceManager(0); + this.nm = yarnCluster.getNodeManager(0); + + // submit new app + ApplicationSubmissionContext appContext = + yarnClient.createApplication().getApplicationSubmissionContext(); + this.appId = appContext.getApplicationId(); + // set the application name + appContext.setApplicationName("Test"); + // Set the priority for the application master + Priority pri = Records.newRecord(Priority.class); + pri.setPriority(0); + appContext.setPriority(pri); + // Set the queue to which this application is to be submitted in the RM + appContext.setQueue("default"); + // Set up the container launch context for the application master + ContainerLaunchContext amContainer = + BuilderUtils.newContainerLaunchContext( + Collections.emptyMap(), + new HashMap(), Arrays.asList("sleep", "100"), + new HashMap(), null, + new HashMap()); + appContext.setAMContainerSpec(amContainer); + appContext.setResource(Resource.newInstance(1024, 1)); + // Create the request to send to the applications manager + SubmitApplicationRequest appRequest = Records + .newRecord(SubmitApplicationRequest.class); + appRequest.setApplicationSubmissionContext(appContext); + // Submit the application to the applications manager + yarnClient.submitApplication(appContext); + + // wait for app to start + GenericTestUtils.waitFor(() -> { + try { + ApplicationReport appReport = yarnClient.getApplicationReport(appId); + if (appReport.getYarnApplicationState() + == YarnApplicationState.ACCEPTED) { + this.attemptId = appReport.getCurrentApplicationAttemptId(); + RMAppAttempt appAttempt = rm.getRMContext().getRMApps() + .get(attemptId.getApplicationId()).getCurrentAppAttempt(); + if (appAttempt.getAppAttemptState() == RMAppAttemptState.LAUNCHED) { + return true; + } + } + } catch (Exception e) { + fail("Application launch failed."); + } + return false; + }, 1000, 10000); + + // Just dig into the ResourceManager and get the AMRMToken just for the sake + // of testing. + UserGroupInformation.setLoginUser(UserGroupInformation + .createRemoteUser(UserGroupInformation.getCurrentUser().getUserName())); + + // emulate RM setup of AMRM token in credentials by adding the token + // *before* setting the token service + RMAppAttempt appAttempt = rm.getRMContext().getRMApps() + .get(attemptId.getApplicationId()).getCurrentAppAttempt(); + UserGroupInformation.getCurrentUser().addToken(appAttempt.getAMRMToken()); + appAttempt.getAMRMToken().setService( + ClientRMProxy.getAMRMTokenService(conf)); + + // create AMRMClient + this.amClient = AMRMClient.createAMRMClient(); + amClient.init(conf); + amClient.start(); + amClient.registerApplicationMaster("Host", 10000, ""); + } + + @After + public void teardown() throws YarnException, IOException { + if (allocateThread != null) { + allocateThread.interrupt(); + allocateThread = null; + } + if (addLogAggReportThread != null) { + addLogAggReportThread.interrupt(); + addLogAggReportThread = null; + } + if (getAppReportThread != null) { + getAppReportThread.interrupt(); + getAppReportThread = null; + } + + if (yarnClient != null && yarnClient.getServiceState() == STATE.STARTED) { + yarnClient.stop(); + } + this.yarnClient = null; + + if (amClient != null && amClient.getServiceState() == STATE.STARTED) { + amClient.stop(); + } + this.amClient = null; + + // Avoid the EventHandlingThread stuck forever + if (deadLock) { + LOG.info("Found dead lock, stop EventHandlingThread manually!"); + ((AsyncDispatcher) rm.getRMContext().getDispatcher()) + .forceEventHandlingThreadStop(); + } + if (yarnCluster != null && yarnCluster.getServiceState() == STATE.STARTED) { + yarnCluster.stop(); + } + this.yarnCluster = null; + } + + @Test(timeout = 60000) + public void TestRMDeadLockTriggerByApp() throws InterruptedException { + // start all thread + allocateThread.start(); + addLogAggReportThread.start(); + getAppReportThread.start(); + + this.deadLock = checkAsyncDispatcherDeadLock(); + Assert.assertFalse("There is dead lock!", deadLock); + Assert.assertNull(errString); + } + + private boolean checkAsyncDispatcherDeadLock() throws InterruptedException { + Event lastEvent = null; + Event currentEvent; + int counter = 0; + for (int i = 0; i < LOOP * CHECK_DEAD_LOCK_RATIO; i++) { + currentEvent = ((AsyncDispatcher) rm.getRmDispatcher()).getHeadEvent(); + if (currentEvent != null && (currentEvent == lastEvent)) { + if (counter++ > LOOP * CHECK_DEAD_LOCK_RATIO / 2) { + return true; + } + } else { + counter = 0; + lastEvent = currentEvent; + } + Thread.sleep(INTERVAL); + } + return false; + } + + class AllocateTread extends Thread { + + @Override + public void run() { + ContainerId amContainerId = rm.getRMContext().getRMApps().get(appId) + .getAppAttempts().get(attemptId).getMasterContainer().getId(); + ContainerRequest request = setupContainerAskForRM(); + try { + for (int i = 0; i < LOOP; i++) { + amClient.addContainerRequest(request); + for (ContainerId containerId : nm.getNMContext().getContainers() + .keySet()) { + // release all container except am container + if (!amContainerId.equals(containerId)) { + amClient.releaseAssignedContainer(containerId); + } + } + amClient.allocate(0.1f); + Thread.sleep(INTERVAL); + } + } catch (Throwable t) { + errString = t.getMessage(); + return; + } + } + } + + class AddLogAggregationReportThread extends Thread { + + @Override + public void run() { + LogAggregationReport report = LogAggregationReport + .newInstance(appId, LogAggregationStatus.RUNNING, ""); + try { + for (int i = 0; i < LOOP; i++) { + if (nm.getNMContext().getLogAggregationStatusForApps().size() == 0) { + nm.getNMContext().getLogAggregationStatusForApps().add(report); + } + Thread.sleep(INTERVAL); + } + } catch (Throwable t) { + errString = t.getMessage(); + return; + } + } + } + + class GetApplicationReportThread extends Thread { + + @Override + public void run() { + try { + for (int i = 0; i < LOOP; i++) { + yarnClient.getApplicationReport(appId); + Thread.sleep(INTERVAL); + } + } catch (Throwable t) { + errString = t.getMessage(); + return; + } + } + } + + private ContainerRequest setupContainerAskForRM() { + Priority pri = Priority.newInstance(1); + ContainerRequest request = new ContainerRequest( + Resource.newInstance(512, 1), null, null, pri, 0, true, null, + ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED, true), ""); + return request; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java index 1c4ed24b47d78..efa86560c211b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java @@ -413,4 +413,14 @@ public void addMetrics(EventTypeMetrics metrics, public int getEventQueueSize() { return eventQueue.size(); } + + @VisibleForTesting + public Event getHeadEvent() { + return eventQueue.peek(); + } + + @VisibleForTesting + public void forceEventHandlingThreadStop() { + eventHandlingThread.stop(); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java index 8adcff42a695d..4ee969ca175ed 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java @@ -262,7 +262,7 @@ protected static void setClusterTimeStamp(long timestamp) { } @VisibleForTesting - Dispatcher getRmDispatcher() { + public Dispatcher getRmDispatcher() { return rmDispatcher; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java index 4b55d389540cd..d8383763bd424 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java @@ -41,6 +41,7 @@ public enum RMAppEventType { // Source: Container and ResourceTracker APP_RUNNING_ON_NODE, + APP_LOG_AGG_STATUS_UPDATE, // Source: RMStateStore APP_NEW_SAVED, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java index ca88b8be3281c..51fe20df565e8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java @@ -279,6 +279,9 @@ RMAppEventType.KILL, new KillAttemptTransition()) .addTransition(RMAppState.ACCEPTED, RMAppState.ACCEPTED, RMAppEventType.APP_RUNNING_ON_NODE, new AppRunningOnNodeTransition()) + .addTransition(RMAppState.ACCEPTED, RMAppState.ACCEPTED, + RMAppEventType.APP_LOG_AGG_STATUS_UPDATE, + new AppLogAggregationStatusTransition()) // Handle AppAttemptLaunch to update the launchTime and publish to ATS .addTransition(RMAppState.ACCEPTED, RMAppState.ACCEPTED, RMAppEventType.ATTEMPT_LAUNCHED, @@ -298,6 +301,9 @@ RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition()) .addTransition(RMAppState.RUNNING, RMAppState.RUNNING, RMAppEventType.APP_RUNNING_ON_NODE, new AppRunningOnNodeTransition()) + .addTransition(RMAppState.RUNNING, RMAppState.RUNNING, + RMAppEventType.APP_LOG_AGG_STATUS_UPDATE, + new AppLogAggregationStatusTransition()) .addTransition(RMAppState.RUNNING, EnumSet.of(RMAppState.ACCEPTED, RMAppState.FINAL_SAVING), RMAppEventType.ATTEMPT_FAILED, @@ -316,6 +322,9 @@ RMAppEventType.KILL, new KillAttemptTransition()) .addTransition(RMAppState.FINAL_SAVING, RMAppState.FINAL_SAVING, RMAppEventType.APP_RUNNING_ON_NODE, new AppRunningOnNodeTransition()) + .addTransition(RMAppState.FINAL_SAVING, RMAppState.FINAL_SAVING, + RMAppEventType.APP_LOG_AGG_STATUS_UPDATE, + new AppLogAggregationStatusTransition()) // ignorable transitions .addTransition(RMAppState.FINAL_SAVING, RMAppState.FINAL_SAVING, EnumSet.of(RMAppEventType.NODE_UPDATE, RMAppEventType.KILL, @@ -327,6 +336,9 @@ RMAppEventType.KILL, new KillAttemptTransition()) .addTransition(RMAppState.FINISHING, RMAppState.FINISHING, RMAppEventType.APP_RUNNING_ON_NODE, new AppRunningOnNodeTransition()) + .addTransition(RMAppState.FINISHING, RMAppState.FINISHING, + RMAppEventType.APP_LOG_AGG_STATUS_UPDATE, + new AppLogAggregationStatusTransition()) // ignorable transitions .addTransition(RMAppState.FINISHING, RMAppState.FINISHING, EnumSet.of(RMAppEventType.NODE_UPDATE, @@ -338,6 +350,9 @@ RMAppEventType.KILL, new KillAttemptTransition()) .addTransition(RMAppState.KILLING, RMAppState.KILLING, RMAppEventType.APP_RUNNING_ON_NODE, new AppRunningOnNodeTransition()) + .addTransition(RMAppState.KILLING, RMAppState.KILLING, + RMAppEventType.APP_LOG_AGG_STATUS_UPDATE, + new AppLogAggregationStatusTransition()) .addTransition(RMAppState.KILLING, RMAppState.FINAL_SAVING, RMAppEventType.ATTEMPT_KILLED, new FinalSavingTransition( @@ -367,6 +382,9 @@ RMAppEventType.KILL, new KillAttemptTransition()) .addTransition(RMAppState.FINISHED, RMAppState.FINISHED, RMAppEventType.APP_RUNNING_ON_NODE, new AppRunningOnNodeTransition()) + .addTransition(RMAppState.FINISHED, RMAppState.FINISHED, + RMAppEventType.APP_LOG_AGG_STATUS_UPDATE, + new AppLogAggregationStatusTransition()) .addTransition(RMAppState.FINISHED, RMAppState.FINISHED, EnumSet.of( RMAppEventType.NODE_UPDATE, @@ -379,6 +397,9 @@ RMAppEventType.KILL, new KillAttemptTransition()) .addTransition(RMAppState.FAILED, RMAppState.FAILED, RMAppEventType.APP_RUNNING_ON_NODE, new AppRunningOnNodeTransition()) + .addTransition(RMAppState.FAILED, RMAppState.FAILED, + RMAppEventType.APP_LOG_AGG_STATUS_UPDATE, + new AppLogAggregationStatusTransition()) .addTransition(RMAppState.FAILED, RMAppState.FAILED, EnumSet.of(RMAppEventType.KILL, RMAppEventType.NODE_UPDATE)) @@ -387,6 +408,9 @@ RMAppEventType.KILL, new KillAttemptTransition()) .addTransition(RMAppState.KILLED, RMAppState.KILLED, RMAppEventType.APP_RUNNING_ON_NODE, new AppRunningOnNodeTransition()) + .addTransition(RMAppState.KILLED, RMAppState.KILLED, + RMAppEventType.APP_LOG_AGG_STATUS_UPDATE, + new AppLogAggregationStatusTransition()) .addTransition( RMAppState.KILLED, RMAppState.KILLED, @@ -1093,6 +1117,17 @@ public void transition(RMAppImpl app, RMAppEvent event) { } } + private static final class AppLogAggregationStatusTransition extends + RMAppTransition { + + @Override + public void transition(RMAppImpl app, RMAppEvent event) { + RMAppLogAggregationStatusEvent logEvent = + (RMAppLogAggregationStatusEvent) event; + app.aggregateLogReport(logEvent.getNodeId(), logEvent.getReport()); + } + } + // synchronously recover attempt to ensure any incoming external events // to be processed after the attempt processes the recover event. private void recoverAppAttempts() { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppLogAggregationStatusEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppLogAggregationStatusEvent.java new file mode 100644 index 0000000000000..b6162fc5efaf1 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppLogAggregationStatusEvent.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.rmapp; + +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport; + +public class RMAppLogAggregationStatusEvent extends RMAppEvent { + + private final NodeId node; + private final LogAggregationReport report; + + public RMAppLogAggregationStatusEvent(ApplicationId appId, NodeId node, + LogAggregationReport report) { + super(appId, RMAppEventType.APP_LOG_AGG_STATUS_UPDATE); + this.node = node; + this.report = report; + } + + public NodeId getNodeId() { + return node; + } + + public LogAggregationReport getReport() { + return report; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java index b8aaea5de330c..6bbd4f1baf460 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java @@ -37,6 +37,7 @@ import org.apache.commons.collections.keyvalue.DefaultMapEntry; import org.apache.hadoop.yarn.server.api.records.NodeStatus; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppLogAggregationStatusEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -1703,7 +1704,9 @@ private void handleLogAggregationStatus( for (LogAggregationReport report : logAggregationReportsForApps) { RMApp rmApp = this.context.getRMApps().get(report.getApplicationId()); if (rmApp != null) { - ((RMAppImpl)rmApp).aggregateLogReport(this.nodeId, report); + this.context.getDispatcher().getEventHandler().handle( + new RMAppLogAggregationStatusEvent(rmApp.getApplicationId(), + this.nodeId, report)); } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java index 2c84d399242a9..6131a87bf3146 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java @@ -110,8 +110,8 @@ public class SchedulerApplicationAttempt implements SchedulableEntity { FastDateFormat.getInstance("EEE MMM dd HH:mm:ss Z yyyy"); private static final long MEM_AGGREGATE_ALLOCATION_CACHE_MSECS = 3000; - protected long lastMemoryAggregateAllocationUpdateTime = 0; - private Map lastResourceSecondsMap = new HashMap<>(); + protected volatile long lastMemoryAggregateAllocationUpdateTime = 0; + private volatile Map lastResourceSecondsMap = new HashMap<>(); protected final AppSchedulingInfo appSchedulingInfo; protected ApplicationAttemptId attemptId; protected Map liveContainers = @@ -1130,7 +1130,7 @@ private AggregateAppResourceUsage getRunningAggregateAppResourceUsage() { } public ApplicationResourceUsageReport getResourceUsageReport() { - writeLock.lock(); + readLock.lock(); try { AggregateAppResourceUsage runningResourceUsage = getRunningAggregateAppResourceUsage(); @@ -1166,7 +1166,7 @@ public ApplicationResourceUsageReport getResourceUsageReport() { runningResourceUsage.getResourceUsageSecondsMap(), queueUsagePerc, clusterUsagePerc, preemptedResourceSecondsMaps); } finally { - writeLock.unlock(); + readLock.unlock(); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java index 3a0fd347e5a0b..9504256bfc2a8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java @@ -1105,11 +1105,8 @@ public AppPlacementAllocator getAppPlacementAllocator( */ @Override public ApplicationResourceUsageReport getResourceUsageReport() { - writeLock.lock(); + readLock.lock(); try { - // Use write lock here because - // SchedulerApplicationAttempt#getResourceUsageReport updated fields - // TODO: improve this ApplicationResourceUsageReport report = super.getResourceUsageReport(); Resource cluster = rmContext.getScheduler().getClusterResource(); Resource totalPartitionRes = @@ -1129,7 +1126,7 @@ public ApplicationResourceUsageReport getResourceUsageReport() { } return report; } finally { - writeLock.unlock(); + readLock.unlock(); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java index 6836288ed1cd1..9f6afdf3a854a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java @@ -51,6 +51,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppLogAggregationStatusEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRunningOnNodeEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl; @@ -177,6 +178,7 @@ public void testLogAggregationStatus() throws Exception { NodeHealthStatus.newInstance(true, null, 0), null, null, null); node1.handle(new RMNodeStatusEvent(node1.getNodeID(), nodeStatus1, node1ReportForApp)); + rmApp.handle(new RMAppLogAggregationStatusEvent(appId, nodeId1, report1)); List node2ReportForApp = new ArrayList(); @@ -191,6 +193,7 @@ public void testLogAggregationStatus() throws Exception { NodeHealthStatus.newInstance(true, null, 0), null, null, null); node2.handle(new RMNodeStatusEvent(node2.getNodeID(), nodeStatus2, node2ReportForApp)); + rmApp.handle(new RMAppLogAggregationStatusEvent(appId, nodeId2, report2)); // node1 and node2 has updated its log aggregation status // verify that the log aggregation status for node1, node2 // has been changed @@ -228,6 +231,7 @@ public void testLogAggregationStatus() throws Exception { node1ReportForApp2.add(report1_2); node1.handle(new RMNodeStatusEvent(node1.getNodeID(), nodeStatus1, node1ReportForApp2)); + rmApp.handle(new RMAppLogAggregationStatusEvent(appId, nodeId1, report1_2)); // verify that the log aggregation status for node1 // has been changed @@ -283,19 +287,26 @@ public void testLogAggregationStatus() throws Exception { // be changed from TIME_OUT to SUCCEEDED List node1ReportForApp3 = new ArrayList(); - LogAggregationReport report1_3; + LogAggregationReport[] report1_3 = new LogAggregationReport[10]; for (int i = 0; i < 10 ; i ++) { - report1_3 = - LogAggregationReport.newInstance(appId, - LogAggregationStatus.RUNNING, "test_message_" + i); - node1ReportForApp3.add(report1_3); + report1_3[i] = LogAggregationReport + .newInstance(appId, LogAggregationStatus.RUNNING, + "test_message_" + i); + node1ReportForApp3.add(report1_3[i]); } - node1ReportForApp3.add(LogAggregationReport.newInstance(appId, - LogAggregationStatus.SUCCEEDED, "")); + LogAggregationReport report1_3_s = LogAggregationReport.newInstance(appId, + LogAggregationStatus.SUCCEEDED, ""); + node1ReportForApp3.add(report1_3_s); // For every logAggregationReport cached in memory, we can only save at most // 10 diagnostic messages/failure messages node1.handle(new RMNodeStatusEvent(node1.getNodeID(), nodeStatus1, node1ReportForApp3)); + for (int i = 0; i < 10; i++) { + rmApp.handle( + new RMAppLogAggregationStatusEvent(appId, nodeId1, report1_3[i])); + } + rmApp.handle( + new RMAppLogAggregationStatusEvent(appId, nodeId1, report1_3_s)); logAggregationStatus = rmApp.getLogAggregationReportsForApp(); Assert.assertEquals(2, logAggregationStatus.size()); @@ -340,6 +351,8 @@ public void testLogAggregationStatus() throws Exception { node2ReportForApp2.add(report2_3); node2.handle(new RMNodeStatusEvent(node2.getNodeID(), nodeStatus2, node2ReportForApp2)); + rmApp.handle(new RMAppLogAggregationStatusEvent(appId, nodeId2, report2_2)); + rmApp.handle(new RMAppLogAggregationStatusEvent(appId, nodeId2, report2_3)); Assert.assertEquals(LogAggregationStatus.FAILED, rmApp.getLogAggregationStatusForAppReport()); logAggregationStatus = rmApp.getLogAggregationReportsForApp(); From b920abe417e1cf023665f25131392722599106d0 Mon Sep 17 00:00:00 2001 From: zhengchenyu Date: Mon, 20 Jun 2022 17:36:42 +0800 Subject: [PATCH 2/5] trigger ci From 186d14dd7aa137a1ca79f6201d6f878c162a8226 Mon Sep 17 00:00:00 2001 From: zhengchenyu Date: Tue, 21 Jun 2022 11:16:52 +0800 Subject: [PATCH 3/5] remove forceEventHandlingThreadStop and fix code style --- .../client/api/impl/TestRMDeadLockTriggerByApp.java | 11 +---------- .../org/apache/hadoop/yarn/event/AsyncDispatcher.java | 5 ----- .../server/resourcemanager/rmnode/RMNodeImpl.java | 1 - 3 files changed, 1 insertion(+), 16 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestRMDeadLockTriggerByApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestRMDeadLockTriggerByApp.java index 338f33610f393..54fce969df940 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestRMDeadLockTriggerByApp.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestRMDeadLockTriggerByApp.java @@ -75,9 +75,6 @@ public class TestRMDeadLockTriggerByApp { - private static final Logger LOG = LoggerFactory - .getLogger(TestRMDeadLockTriggerByApp.class); - private static final int INTERVAL = 1; private static final int LOOP = 5000; private static final float CHECK_DEAD_LOCK_RATIO = 2.0f; @@ -240,12 +237,6 @@ public void teardown() throws YarnException, IOException { } this.amClient = null; - // Avoid the EventHandlingThread stuck forever - if (deadLock) { - LOG.info("Found dead lock, stop EventHandlingThread manually!"); - ((AsyncDispatcher) rm.getRMContext().getDispatcher()) - .forceEventHandlingThreadStop(); - } if (yarnCluster != null && yarnCluster.getServiceState() == STATE.STARTED) { yarnCluster.stop(); } @@ -253,7 +244,7 @@ public void teardown() throws YarnException, IOException { } @Test(timeout = 60000) - public void TestRMDeadLockTriggerByApp() throws InterruptedException { + public void testRMDeadLockTriggerByApp() throws InterruptedException { // start all thread allocateThread.start(); addLogAggReportThread.start(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java index efa86560c211b..177da0d65032f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java @@ -418,9 +418,4 @@ public int getEventQueueSize() { public Event getHeadEvent() { return eventQueue.peek(); } - - @VisibleForTesting - public void forceEventHandlingThreadStop() { - eventHandlingThread.stop(); - } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java index 6bbd4f1baf460..3eaba754bd129 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java @@ -78,7 +78,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRunningOnNodeEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.AllocationExpirationInfo; From 61e2f31ea276f7b1f0c578d9080da0aabdc72560 Mon Sep 17 00:00:00 2001 From: zhengchenyu Date: Tue, 21 Jun 2022 19:25:57 +0800 Subject: [PATCH 4/5] suppress unchecked warnings --- .../apache/hadoop/yarn/client/TestRMFailoverProxyProvider.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestRMFailoverProxyProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestRMFailoverProxyProvider.java index ce9af23744fec..ea78b598581a7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestRMFailoverProxyProvider.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestRMFailoverProxyProvider.java @@ -78,6 +78,7 @@ public void setUp() throws IOException, YarnException { * gets called. */ @Test + @SuppressWarnings("unchecked") public void testFailoverChange() throws Exception { //Adjusting the YARN Conf conf.set(YarnConfiguration.RM_HA_IDS, "rm0, rm1"); @@ -190,6 +191,7 @@ public void testFailoverChange() throws Exception { * gets called. */ @Test + @SuppressWarnings("unchecked") public void testAutoRefreshFailoverChange() throws Exception { conf.setClass(YarnConfiguration.CLIENT_FAILOVER_NO_HA_PROXY_PROVIDER, AutoRefreshRMFailoverProxyProvider.class, From 860e018d242a92034f49bafa2b2c8601460d8cbf Mon Sep 17 00:00:00 2001 From: zhengchenyu Date: Fri, 15 Jul 2022 19:07:59 +0800 Subject: [PATCH 5/5] fix codestyle --- .../api/impl/TestRMDeadLockTriggerByApp.java | 28 ++++++++----------- .../resourcemanager/rmapp/RMAppImpl.java | 3 +- .../resourcemanager/rmnode/RMNodeImpl.java | 3 +- .../TestRMAppLogAggregationStatus.java | 9 ++---- 4 files changed, 16 insertions(+), 27 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestRMDeadLockTriggerByApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestRMDeadLockTriggerByApp.java index 54fce969df940..473fba944f95d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestRMDeadLockTriggerByApp.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestRMDeadLockTriggerByApp.java @@ -116,8 +116,7 @@ void createClusterAndStartApplication() conf.set(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class.getName()); conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 512); conf.setLong(YarnConfiguration.NM_LOG_RETAIN_SECONDS, 1); - conf.setInt( - YarnConfiguration.NM_OPPORTUNISTIC_CONTAINERS_MAX_QUEUE_LENGTH, 10); + conf.setInt(YarnConfiguration.NM_OPPORTUNISTIC_CONTAINERS_MAX_QUEUE_LENGTH, 10); conf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true); conf.setInt(YarnConfiguration.NM_VCORES, LOOP); conf.setInt(YarnConfiguration.YARN_MINICLUSTER_NM_PMEM_MB, 512 * LOOP); @@ -134,12 +133,10 @@ void createClusterAndStartApplication() yarnClient.start(); // get node info - assertTrue("All node managers did not connect to the RM within the " - + "allotted 5-second timeout", + assertTrue("All node managers did not connect to the RM within the allotted 5-second timeout", yarnCluster.waitForNodeManagersToConnect(5000L)); this.nodeReports = yarnClient.getNodeReports(NodeState.RUNNING); - assertEquals("Not all node managers were reported running", - NODE_COUNT, nodeReports.size()); + assertEquals("Not all node managers were reported running", NODE_COUNT, nodeReports.size()); // get rm and nm info this.rm = yarnCluster.getResourceManager(0); @@ -177,8 +174,7 @@ void createClusterAndStartApplication() GenericTestUtils.waitFor(() -> { try { ApplicationReport appReport = yarnClient.getApplicationReport(appId); - if (appReport.getYarnApplicationState() - == YarnApplicationState.ACCEPTED) { + if (appReport.getYarnApplicationState() == YarnApplicationState.ACCEPTED) { this.attemptId = appReport.getCurrentApplicationAttemptId(); RMAppAttempt appAttempt = rm.getRMContext().getRMApps() .get(attemptId.getApplicationId()).getCurrentAppAttempt(); @@ -194,16 +190,15 @@ void createClusterAndStartApplication() // Just dig into the ResourceManager and get the AMRMToken just for the sake // of testing. - UserGroupInformation.setLoginUser(UserGroupInformation - .createRemoteUser(UserGroupInformation.getCurrentUser().getUserName())); + UserGroupInformation.setLoginUser( + UserGroupInformation.createRemoteUser(UserGroupInformation.getCurrentUser().getUserName())); // emulate RM setup of AMRM token in credentials by adding the token // *before* setting the token service RMAppAttempt appAttempt = rm.getRMContext().getRMApps() .get(attemptId.getApplicationId()).getCurrentAppAttempt(); UserGroupInformation.getCurrentUser().addToken(appAttempt.getAMRMToken()); - appAttempt.getAMRMToken().setService( - ClientRMProxy.getAMRMTokenService(conf)); + appAttempt.getAMRMToken().setService(ClientRMProxy.getAMRMTokenService(conf)); // create AMRMClient this.amClient = AMRMClient.createAMRMClient(); @@ -284,8 +279,7 @@ public void run() { try { for (int i = 0; i < LOOP; i++) { amClient.addContainerRequest(request); - for (ContainerId containerId : nm.getNMContext().getContainers() - .keySet()) { + for (ContainerId containerId : nm.getNMContext().getContainers().keySet()) { // release all container except am container if (!amContainerId.equals(containerId)) { amClient.releaseAssignedContainer(containerId); @@ -339,9 +333,9 @@ public void run() { private ContainerRequest setupContainerAskForRM() { Priority pri = Priority.newInstance(1); - ContainerRequest request = new ContainerRequest( - Resource.newInstance(512, 1), null, null, pri, 0, true, null, - ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED, true), ""); + ContainerRequest request = + new ContainerRequest(Resource.newInstance(512, 1), null, null, pri, 0, true, null, + ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED, true), ""); return request; } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java index 51fe20df565e8..163ca2ff7312a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java @@ -1122,8 +1122,7 @@ private static final class AppLogAggregationStatusTransition extends @Override public void transition(RMAppImpl app, RMAppEvent event) { - RMAppLogAggregationStatusEvent logEvent = - (RMAppLogAggregationStatusEvent) event; + RMAppLogAggregationStatusEvent logEvent = (RMAppLogAggregationStatusEvent) event; app.aggregateLogReport(logEvent.getNodeId(), logEvent.getReport()); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java index 3eaba754bd129..a3f89285fb56a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java @@ -1704,8 +1704,7 @@ private void handleLogAggregationStatus( RMApp rmApp = this.context.getRMApps().get(report.getApplicationId()); if (rmApp != null) { this.context.getDispatcher().getEventHandler().handle( - new RMAppLogAggregationStatusEvent(rmApp.getApplicationId(), - this.nodeId, report)); + new RMAppLogAggregationStatusEvent(rmApp.getApplicationId(), this.nodeId, report)); } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java index 9f6afdf3a854a..788fbd67ce736 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java @@ -290,8 +290,7 @@ public void testLogAggregationStatus() throws Exception { LogAggregationReport[] report1_3 = new LogAggregationReport[10]; for (int i = 0; i < 10 ; i ++) { report1_3[i] = LogAggregationReport - .newInstance(appId, LogAggregationStatus.RUNNING, - "test_message_" + i); + .newInstance(appId, LogAggregationStatus.RUNNING, "test_message_" + i); node1ReportForApp3.add(report1_3[i]); } LogAggregationReport report1_3_s = LogAggregationReport.newInstance(appId, @@ -302,11 +301,9 @@ public void testLogAggregationStatus() throws Exception { node1.handle(new RMNodeStatusEvent(node1.getNodeID(), nodeStatus1, node1ReportForApp3)); for (int i = 0; i < 10; i++) { - rmApp.handle( - new RMAppLogAggregationStatusEvent(appId, nodeId1, report1_3[i])); + rmApp.handle(new RMAppLogAggregationStatusEvent(appId, nodeId1, report1_3[i])); } - rmApp.handle( - new RMAppLogAggregationStatusEvent(appId, nodeId1, report1_3_s)); + rmApp.handle(new RMAppLogAggregationStatusEvent(appId, nodeId1, report1_3_s)); logAggregationStatus = rmApp.getLogAggregationReportsForApp(); Assert.assertEquals(2, logAggregationStatus.size());