From 733333eacb8b4892723485511789a5bd12596a5c Mon Sep 17 00:00:00 2001 From: slfan1989 Date: Thu, 29 Sep 2022 23:06:24 +0800 Subject: [PATCH 01/14] YARN-11323. [Federation] Improve Router Handler FinishApps. --- .../server/resourcemanager/RMAppManager.java | 26 +++++ .../resourcemanager/ResourceManager.java | 14 +++ .../FederationStateStoreService.java | 103 ++++++++++++++++++ 3 files changed, 143 insertions(+) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java index f847152c47d76..b0f9e0bb05203 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java @@ -28,9 +28,12 @@ import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.conf.HAUtil; import org.apache.hadoop.yarn.security.ConfiguredYarnAuthorizer; import org.apache.hadoop.yarn.security.Permission; import org.apache.hadoop.yarn.security.PrivilegedEntity; +import org.apache.hadoop.yarn.server.federation.store.records.DeleteApplicationHomeSubClusterResponse; +import org.apache.hadoop.yarn.server.resourcemanager.federation.FederationStateStoreService; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueuePath; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -114,6 +117,7 @@ public class RMAppManager implements EventHandler, private boolean nodeLabelsEnabled; private Set exclusiveEnforcedPartitions; private String amDefaultNodeLabel; + private FederationStateStoreService federationStateStoreService; private static final String USER_ID_PREFIX = "userid="; @@ -347,6 +351,7 @@ protected synchronized void checkAppNumCompletedLimit() { + ", removing app " + removeApp.getApplicationId() + " from state store."); rmContext.getStateStore().removeApplication(removeApp); + removeApplicationIdFromStateStore(removeId); completedAppsInStateStore--; } @@ -358,6 +363,7 @@ protected synchronized void checkAppNumCompletedLimit() { + this.maxCompletedAppsInMemory + ", removing app " + removeId + " from memory: "); rmContext.getRMApps().remove(removeId); + removeApplicationIdFromStateStore(removeId); this.applicationACLsManager.removeApplication(removeId); } } @@ -1054,4 +1060,24 @@ private void copyPlacementQueueToSubmissionContext( context.setQueue(placementContext.getQueue()); } } + + @VisibleForTesting + public void setFederationStateStoreService(FederationStateStoreService stateStoreService) { + this.federationStateStoreService = stateStoreService; + } + + private void removeApplicationIdFromStateStore(ApplicationId applicationId) { + if (HAUtil.isFederationEnabled(conf) && federationStateStoreService != null) { + try { + DeleteApplicationHomeSubClusterResponse response = + federationStateStoreService.cleanUpFinishApplicationsWithRetries(applicationId); + if (response != null) { + LOG.info("applicationId = {} remove from state store success.", + applicationId); + } + } catch (Exception e) { + LOG.error("applicationId = {} remove from state store error.", applicationId, e); + } + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java index 8adcff42a695d..961a291311033 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java @@ -917,6 +917,7 @@ protected void serviceInit(Configuration configuration) throws Exception { } federationStateStoreService = createFederationStateStoreService(); addIfService(federationStateStoreService); + rmAppManager.setFederationStateStoreService(federationStateStoreService); LOG.info("Initialized Federation membership."); } @@ -996,6 +997,13 @@ protected void serviceStart() throws Exception { RMState state = rmStore.loadState(); recover(state); LOG.info("Recovery ended"); + + // Make sure that the App is cleaned up after the RM memory is restored. + if (HAUtil.isFederationEnabled(conf)) { + federationStateStoreService. + createCleanUpFinishApplicationThread("Recovery"); + } + } catch (Exception e) { // the Exception from loadState() needs to be handled for // HA and we need to give up master status if we got fenced @@ -1017,6 +1025,12 @@ protected void serviceStart() throws Exception { @Override protected void serviceStop() throws Exception { + // If RM is stopped, also perform a cleanup operation. + if (HAUtil.isFederationEnabled(conf)) { + federationStateStoreService. + createCleanUpFinishApplicationThread("Stop"); + } + super.serviceStop(); DefaultMetricsSystem.shutdown(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/FederationStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/FederationStateStoreService.java index 060540d01ee32..40fa4ee9c292d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/FederationStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/FederationStateStoreService.java @@ -29,6 +29,7 @@ import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.util.concurrent.HadoopExecutors; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; @@ -74,6 +75,7 @@ import org.apache.hadoop.yarn.server.federation.store.records.UpdateReservationHomeSubClusterResponse; import org.apache.hadoop.yarn.server.federation.store.records.RouterMasterKeyRequest; import org.apache.hadoop.yarn.server.federation.store.records.RouterMasterKeyResponse; +import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster; import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade; import org.apache.hadoop.yarn.server.records.Version; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; @@ -102,6 +104,7 @@ public class FederationStateStoreService extends AbstractService private long heartbeatInterval; private long heartbeatInitialDelay; private RMContext rmContext; + private String cleanUpThreadNamePrefix = "FederationStateStoreService-Clean-Thread"; public FederationStateStoreService(RMContext rmContext) { super(FederationStateStoreService.class.getName()); @@ -378,4 +381,104 @@ public RouterMasterKeyResponse getMasterKeyByDelegationKey(RouterMasterKeyReques throws YarnException, IOException { throw new NotImplementedException("Code is not implemented"); } + + /** + * Create a thread that cleans up the app. + * @param stage rm-start/rm-stop. + */ + public void createCleanUpFinishApplicationThread(String stage) { + String threadName = cleanUpThreadNamePrefix + "-" + stage; + Thread finishApplicationThread = new Thread(createCleanUpFinishApplicationThread()); + finishApplicationThread.setName(threadName); + finishApplicationThread.start(); + } + + /** + * Create a thread that cleans up the app. + * + * @return thread object. + */ + private Runnable createCleanUpFinishApplicationThread() { + return () -> { + + try { + // Get the current RM's App list based on subClusterId + GetApplicationsHomeSubClusterRequest request = + GetApplicationsHomeSubClusterRequest.newInstance(subClusterId); + GetApplicationsHomeSubClusterResponse response = + getApplicationsHomeSubCluster(request); + List applications = response.getAppsHomeSubClusters(); + + // Traverse the app list and clean up the app. + long successCleanUpAppCount = 0; + for (ApplicationHomeSubCluster application : applications) { + ApplicationId applicationId = application.getApplicationId(); + if (!this.rmContext.getRMApps().containsKey(applicationId)) { + try { + DeleteApplicationHomeSubClusterResponse deleteResponse = + cleanUpFinishApplicationsWithRetries(applicationId); + if (deleteResponse != null) { + LOG.info("application = {} has been cleaned up successfully.", applicationId); + successCleanUpAppCount++; + } + } catch (YarnException e) { + LOG.error("problem during application = {} cleanup.", applicationId, e); + } + } + } + + // print app cleanup log + LOG.info("cleanup finished applications size = {}, number = {} successful cleanups.", + applications.size(), successCleanUpAppCount); + } catch (Exception e) { + LOG.error("problem during cleanup applications.", e); + } + }; + } + + /** + * Clean up the completed Application. + * + * @param applicationId app id. + * @return DeleteApplicationHomeSubClusterResponse. + * @throws Exception exception occurs. + */ + public DeleteApplicationHomeSubClusterResponse + cleanUpFinishApplicationsWithRetries(ApplicationId applicationId) throws Exception { + DeleteApplicationHomeSubClusterRequest request = + DeleteApplicationHomeSubClusterRequest.newInstance(applicationId); + return new FederationStateStoreAction() { + @Override + public DeleteApplicationHomeSubClusterResponse run() throws Exception { + return deleteApplicationHomeSubCluster(request); + } + }.runWithRetries(); + } + + /** + * Define an abstract class, abstract retry method, + * which can be used for other methods later. + * + * @param abstract parameter + */ + private abstract class FederationStateStoreAction { + abstract T run() throws Exception; + + T runWithRetries() throws Exception { + int retry = 0; + while (true) { + try { + return run(); + } catch (Exception e) { + LOG.info("Exception while executing an FederationStateStore operation.", e); + if (++retry > 10) { + LOG.info("Maxed out FederationStateStore retries. Giving up!"); + throw e; + } + LOG.info("Retrying operation on FederationStateStore. Retry no. " + retry); + Thread.sleep(10); + } + } + } + } } From 4978a7067c9f0391923742237eb9a2af06f56d1d Mon Sep 17 00:00:00 2001 From: slfan1989 Date: Thu, 29 Sep 2022 23:06:30 +0800 Subject: [PATCH 02/14] YARN-11323. [Federation] Improve Router Handler FinishApps. --- .../FederationStateStoreService.java | 1 + .../TestFederationRMStateStoreService.java | 54 ++++++++++++++++--- 2 files changed, 49 insertions(+), 6 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/FederationStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/FederationStateStoreService.java index 40fa4ee9c292d..600452baf487f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/FederationStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/FederationStateStoreService.java @@ -22,6 +22,7 @@ import java.net.InetSocketAddress; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.List; import org.apache.commons.lang3.NotImplementedException; import org.apache.hadoop.conf.Configuration; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/federation/TestFederationRMStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/federation/TestFederationRMStateStoreService.java index e8ebdd5bedde5..852c31308cc4c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/federation/TestFederationRMStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/federation/TestFederationRMStateStoreService.java @@ -26,15 +26,14 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.ha.HAServiceProtocol; import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.test.LambdaTestUtils; +import org.apache.hadoop.util.Time; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.federation.store.FederationStateStore; -import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterInfoRequest; -import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterInfoResponse; -import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterRequest; -import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; -import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo; -import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState; +import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreException; +import org.apache.hadoop.yarn.server.federation.store.records.*; import org.apache.hadoop.yarn.server.resourcemanager.MockRM; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ClusterMetricsInfo; import org.junit.After; @@ -207,4 +206,47 @@ public void testFederationStateStoreServiceInitialHeartbeatDelay() throws Except "Started federation membership heartbeat with interval: 300 and initial delay: 10")); rm.stop(); } + + @Test + public void testCleanUpApplication() throws Exception { + conf.setBoolean(YarnConfiguration.FEDERATION_ENABLED, true); + conf.setInt(YarnConfiguration.FEDERATION_STATESTORE_HEARTBEAT_INITIAL_DELAY, 10); + conf.set(YarnConfiguration.RM_CLUSTER_ID, subClusterId.getId()); + + final MockRM rm = new MockRM(conf); + rm.init(conf); + stateStore = rm.getFederationStateStoreService().getStateStoreClient(); + rm.start(); + + FederationStateStoreService stateStoreService = + rm.getFederationStateStoreService(); + FederationStateStoreHeartbeat storeHeartbeat = + stateStoreService.getStateStoreHeartbeatThread(); + storeHeartbeat.run(); + checkSubClusterInfo(SubClusterState.SC_RUNNING); + + ApplicationId appId = ApplicationId.newInstance(Time.now(), 1); + ApplicationHomeSubCluster appHomeSubCluster1 = ApplicationHomeSubCluster + .newInstance(appId, subClusterId); + AddApplicationHomeSubClusterRequest request = + AddApplicationHomeSubClusterRequest.newInstance(appHomeSubCluster1); + stateStore.addApplicationHomeSubCluster(request); + + GetApplicationHomeSubClusterRequest appRequest = + GetApplicationHomeSubClusterRequest.newInstance(appId); + GetApplicationHomeSubClusterResponse response = + stateStore.getApplicationHomeSubCluster(appRequest); + Assert.assertNotNull(response); + ApplicationHomeSubCluster appHomeSubCluster = response.getApplicationHomeSubCluster(); + Assert.assertNotNull(appHomeSubCluster); + Assert.assertNotNull(appHomeSubCluster.getApplicationId()); + Assert.assertEquals(appId, appHomeSubCluster.getApplicationId()); + + DeleteApplicationHomeSubClusterResponse delResponse = + stateStoreService.cleanUpFinishApplicationsWithRetries(appId); + Assert.assertNotNull(delResponse); + LambdaTestUtils.intercept(FederationStateStoreException.class, + "Application " + appId + " does not exist", + () -> stateStore.getApplicationHomeSubCluster(appRequest)); + } } From adfc5ad0b5dc872d73310f45b5a29c02d58569ae Mon Sep 17 00:00:00 2001 From: slfan1989 Date: Thu, 29 Sep 2022 23:40:29 +0800 Subject: [PATCH 03/14] YARN-11323. Fix CheckStyle. --- .../TestFederationRMStateStoreService.java | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/federation/TestFederationRMStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/federation/TestFederationRMStateStoreService.java index 852c31308cc4c..8722f24fa1e5f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/federation/TestFederationRMStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/federation/TestFederationRMStateStoreService.java @@ -33,7 +33,17 @@ import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.federation.store.FederationStateStore; import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreException; -import org.apache.hadoop.yarn.server.federation.store.records.*; +import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterInfoRequest; +import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterInfoResponse; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterRequest; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState; +import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster; +import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterRequest; +import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterRequest; +import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterResponse; +import org.apache.hadoop.yarn.server.federation.store.records.DeleteApplicationHomeSubClusterResponse; import org.apache.hadoop.yarn.server.resourcemanager.MockRM; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ClusterMetricsInfo; import org.junit.After; From f98c892764f4241560ea082390a8d1baac643f6b Mon Sep 17 00:00:00 2001 From: slfan1989 Date: Fri, 30 Sep 2022 10:51:29 +0800 Subject: [PATCH 04/14] YARN-11323. Fix CheckStyle. --- .../retry/FederationActionRetry.java | 29 +++++ .../server/resourcemanager/RMAppManager.java | 20 ++- .../FederationStateStoreService.java | 122 +++++++++++------- .../TestFederationRMStateStoreService.java | 6 +- 4 files changed, 122 insertions(+), 55 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/retry/FederationActionRetry.java diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/retry/FederationActionRetry.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/retry/FederationActionRetry.java new file mode 100644 index 0000000000000..091f6d8bb61a4 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/retry/FederationActionRetry.java @@ -0,0 +1,29 @@ +package org.apache.hadoop.yarn.server.federation.retry; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public abstract class FederationActionRetry { + + public static final Logger LOG = + LoggerFactory.getLogger(FederationActionRetry.class); + + protected abstract T run() throws Exception; + + public T runWithRetries(int retryCount, long retrySleepTime) throws Exception { + int retry = 0; + while (true) { + try { + return run(); + } catch (Exception e) { + LOG.info("Exception while executing an Federation operation.", e); + if (++retry > retryCount) { + LOG.info("Maxed out Federation retries. Giving up!"); + throw e; + } + LOG.info("Retrying operation on Federation. Retry no. {}", retry); + Thread.sleep(retrySleepTime); + } + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java index b0f9e0bb05203..ba7d750f19ba1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java @@ -1066,17 +1066,23 @@ public void setFederationStateStoreService(FederationStateStoreService stateStor this.federationStateStoreService = stateStoreService; } - private void removeApplicationIdFromStateStore(ApplicationId applicationId) { + /** + * Remove ApplicationId From StateStore. + * + * @param appId appId + */ + private void removeApplicationIdFromStateStore(ApplicationId appId) { if (HAUtil.isFederationEnabled(conf) && federationStateStoreService != null) { try { - DeleteApplicationHomeSubClusterResponse response = - federationStateStoreService.cleanUpFinishApplicationsWithRetries(applicationId); - if (response != null) { - LOG.info("applicationId = {} remove from state store success.", - applicationId); + boolean cleanUpResult = + federationStateStoreService.cleanUpFinishApplicationsWithRetries(appId, true); + if(cleanUpResult){ + LOG.info("applicationId = {} remove from state store success.", appId); + } else { + LOG.warn("applicationId = {} remove from state store failed.", appId); } } catch (Exception e) { - LOG.error("applicationId = {} remove from state store error.", applicationId, e); + LOG.error("applicationId = {} remove from state store error.", appId, e); } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/FederationStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/FederationStateStoreService.java index 600452baf487f..0ca7aa78a1367 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/FederationStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/FederationStateStoreService.java @@ -20,6 +20,8 @@ import java.io.IOException; import java.net.InetSocketAddress; +import java.util.HashMap; +import java.util.Map; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.List; @@ -34,6 +36,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.server.federation.retry.FederationActionRetry; import org.apache.hadoop.yarn.server.federation.store.FederationStateStore; import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterRequest; import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterResponse; @@ -81,6 +84,7 @@ import org.apache.hadoop.yarn.server.records.Version; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.webapp.util.WebAppUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -106,6 +110,8 @@ public class FederationStateStoreService extends AbstractService private long heartbeatInitialDelay; private RMContext rmContext; private String cleanUpThreadNamePrefix = "FederationStateStoreService-Clean-Thread"; + private int cleanUpRetryNum; + private long cleanUpRetryTime; public FederationStateStoreService(RMContext rmContext) { super(FederationStateStoreService.class.getName()); @@ -412,24 +418,31 @@ private Runnable createCleanUpFinishApplicationThread() { // Traverse the app list and clean up the app. long successCleanUpAppCount = 0; - for (ApplicationHomeSubCluster application : applications) { - ApplicationId applicationId = application.getApplicationId(); - if (!this.rmContext.getRMApps().containsKey(applicationId)) { - try { - DeleteApplicationHomeSubClusterResponse deleteResponse = - cleanUpFinishApplicationsWithRetries(applicationId); - if (deleteResponse != null) { - LOG.info("application = {} has been cleaned up successfully.", applicationId); - successCleanUpAppCount++; + + // Save a local copy of the map so that it won't change with the map + Map rmApps = new HashMap<>(this.rmContext.getRMApps()); + + // Need to make sure there is app list in RM memory. + if (rmApps != null && !rmApps.isEmpty()) { + for (ApplicationHomeSubCluster application : applications) { + ApplicationId applicationId = application.getApplicationId(); + if (!this.rmContext.getRMApps().containsKey(applicationId)) { + try { + Boolean cleanUpSuccess = + cleanUpFinishApplicationsWithRetries(applicationId, false); + if (cleanUpSuccess) { + LOG.info("application = {} has been cleaned up successfully.", applicationId); + successCleanUpAppCount++; + } + } catch (YarnException e) { + LOG.error("problem during application = {} cleanup.", applicationId, e); } - } catch (YarnException e) { - LOG.error("problem during application = {} cleanup.", applicationId, e); } } } // print app cleanup log - LOG.info("cleanup finished applications size = {}, number = {} successful cleanups.", + LOG.info("cleanup finished applications size = {}, number = {} successful cleanup.", applications.size(), successCleanUpAppCount); } catch (Exception e) { LOG.error("problem during cleanup applications.", e); @@ -438,48 +451,67 @@ private Runnable createCleanUpFinishApplicationThread() { } /** - * Clean up the completed Application. + * Clean up the federation completed Application. * * @param applicationId app id. - * @return DeleteApplicationHomeSubClusterResponse. + * @param isQuery true, need to query from statestore ; false not query. * @throws Exception exception occurs. */ - public DeleteApplicationHomeSubClusterResponse - cleanUpFinishApplicationsWithRetries(ApplicationId applicationId) throws Exception { - DeleteApplicationHomeSubClusterRequest request = + public boolean cleanUpFinishApplicationsWithRetries(ApplicationId applicationId, boolean isQuery) + throws Exception { + + // Generate a request to delete data + DeleteApplicationHomeSubClusterRequest delRequest = DeleteApplicationHomeSubClusterRequest.newInstance(applicationId); - return new FederationStateStoreAction() { + + return new FederationActionRetry() { @Override - public DeleteApplicationHomeSubClusterResponse run() throws Exception { - return deleteApplicationHomeSubCluster(request); - } - }.runWithRetries(); - } + public Boolean run() throws Exception { + boolean isAppNeedClean = true; + + // If we need to query the StateStore + if (isQuery) { + + GetApplicationHomeSubClusterRequest queryRequest = + GetApplicationHomeSubClusterRequest.newInstance(applicationId); + // Here we need to use try...catch, + // because getApplicationHomeSubCluster may throw not exist exception + try { + GetApplicationHomeSubClusterResponse queryResp = + getApplicationHomeSubCluster(queryRequest); + if (queryResp != null) { + ApplicationHomeSubCluster appHomeSC = queryResp.getApplicationHomeSubCluster(); + SubClusterId homeSubClusterId = appHomeSC.getHomeSubCluster(); + if (!subClusterId.equals(homeSubClusterId)) { + isAppNeedClean = false; + LOG.warn("The homeSubCluster of applicationId = {} is {}, " + + " not belong subCluster = {} and is not allowed to delete.", + applicationId, homeSubClusterId, subClusterId); + } + } else { + isAppNeedClean = false; + LOG.warn("The applicationId = {} not belong subCluster = {} " + + " and is not allowed to delete.", applicationId, subClusterId); + } + } catch (Exception e) { + isAppNeedClean = false; + LOG.warn("query applicationId = {} error.", applicationId, e); + } + } - /** - * Define an abstract class, abstract retry method, - * which can be used for other methods later. - * - * @param abstract parameter - */ - private abstract class FederationStateStoreAction { - abstract T run() throws Exception; - - T runWithRetries() throws Exception { - int retry = 0; - while (true) { - try { - return run(); - } catch (Exception e) { - LOG.info("Exception while executing an FederationStateStore operation.", e); - if (++retry > 10) { - LOG.info("Maxed out FederationStateStore retries. Giving up!"); - throw e; + // When the App needs to be cleaned up, clean up the App. + if (isAppNeedClean) { + DeleteApplicationHomeSubClusterResponse response = + deleteApplicationHomeSubCluster(delRequest); + if (response != null) { + LOG.info("The applicationId ={} has been successfully cleaned up.", + applicationId); + return true; } - LOG.info("Retrying operation on FederationStateStore. Retry no. " + retry); - Thread.sleep(10); } + + return false; } - } + }.runWithRetries(10, 100); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/federation/TestFederationRMStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/federation/TestFederationRMStateStoreService.java index 8722f24fa1e5f..d9a1e57d01d94 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/federation/TestFederationRMStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/federation/TestFederationRMStateStoreService.java @@ -252,9 +252,9 @@ public void testCleanUpApplication() throws Exception { Assert.assertNotNull(appHomeSubCluster.getApplicationId()); Assert.assertEquals(appId, appHomeSubCluster.getApplicationId()); - DeleteApplicationHomeSubClusterResponse delResponse = - stateStoreService.cleanUpFinishApplicationsWithRetries(appId); - Assert.assertNotNull(delResponse); + boolean cleanUpResult = + stateStoreService.cleanUpFinishApplicationsWithRetries(appId, true); + Assert.assertTrue(cleanUpResult); LambdaTestUtils.intercept(FederationStateStoreException.class, "Application " + appId + " does not exist", () -> stateStore.getApplicationHomeSubCluster(appRequest)); From d03b5fd0ad947ed0a2f5224fce298177375b2bca Mon Sep 17 00:00:00 2001 From: slfan1989 Date: Fri, 30 Sep 2022 13:58:54 +0800 Subject: [PATCH 05/14] YARN-11323. Add New Junit Test. --- .../hadoop/yarn/conf/YarnConfiguration.java | 10 ++ .../pb/ApplicationHomeSubClusterPBImpl.java | 3 + .../FederationStateStoreService.java | 31 ++-- .../TestFederationRMStateStoreService.java | 136 +++++++++++++++++- 4 files changed, 165 insertions(+), 15 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index d5e120695e739..07a4e6f653b90 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -4061,6 +4061,16 @@ public static boolean isAclEnabled(Configuration conf) { public static final int DEFAULT_FEDERATION_STATESTORE_MAX_APPLICATIONS = 1000; + public static final String FEDERATION_STATESTORE_CLEANUP_RETRY_COUNT = + FEDERATION_PREFIX + "state-store.clean-up-retry-count"; + + public static final int DEFAULT_FEDERATION_STATESTORE_CLEANUP_RETRY_COUNT = 1; + + public static final String FEDERATION_STATESTORE_CLEANUP_RETRY_SLEEP_TIME = + FEDERATION_PREFIX + "state-store.clean-up-retry-sleep-time"; + + public static final long DEFAULT_FEDERATION_STATESTORE_CLEANUP_RETRY_SLEEP_TIME = 1000; + public static final String ROUTER_PREFIX = YARN_PREFIX + "router."; public static final String ROUTER_BIND_HOST = ROUTER_PREFIX + "bind-host"; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/impl/pb/ApplicationHomeSubClusterPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/impl/pb/ApplicationHomeSubClusterPBImpl.java index a72a431430d5b..93e1a8a9925e2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/impl/pb/ApplicationHomeSubClusterPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/impl/pb/ApplicationHomeSubClusterPBImpl.java @@ -110,6 +110,9 @@ public String toString() { @Override public ApplicationId getApplicationId() { ApplicationHomeSubClusterProtoOrBuilder p = viaProto ? proto : builder; + if (this.applicationId != null) { + return this.applicationId; + } if (!p.hasApplicationId()) { return null; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/FederationStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/FederationStateStoreService.java index 0ca7aa78a1367..e4c845df117e4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/FederationStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/FederationStateStoreService.java @@ -110,8 +110,8 @@ public class FederationStateStoreService extends AbstractService private long heartbeatInitialDelay; private RMContext rmContext; private String cleanUpThreadNamePrefix = "FederationStateStoreService-Clean-Thread"; - private int cleanUpRetryNum; - private long cleanUpRetryTime; + private int cleanUpRetryCountNum; + private long cleanUpRetrySleepTime; public FederationStateStoreService(RMContext rmContext) { super(FederationStateStoreService.class.getName()); @@ -159,6 +159,14 @@ protected void serviceInit(Configuration conf) throws Exception { heartbeatInitialDelay = YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_HEARTBEAT_INITIAL_DELAY; } + + cleanUpRetryCountNum = conf.getInt(YarnConfiguration.FEDERATION_STATESTORE_CLEANUP_RETRY_COUNT, + YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_CLEANUP_RETRY_COUNT); + + cleanUpRetrySleepTime = conf.getLong( + YarnConfiguration.FEDERATION_STATESTORE_CLEANUP_RETRY_SLEEP_TIME, + YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_CLEANUP_RETRY_SLEEP_TIME); + LOG.info("Initialized federation membership service."); super.serviceInit(conf); @@ -398,6 +406,7 @@ public void createCleanUpFinishApplicationThread(String stage) { Thread finishApplicationThread = new Thread(createCleanUpFinishApplicationThread()); finishApplicationThread.setName(threadName); finishApplicationThread.start(); + LOG.info("CleanUpFinishApplicationThread has been started {}.", threadName); } /** @@ -414,7 +423,7 @@ private Runnable createCleanUpFinishApplicationThread() { GetApplicationsHomeSubClusterRequest.newInstance(subClusterId); GetApplicationsHomeSubClusterResponse response = getApplicationsHomeSubCluster(request); - List applications = response.getAppsHomeSubClusters(); + List applicationHomeSCs = response.getAppsHomeSubClusters(); // Traverse the app list and clean up the app. long successCleanUpAppCount = 0; @@ -424,9 +433,9 @@ private Runnable createCleanUpFinishApplicationThread() { // Need to make sure there is app list in RM memory. if (rmApps != null && !rmApps.isEmpty()) { - for (ApplicationHomeSubCluster application : applications) { - ApplicationId applicationId = application.getApplicationId(); - if (!this.rmContext.getRMApps().containsKey(applicationId)) { + for (ApplicationHomeSubCluster applicationHomeSC : applicationHomeSCs) { + ApplicationId applicationId = applicationHomeSC.getApplicationId(); + if (!rmApps.containsKey(applicationId)) { try { Boolean cleanUpSuccess = cleanUpFinishApplicationsWithRetries(applicationId, false); @@ -434,7 +443,7 @@ private Runnable createCleanUpFinishApplicationThread() { LOG.info("application = {} has been cleaned up successfully.", applicationId); successCleanUpAppCount++; } - } catch (YarnException e) { + } catch (Exception e) { LOG.error("problem during application = {} cleanup.", applicationId, e); } } @@ -443,7 +452,7 @@ private Runnable createCleanUpFinishApplicationThread() { // print app cleanup log LOG.info("cleanup finished applications size = {}, number = {} successful cleanup.", - applications.size(), successCleanUpAppCount); + applicationHomeSCs.size(), successCleanUpAppCount); } catch (Exception e) { LOG.error("problem during cleanup applications.", e); } @@ -460,6 +469,10 @@ private Runnable createCleanUpFinishApplicationThread() { public boolean cleanUpFinishApplicationsWithRetries(ApplicationId applicationId, boolean isQuery) throws Exception { + if(applicationId==null){ + LOG.info("CCCCCCCCCCCCCCCCCCCCCCCCCCCCCCC"); + } + // Generate a request to delete data DeleteApplicationHomeSubClusterRequest delRequest = DeleteApplicationHomeSubClusterRequest.newInstance(applicationId); @@ -512,6 +525,6 @@ public Boolean run() throws Exception { return false; } - }.runWithRetries(10, 100); + }.runWithRetries(cleanUpRetryCountNum, cleanUpRetrySleepTime); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/federation/TestFederationRMStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/federation/TestFederationRMStateStoreService.java index d9a1e57d01d94..3192ae8ede654 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/federation/TestFederationRMStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/federation/TestFederationRMStateStoreService.java @@ -20,6 +20,9 @@ import java.io.IOException; import java.io.StringReader; import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; import javax.xml.bind.JAXBException; @@ -43,8 +46,12 @@ import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterRequest; import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterRequest; import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterResponse; -import org.apache.hadoop.yarn.server.federation.store.records.DeleteApplicationHomeSubClusterResponse; +import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsHomeSubClusterRequest; +import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsHomeSubClusterResponse; import org.apache.hadoop.yarn.server.resourcemanager.MockRM; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ClusterMetricsInfo; import org.junit.After; import org.junit.Assert; @@ -55,6 +62,9 @@ import com.sun.jersey.api.json.JSONJAXBContext; import com.sun.jersey.api.json.JSONUnmarshaller; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + /** * Unit tests for FederationStateStoreService. */ @@ -219,15 +229,20 @@ public void testFederationStateStoreServiceInitialHeartbeatDelay() throws Except @Test public void testCleanUpApplication() throws Exception { + + // set yarn configuration conf.setBoolean(YarnConfiguration.FEDERATION_ENABLED, true); conf.setInt(YarnConfiguration.FEDERATION_STATESTORE_HEARTBEAT_INITIAL_DELAY, 10); conf.set(YarnConfiguration.RM_CLUSTER_ID, subClusterId.getId()); + // set up MockRM final MockRM rm = new MockRM(conf); rm.init(conf); stateStore = rm.getFederationStateStoreService().getStateStoreClient(); rm.start(); + // init subCluster Heartbeat, + // and check that the subCluster is in a running state FederationStateStoreService stateStoreService = rm.getFederationStateStoreService(); FederationStateStoreHeartbeat storeHeartbeat = @@ -235,13 +250,11 @@ public void testCleanUpApplication() throws Exception { storeHeartbeat.run(); checkSubClusterInfo(SubClusterState.SC_RUNNING); + // generate an application and join the [SC-1] cluster ApplicationId appId = ApplicationId.newInstance(Time.now(), 1); - ApplicationHomeSubCluster appHomeSubCluster1 = ApplicationHomeSubCluster - .newInstance(appId, subClusterId); - AddApplicationHomeSubClusterRequest request = - AddApplicationHomeSubClusterRequest.newInstance(appHomeSubCluster1); - stateStore.addApplicationHomeSubCluster(request); + addApplication2StateStore(appId, stateStore); + // make sure the app can be queried in the stateStore GetApplicationHomeSubClusterRequest appRequest = GetApplicationHomeSubClusterRequest.newInstance(appId); GetApplicationHomeSubClusterResponse response = @@ -252,11 +265,122 @@ public void testCleanUpApplication() throws Exception { Assert.assertNotNull(appHomeSubCluster.getApplicationId()); Assert.assertEquals(appId, appHomeSubCluster.getApplicationId()); + // clean up the app. boolean cleanUpResult = stateStoreService.cleanUpFinishApplicationsWithRetries(appId, true); Assert.assertTrue(cleanUpResult); + + // after clean, the app can no longer be queried from the stateStore. LambdaTestUtils.intercept(FederationStateStoreException.class, "Application " + appId + " does not exist", () -> stateStore.getApplicationHomeSubCluster(appRequest)); + + } + + @Test + public void testCleanUpApplicationWhenRMStart() throws Exception { + + // We design such a test case. + // Step1. We add app01, app02, app03 to the stateStore, + // But these apps are not in RM's RMContext, they are finished apps + // Step2. We simulate RM startup, there is only app04 in RMContext. + // Step3. We wait for 5 seconds, the automatic cleanup thread should clean up finished apps. + + // set yarn configuration. + conf.setBoolean(YarnConfiguration.FEDERATION_ENABLED, true); + conf.setInt(YarnConfiguration.FEDERATION_STATESTORE_HEARTBEAT_INITIAL_DELAY, 10); + conf.set(YarnConfiguration.RM_CLUSTER_ID, subClusterId.getId()); + conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true); + + // set up MockRM. + final MockRM rm = new MockRM(conf); + rm.init(conf); + stateStore = rm.getFederationStateStoreService().getStateStoreClient(); + + // generate an [app01] and join the [SC-1] cluster. + List appIds = new ArrayList(); + ApplicationId appId01 = ApplicationId.newInstance(Time.now(), 1); + addApplication2StateStore(appId01, stateStore); + appIds.add(appId01); + + // generate an [app02] and join the [SC-1] cluster. + ApplicationId appId02 = ApplicationId.newInstance(Time.now(), 2); + addApplication2StateStore(appId02, stateStore); + appIds.add(appId02); + + // generate an [app03] and join the [SC-1] cluster. + ApplicationId appId03 = ApplicationId.newInstance(Time.now(), 3); + addApplication2StateStore(appId03, stateStore); + appIds.add(appId03); + + // make sure the apps can be queried in the stateStore. + GetApplicationsHomeSubClusterRequest allRequest = + GetApplicationsHomeSubClusterRequest.newInstance(subClusterId); + GetApplicationsHomeSubClusterResponse allResponse = + stateStore.getApplicationsHomeSubCluster(allRequest); + Assert.assertNotNull(allResponse); + List appHomeSCLists = allResponse.getAppsHomeSubClusters(); + Assert.assertNotNull(appHomeSCLists); + Assert.assertEquals(3, appHomeSCLists.size()); + + // app04 exists in both RM memory and stateStore. + ApplicationId appId04 = ApplicationId.newInstance(Time.now(), 4); + addApplication2StateStore(appId04, stateStore); + RMContext rmContext = rm.getRMContext(); + Map rmAppMaps = rmContext.getRMApps(); + RMAppImpl appId04RMApp = mock(RMAppImpl.class); + when(appId04RMApp.getApplicationId()).thenReturn(appId04); + rmAppMaps.put(appId04, appId04RMApp); + + // start rm. + rm.start(); + + // wait 5s, wait for the thread to finish cleaning up. + GenericTestUtils.waitFor(() -> { + int appsSize = 0; + try { + appsSize = getApplicationsFromStateStore(); + } catch (YarnException e) { + e.printStackTrace(); + } + return (appsSize == 1); + }, 100, 1000 * 5); + + // check the app to make sure the apps(app01,app02,app03) doesn't exist. + for (ApplicationId appId : appIds) { + GetApplicationHomeSubClusterRequest appRequest = + GetApplicationHomeSubClusterRequest.newInstance(appId); + LambdaTestUtils.intercept(FederationStateStoreException.class, + "Application " + appId + " does not exist", + () -> stateStore.getApplicationHomeSubCluster(appRequest)); + } + } + + @Test + public void testCleanUpApplicationWhenRMStop() throws Exception { + // We design such a test case. + // Step1. We start RM, Register app[01-04] to RM memory, + // There will be 4 apps in memory + } + + private void addApplication2StateStore(ApplicationId appId, + FederationStateStore stateStore) throws YarnException { + ApplicationHomeSubCluster appHomeSC = ApplicationHomeSubCluster.newInstance( + appId, subClusterId); + AddApplicationHomeSubClusterRequest addHomeSCRequest = + AddApplicationHomeSubClusterRequest.newInstance(appHomeSC); + stateStore.addApplicationHomeSubCluster(addHomeSCRequest); + } + + private int getApplicationsFromStateStore() throws YarnException { + // make sure the apps can be queried in the stateStore + GetApplicationsHomeSubClusterRequest allRequest = + GetApplicationsHomeSubClusterRequest.newInstance(subClusterId); + GetApplicationsHomeSubClusterResponse allResponse = + stateStore.getApplicationsHomeSubCluster(allRequest); + Assert.assertNotNull(allResponse); + List appHomeSCLists = allResponse.getAppsHomeSubClusters(); + Assert.assertNotNull(appHomeSCLists); + return appHomeSCLists.size(); } } From b8c6a3cd5e08532aa058472c632d6914c63a0554 Mon Sep 17 00:00:00 2001 From: slfan1989 Date: Fri, 30 Sep 2022 15:47:28 +0800 Subject: [PATCH 06/14] YARN-11323. Fix CheckStyle. --- .../server/resourcemanager/RMAppManager.java | 12 ++ .../resourcemanager/ResourceManager.java | 6 - .../FederationStateStoreService.java | 82 +++++------ .../TestFederationRMStateStoreService.java | 130 ++++++++++++++++-- 4 files changed, 169 insertions(+), 61 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java index ba7d750f19ba1..6f7de28b2e305 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java @@ -1086,4 +1086,16 @@ private void removeApplicationIdFromStateStore(ApplicationId appId) { } } } + + // just test using + @VisibleForTesting + public void checkAppNumCompletedLimit4Test() { + checkAppNumCompletedLimit(); + } + + // just test using + @VisibleForTesting + public void finishApplication4Test(ApplicationId applicationId) { + finishApplication(applicationId); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java index 961a291311033..1bcfdbbafa862 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java @@ -1025,12 +1025,6 @@ protected void serviceStart() throws Exception { @Override protected void serviceStop() throws Exception { - // If RM is stopped, also perform a cleanup operation. - if (HAUtil.isFederationEnabled(conf)) { - federationStateStoreService. - createCleanUpFinishApplicationThread("Stop"); - } - super.serviceStop(); DefaultMetricsSystem.shutdown(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/FederationStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/FederationStateStoreService.java index e4c845df117e4..43c71b83006a0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/FederationStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/FederationStateStoreService.java @@ -410,53 +410,59 @@ public void createCleanUpFinishApplicationThread(String stage) { } /** - * Create a thread that cleans up the app. + * Create a thread that cleans up the apps. * * @return thread object. */ private Runnable createCleanUpFinishApplicationThread() { return () -> { + createCleanUpFinishApplication(); + }; + } - try { - // Get the current RM's App list based on subClusterId - GetApplicationsHomeSubClusterRequest request = - GetApplicationsHomeSubClusterRequest.newInstance(subClusterId); - GetApplicationsHomeSubClusterResponse response = - getApplicationsHomeSubCluster(request); - List applicationHomeSCs = response.getAppsHomeSubClusters(); - - // Traverse the app list and clean up the app. - long successCleanUpAppCount = 0; - - // Save a local copy of the map so that it won't change with the map - Map rmApps = new HashMap<>(this.rmContext.getRMApps()); - - // Need to make sure there is app list in RM memory. - if (rmApps != null && !rmApps.isEmpty()) { - for (ApplicationHomeSubCluster applicationHomeSC : applicationHomeSCs) { - ApplicationId applicationId = applicationHomeSC.getApplicationId(); - if (!rmApps.containsKey(applicationId)) { - try { - Boolean cleanUpSuccess = - cleanUpFinishApplicationsWithRetries(applicationId, false); - if (cleanUpSuccess) { - LOG.info("application = {} has been cleaned up successfully.", applicationId); - successCleanUpAppCount++; - } - } catch (Exception e) { - LOG.error("problem during application = {} cleanup.", applicationId, e); + /** + * cleans up the apps. + */ + private void createCleanUpFinishApplication() { + try { + // Get the current RM's App list based on subClusterId + GetApplicationsHomeSubClusterRequest request = + GetApplicationsHomeSubClusterRequest.newInstance(subClusterId); + GetApplicationsHomeSubClusterResponse response = + getApplicationsHomeSubCluster(request); + List applicationHomeSCs = response.getAppsHomeSubClusters(); + + // Traverse the app list and clean up the app. + long successCleanUpAppCount = 0; + + // Save a local copy of the map so that it won't change with the map + Map rmApps = new HashMap<>(this.rmContext.getRMApps()); + + // Need to make sure there is app list in RM memory. + if (rmApps != null && !rmApps.isEmpty()) { + for (ApplicationHomeSubCluster applicationHomeSC : applicationHomeSCs) { + ApplicationId applicationId = applicationHomeSC.getApplicationId(); + if (!rmApps.containsKey(applicationId)) { + try { + Boolean cleanUpSuccess = + cleanUpFinishApplicationsWithRetries(applicationId, false); + if (cleanUpSuccess) { + LOG.info("application = {} has been cleaned up successfully.", applicationId); + successCleanUpAppCount++; } + } catch (Exception e) { + LOG.error("problem during application = {} cleanup.", applicationId, e); } } } - - // print app cleanup log - LOG.info("cleanup finished applications size = {}, number = {} successful cleanup.", - applicationHomeSCs.size(), successCleanUpAppCount); - } catch (Exception e) { - LOG.error("problem during cleanup applications.", e); } - }; + + // print app cleanup log + LOG.info("cleanup finished applications size = {}, number = {} successful cleanup.", + applicationHomeSCs.size(), successCleanUpAppCount); + } catch (Exception e) { + LOG.error("problem during cleanup applications.", e); + } } /** @@ -469,10 +475,6 @@ private Runnable createCleanUpFinishApplicationThread() { public boolean cleanUpFinishApplicationsWithRetries(ApplicationId applicationId, boolean isQuery) throws Exception { - if(applicationId==null){ - LOG.info("CCCCCCCCCCCCCCCCCCCCCCCCCCCCCCC"); - } - // Generate a request to delete data DeleteApplicationHomeSubClusterRequest delRequest = DeleteApplicationHomeSubClusterRequest.newInstance(applicationId); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/federation/TestFederationRMStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/federation/TestFederationRMStateStoreService.java index 3192ae8ede654..09039c63b8fad 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/federation/TestFederationRMStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/federation/TestFederationRMStateStoreService.java @@ -31,7 +31,11 @@ import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.test.LambdaTestUtils; import org.apache.hadoop.util.Time; +import org.apache.hadoop.yarn.MockApps; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.federation.store.FederationStateStore; @@ -48,10 +52,14 @@ import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterResponse; import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsHomeSubClusterRequest; import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsHomeSubClusterResponse; +import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService; import org.apache.hadoop.yarn.server.resourcemanager.MockRM; +import org.apache.hadoop.yarn.server.resourcemanager.RMAppManager; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ClusterMetricsInfo; import org.junit.After; import org.junit.Assert; @@ -63,7 +71,6 @@ import com.sun.jersey.api.json.JSONUnmarshaller; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; /** * Unit tests for FederationStateStoreService. @@ -293,7 +300,7 @@ public void testCleanUpApplicationWhenRMStart() throws Exception { conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true); // set up MockRM. - final MockRM rm = new MockRM(conf); + MockRM rm = new MockRM(conf); rm.init(conf); stateStore = rm.getFederationStateStoreService().getStateStoreClient(); @@ -326,11 +333,7 @@ public void testCleanUpApplicationWhenRMStart() throws Exception { // app04 exists in both RM memory and stateStore. ApplicationId appId04 = ApplicationId.newInstance(Time.now(), 4); addApplication2StateStore(appId04, stateStore); - RMContext rmContext = rm.getRMContext(); - Map rmAppMaps = rmContext.getRMApps(); - RMAppImpl appId04RMApp = mock(RMAppImpl.class); - when(appId04RMApp.getApplicationId()).thenReturn(appId04); - rmAppMaps.put(appId04, appId04RMApp); + addApplication2RMAppManager(rm, appId04); // start rm. rm.start(); @@ -339,7 +342,10 @@ public void testCleanUpApplicationWhenRMStart() throws Exception { GenericTestUtils.waitFor(() -> { int appsSize = 0; try { - appsSize = getApplicationsFromStateStore(); + List subClusters = + getApplicationsFromStateStore(); + Assert.assertNotNull(subClusters); + appsSize = subClusters.size(); } catch (YarnException e) { e.printStackTrace(); } @@ -354,25 +360,91 @@ public void testCleanUpApplicationWhenRMStart() throws Exception { "Application " + appId + " does not exist", () -> stateStore.getApplicationHomeSubCluster(appRequest)); } + + if (rm != null) { + rm.stop(); + rm = null; + } } @Test - public void testCleanUpApplicationWhenRMStop() throws Exception { + public void testCleanUpApplicationWhenRMCompleteOneApp() throws Exception { + // We design such a test case. - // Step1. We start RM, Register app[01-04] to RM memory, - // There will be 4 apps in memory + // Step1. We start RM,Set the RM memory to keep a maximum of 1 completed app. + // Step2. Register app[01-03] to RM memory & stateStore. + // Step3. We clean up app01, app02, app03, at this time, + // app01, app02 should be cleaned up from statestore, app03 should remain in statestore. + + // set yarn configuration. + conf.setBoolean(YarnConfiguration.FEDERATION_ENABLED, true); + conf.setInt(YarnConfiguration.FEDERATION_STATESTORE_HEARTBEAT_INITIAL_DELAY, 10); + conf.set(YarnConfiguration.RM_CLUSTER_ID, subClusterId.getId()); + conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true); + conf.setInt(YarnConfiguration.RM_STATE_STORE_MAX_COMPLETED_APPLICATIONS, 1); + conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName()); + + // set up MockRM. + MockRM rm = new MockRM(conf); + rm.init(conf); + stateStore = rm.getFederationStateStoreService().getStateStoreClient(); + rm.start(); + + // generate an [app01] and join the [SC-1] cluster. + List appIds = new ArrayList(); + ApplicationId appId01 = ApplicationId.newInstance(Time.now(), 1); + addApplication2StateStore(appId01, stateStore); + addApplication2RMAppManager(rm, appId01); + appIds.add(appId01); + + // generate an [app02] and join the [SC-1] cluster. + ApplicationId appId02 = ApplicationId.newInstance(Time.now(), 2); + addApplication2StateStore(appId02, stateStore); + addApplication2RMAppManager(rm, appId02); + appIds.add(appId02); + + // generate an [app03] and join the [SC-1] cluster. + ApplicationId appId03 = ApplicationId.newInstance(Time.now(), 3); + addApplication2StateStore(appId03, stateStore); + addApplication2RMAppManager(rm, appId03); + + + // rmAppManager + RMAppManager rmAppManager = rm.getRMAppManager(); + rmAppManager.finishApplication4Test(appId01); + rmAppManager.finishApplication4Test(appId02); + rmAppManager.finishApplication4Test(appId03); + rmAppManager.checkAppNumCompletedLimit4Test(); + + // app01, app02 should be cleaned from statestore + // After the query, it should report the error not exist. + for (ApplicationId appId : appIds) { + GetApplicationHomeSubClusterRequest appRequest = + GetApplicationHomeSubClusterRequest.newInstance(appId); + LambdaTestUtils.intercept(FederationStateStoreException.class, + "Application " + appId + " does not exist", + () -> stateStore.getApplicationHomeSubCluster(appRequest)); + } + + // app03 should remain in statestore + List appHomeScList = getApplicationsFromStateStore(); + Assert.assertNotNull(appHomeScList); + Assert.assertEquals(1, appHomeScList.size()); + ApplicationHomeSubCluster homeSubCluster = appHomeScList.get(0); + Assert.assertNotNull(homeSubCluster); + Assert.assertEquals(appId03, homeSubCluster.getApplicationId()); } private void addApplication2StateStore(ApplicationId appId, FederationStateStore stateStore) throws YarnException { ApplicationHomeSubCluster appHomeSC = ApplicationHomeSubCluster.newInstance( - appId, subClusterId); + appId, subClusterId); AddApplicationHomeSubClusterRequest addHomeSCRequest = - AddApplicationHomeSubClusterRequest.newInstance(appHomeSC); + AddApplicationHomeSubClusterRequest.newInstance(appHomeSC); stateStore.addApplicationHomeSubCluster(addHomeSCRequest); } - private int getApplicationsFromStateStore() throws YarnException { + private List getApplicationsFromStateStore() throws YarnException { // make sure the apps can be queried in the stateStore GetApplicationsHomeSubClusterRequest allRequest = GetApplicationsHomeSubClusterRequest.newInstance(subClusterId); @@ -381,6 +453,34 @@ private int getApplicationsFromStateStore() throws YarnException { Assert.assertNotNull(allResponse); List appHomeSCLists = allResponse.getAppsHomeSubClusters(); Assert.assertNotNull(appHomeSCLists); - return appHomeSCLists.size(); + return appHomeSCLists; + } + + private void addApplication2RMAppManager(MockRM rm, ApplicationId appId) { + RMContext rmContext = rm.getRMContext(); + Map rmAppMaps = rmContext.getRMApps(); + String user = MockApps.newUserName(); + String name = MockApps.newAppName(); + String queue = MockApps.newQueue(); + + YarnScheduler scheduler = mock(YarnScheduler.class); + + ApplicationMasterService masterService = + new ApplicationMasterService(rmContext, scheduler); + + ApplicationSubmissionContext submissionContext = + new ApplicationSubmissionContextPBImpl(); + + // applicationId will not be used because RMStateStore is mocked, + // but applicationId is still set for safety + submissionContext.setApplicationId(appId); + submissionContext.setPriority(Priority.newInstance(0)); + + RMApp application = new RMAppImpl(appId, rmContext, conf, name, + user, queue, submissionContext, scheduler, masterService, + System.currentTimeMillis(), "YARN", null, + new ArrayList<>()); + + rmAppMaps.putIfAbsent(application.getApplicationId(), application); } } From 570bc57e19d9bd4288893783eee1e89d537461fd Mon Sep 17 00:00:00 2001 From: slfan1989 Date: Fri, 30 Sep 2022 15:57:43 +0800 Subject: [PATCH 07/14] YARN-11323. Fix CheckStyle. --- .../retry/FederationActionRetry.java | 17 +++++++++++++++++ .../server/federation/retry/package-info.java | 19 +++++++++++++++++++ .../server/resourcemanager/RMAppManager.java | 1 - 3 files changed, 36 insertions(+), 1 deletion(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/retry/package-info.java diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/retry/FederationActionRetry.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/retry/FederationActionRetry.java index 091f6d8bb61a4..00506821a7433 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/retry/FederationActionRetry.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/retry/FederationActionRetry.java @@ -1,3 +1,20 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + package org.apache.hadoop.yarn.server.federation.retry; import org.slf4j.Logger; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/retry/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/retry/package-info.java new file mode 100644 index 0000000000000..5d8477cfe599d --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/retry/package-info.java @@ -0,0 +1,19 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** Federation Retry Policies. **/ +package org.apache.hadoop.yarn.server.federation.retry; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java index 6f7de28b2e305..87596e16ee6e1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java @@ -32,7 +32,6 @@ import org.apache.hadoop.yarn.security.ConfiguredYarnAuthorizer; import org.apache.hadoop.yarn.security.Permission; import org.apache.hadoop.yarn.security.PrivilegedEntity; -import org.apache.hadoop.yarn.server.federation.store.records.DeleteApplicationHomeSubClusterResponse; import org.apache.hadoop.yarn.server.resourcemanager.federation.FederationStateStoreService; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueuePath; import org.slf4j.Logger; From 2869e1c218099d50fb4c14f34f73d04ae5f5b7ea Mon Sep 17 00:00:00 2001 From: slfan1989 Date: Fri, 30 Sep 2022 17:32:15 +0800 Subject: [PATCH 08/14] YARN-11323. Add Yarn Configuration. --- .../src/main/resources/yarn-default.xml | 20 +++++++++++++++++++ .../FederationStateStoreService.java | 5 +++-- 2 files changed, 23 insertions(+), 2 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index 5132d4199afdf..726ee77eb1e30 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -3727,6 +3727,26 @@ yarnfederation/ + + + The number of retries to clear the app in the FederationStateStore, + the default value is 1, that is, after the app fails to clean up, it will retry the cleanup again. + + yarn.federation.state-store.clean-up-retry-count + 1 + + + + + Clear the sleep time of App retry in FederationStateStore. + When the app fails to clean up, + it will sleep for a period of time and then try to clean up. + The default value is 1000ms. + + yarn.federation.state-store.clean-up-retry-sleep-time + 1000ms + + diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/FederationStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/FederationStateStoreService.java index 43c71b83006a0..52a595fa303ab 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/FederationStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/FederationStateStoreService.java @@ -163,9 +163,10 @@ protected void serviceInit(Configuration conf) throws Exception { cleanUpRetryCountNum = conf.getInt(YarnConfiguration.FEDERATION_STATESTORE_CLEANUP_RETRY_COUNT, YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_CLEANUP_RETRY_COUNT); - cleanUpRetrySleepTime = conf.getLong( + cleanUpRetrySleepTime = conf.getTimeDuration( YarnConfiguration.FEDERATION_STATESTORE_CLEANUP_RETRY_SLEEP_TIME, - YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_CLEANUP_RETRY_SLEEP_TIME); + YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_CLEANUP_RETRY_SLEEP_TIME, + TimeUnit.MILLISECONDS); LOG.info("Initialized federation membership service."); From 512cd4fa250bf8ca0c2852c5f8ca3b216dffaf8e Mon Sep 17 00:00:00 2001 From: slfan1989 Date: Sat, 1 Oct 2022 11:27:50 +0800 Subject: [PATCH 09/14] YARN-11323. Fix CheckStyle. --- .../hadoop/yarn/conf/YarnConfiguration.java | 3 +- .../src/main/resources/yarn-default.xml | 4 +- .../FederationStateStoreService.java | 60 ++++++++++--------- .../TestFederationRMStateStoreService.java | 8 +-- 4 files changed, 41 insertions(+), 34 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 07a4e6f653b90..18262d51067f1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -4069,7 +4069,8 @@ public static boolean isAclEnabled(Configuration conf) { public static final String FEDERATION_STATESTORE_CLEANUP_RETRY_SLEEP_TIME = FEDERATION_PREFIX + "state-store.clean-up-retry-sleep-time"; - public static final long DEFAULT_FEDERATION_STATESTORE_CLEANUP_RETRY_SLEEP_TIME = 1000; + public static final long DEFAULT_FEDERATION_STATESTORE_CLEANUP_RETRY_SLEEP_TIME = + TimeUnit.SECONDS.toMillis(1); public static final String ROUTER_PREFIX = YARN_PREFIX + "router."; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index 726ee77eb1e30..db3da1755a61d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -3741,10 +3741,10 @@ Clear the sleep time of App retry in FederationStateStore. When the app fails to clean up, it will sleep for a period of time and then try to clean up. - The default value is 1000ms. + The default value is 1s. yarn.federation.state-store.clean-up-retry-sleep-time - 1000ms + 1s diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/FederationStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/FederationStateStoreService.java index 52a595fa303ab..c0cb1d02f1109 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/FederationStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/FederationStateStoreService.java @@ -470,8 +470,9 @@ private void createCleanUpFinishApplication() { * Clean up the federation completed Application. * * @param applicationId app id. - * @param isQuery true, need to query from statestore ; false not query. + * @param isQuery true, need to query from statestore, false not query. * @throws Exception exception occurs. + * @return true, successfully deleted; false, failed to delete or no need to delete */ public boolean cleanUpFinishApplicationsWithRetries(ApplicationId applicationId, boolean isQuery) throws Exception { @@ -487,32 +488,7 @@ public Boolean run() throws Exception { // If we need to query the StateStore if (isQuery) { - - GetApplicationHomeSubClusterRequest queryRequest = - GetApplicationHomeSubClusterRequest.newInstance(applicationId); - // Here we need to use try...catch, - // because getApplicationHomeSubCluster may throw not exist exception - try { - GetApplicationHomeSubClusterResponse queryResp = - getApplicationHomeSubCluster(queryRequest); - if (queryResp != null) { - ApplicationHomeSubCluster appHomeSC = queryResp.getApplicationHomeSubCluster(); - SubClusterId homeSubClusterId = appHomeSC.getHomeSubCluster(); - if (!subClusterId.equals(homeSubClusterId)) { - isAppNeedClean = false; - LOG.warn("The homeSubCluster of applicationId = {} is {}, " + - " not belong subCluster = {} and is not allowed to delete.", - applicationId, homeSubClusterId, subClusterId); - } - } else { - isAppNeedClean = false; - LOG.warn("The applicationId = {} not belong subCluster = {} " + - " and is not allowed to delete.", applicationId, subClusterId); - } - } catch (Exception e) { - isAppNeedClean = false; - LOG.warn("query applicationId = {} error.", applicationId, e); - } + isAppNeedClean = isApplicationNeedClean(applicationId); } // When the App needs to be cleaned up, clean up the App. @@ -530,4 +506,34 @@ public Boolean run() throws Exception { } }.runWithRetries(cleanUpRetryCountNum, cleanUpRetrySleepTime); } + + private boolean isApplicationNeedClean(ApplicationId applicationId) { + GetApplicationHomeSubClusterRequest queryRequest = + GetApplicationHomeSubClusterRequest.newInstance(applicationId); + // Here we need to use try...catch, + // because getApplicationHomeSubCluster may throw not exist exception + try { + GetApplicationHomeSubClusterResponse queryResp = + getApplicationHomeSubCluster(queryRequest); + if (queryResp != null) { + ApplicationHomeSubCluster appHomeSC = queryResp.getApplicationHomeSubCluster(); + SubClusterId homeSubClusterId = appHomeSC.getHomeSubCluster(); + if (!subClusterId.equals(homeSubClusterId)) { + LOG.warn("The homeSubCluster of applicationId = {} belong subCluster = {}, " + + " not belong subCluster = {} and is not allowed to delete.", + applicationId, homeSubClusterId, subClusterId); + return false; + } + } else { + LOG.warn("The applicationId = {} not belong subCluster = {} " + + " and is not allowed to delete.", applicationId, subClusterId); + return false; + } + } catch (Exception e) { + LOG.warn("query applicationId = {} error.", applicationId, e); + return false; + } + return true; + } + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/federation/TestFederationRMStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/federation/TestFederationRMStateStoreService.java index 09039c63b8fad..e232719d361b6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/federation/TestFederationRMStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/federation/TestFederationRMStateStoreService.java @@ -305,7 +305,7 @@ public void testCleanUpApplicationWhenRMStart() throws Exception { stateStore = rm.getFederationStateStoreService().getStateStoreClient(); // generate an [app01] and join the [SC-1] cluster. - List appIds = new ArrayList(); + List appIds = new ArrayList<>(); ApplicationId appId01 = ApplicationId.newInstance(Time.now(), 1); addApplication2StateStore(appId01, stateStore); appIds.add(appId01); @@ -391,7 +391,7 @@ public void testCleanUpApplicationWhenRMCompleteOneApp() throws Exception { rm.start(); // generate an [app01] and join the [SC-1] cluster. - List appIds = new ArrayList(); + List appIds = new ArrayList<>(); ApplicationId appId01 = ApplicationId.newInstance(Time.now(), 1); addApplication2StateStore(appId01, stateStore); addApplication2RMAppManager(rm, appId01); @@ -436,12 +436,12 @@ public void testCleanUpApplicationWhenRMCompleteOneApp() throws Exception { } private void addApplication2StateStore(ApplicationId appId, - FederationStateStore stateStore) throws YarnException { + FederationStateStore fedStateStore) throws YarnException { ApplicationHomeSubCluster appHomeSC = ApplicationHomeSubCluster.newInstance( appId, subClusterId); AddApplicationHomeSubClusterRequest addHomeSCRequest = AddApplicationHomeSubClusterRequest.newInstance(appHomeSC); - stateStore.addApplicationHomeSubCluster(addHomeSCRequest); + fedStateStore.addApplicationHomeSubCluster(addHomeSCRequest); } private List getApplicationsFromStateStore() throws YarnException { From 7b287bc6ef5458dbd7c848cedba3e0edaaac00b7 Mon Sep 17 00:00:00 2001 From: slfan1989 Date: Sat, 1 Oct 2022 02:58:39 -0700 Subject: [PATCH 10/14] YARN-11323. Fix CheckStyle. --- .../retry/FederationActionRetry.java | 9 ++-- .../FederationStateStoreService.java | 51 +++++++++++-------- .../TestFederationRMStateStoreService.java | 1 - 3 files changed, 33 insertions(+), 28 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/retry/FederationActionRetry.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/retry/FederationActionRetry.java index 00506821a7433..634e76896456c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/retry/FederationActionRetry.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/retry/FederationActionRetry.java @@ -20,14 +20,13 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public abstract class FederationActionRetry { +public interface FederationActionRetry { - public static final Logger LOG = - LoggerFactory.getLogger(FederationActionRetry.class); + Logger LOG = LoggerFactory.getLogger(FederationActionRetry.class); - protected abstract T run() throws Exception; + T run() throws Exception; - public T runWithRetries(int retryCount, long retrySleepTime) throws Exception { + default T runWithRetries(int retryCount, long retrySleepTime) throws Exception { int retry = 0; while (true) { try { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/FederationStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/FederationStateStoreService.java index c0cb1d02f1109..b4da7727f1682 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/FederationStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/FederationStateStoreService.java @@ -481,32 +481,39 @@ public boolean cleanUpFinishApplicationsWithRetries(ApplicationId applicationId, DeleteApplicationHomeSubClusterRequest delRequest = DeleteApplicationHomeSubClusterRequest.newInstance(applicationId); - return new FederationActionRetry() { - @Override - public Boolean run() throws Exception { - boolean isAppNeedClean = true; - - // If we need to query the StateStore - if (isQuery) { - isAppNeedClean = isApplicationNeedClean(applicationId); - } + return ((FederationActionRetry) () -> { - // When the App needs to be cleaned up, clean up the App. - if (isAppNeedClean) { - DeleteApplicationHomeSubClusterResponse response = - deleteApplicationHomeSubCluster(delRequest); - if (response != null) { - LOG.info("The applicationId ={} has been successfully cleaned up.", - applicationId); - return true; - } - } + boolean isAppNeedClean = true; - return false; + // If we need to query the StateStore + if (isQuery) { + isAppNeedClean = isApplicationNeedClean(applicationId); + } + + // When the App needs to be cleaned up, clean up the App. + if (isAppNeedClean) { + DeleteApplicationHomeSubClusterResponse response = + deleteApplicationHomeSubCluster(delRequest); + if (response != null) { + LOG.info("The applicationId ={} has been successfully cleaned up.", applicationId); + return true; + } } - }.runWithRetries(cleanUpRetryCountNum, cleanUpRetrySleepTime); + return false; + }).runWithRetries(cleanUpRetryCountNum, cleanUpRetrySleepTime); } + /** + * Used to determine whether the Application is cleaned up. + * + * When the app in the RM is completed, + * the HomeSC corresponding to the app will be queried in the StateStore. + * If the current RM is the HomeSC, the completed app will be cleaned up. + * + * @param applicationId applicationId + * @return true, app needs to be cleaned up; + * false, app doesn't need to be cleaned up. + */ private boolean isApplicationNeedClean(ApplicationId applicationId) { GetApplicationHomeSubClusterRequest queryRequest = GetApplicationHomeSubClusterRequest.newInstance(applicationId); @@ -514,7 +521,7 @@ private boolean isApplicationNeedClean(ApplicationId applicationId) { // because getApplicationHomeSubCluster may throw not exist exception try { GetApplicationHomeSubClusterResponse queryResp = - getApplicationHomeSubCluster(queryRequest); + getApplicationHomeSubCluster(queryRequest); if (queryResp != null) { ApplicationHomeSubCluster appHomeSC = queryResp.getApplicationHomeSubCluster(); SubClusterId homeSubClusterId = appHomeSC.getHomeSubCluster(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/federation/TestFederationRMStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/federation/TestFederationRMStateStoreService.java index e232719d361b6..b8e2ce6ef32d2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/federation/TestFederationRMStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/federation/TestFederationRMStateStoreService.java @@ -408,7 +408,6 @@ public void testCleanUpApplicationWhenRMCompleteOneApp() throws Exception { addApplication2StateStore(appId03, stateStore); addApplication2RMAppManager(rm, appId03); - // rmAppManager RMAppManager rmAppManager = rm.getRMAppManager(); rmAppManager.finishApplication4Test(appId01); From a42bc43d2fa3f827e5340f555ed06e7d8094c7da Mon Sep 17 00:00:00 2001 From: slfan1989 Date: Sun, 2 Oct 2022 16:35:31 +0800 Subject: [PATCH 11/14] YARN-11323. Fix CheckStyle. --- .../FederationStateStoreService.java | 33 +++++++++---------- 1 file changed, 15 insertions(+), 18 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/FederationStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/FederationStateStoreService.java index b4da7727f1682..991971e9b25a6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/FederationStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/FederationStateStoreService.java @@ -482,25 +482,22 @@ public boolean cleanUpFinishApplicationsWithRetries(ApplicationId applicationId, DeleteApplicationHomeSubClusterRequest.newInstance(applicationId); return ((FederationActionRetry) () -> { - - boolean isAppNeedClean = true; - - // If we need to query the StateStore - if (isQuery) { - isAppNeedClean = isApplicationNeedClean(applicationId); - } - - // When the App needs to be cleaned up, clean up the App. - if (isAppNeedClean) { - DeleteApplicationHomeSubClusterResponse response = - deleteApplicationHomeSubCluster(delRequest); - if (response != null) { - LOG.info("The applicationId ={} has been successfully cleaned up.", applicationId); - return true; + boolean isAppNeedClean = true; + // If we need to query the StateStore + if (isQuery) { + isAppNeedClean = isApplicationNeedClean(applicationId); } - } - return false; - }).runWithRetries(cleanUpRetryCountNum, cleanUpRetrySleepTime); + // When the App needs to be cleaned up, clean up the App. + if (isAppNeedClean) { + DeleteApplicationHomeSubClusterResponse response = + deleteApplicationHomeSubCluster(delRequest); + if (response != null) { + LOG.info("The applicationId ={} has been successfully cleaned up.", applicationId); + return true; + } + } + return false; + }).runWithRetries(cleanUpRetryCountNum, cleanUpRetrySleepTime); } /** From 5198d2e8f9cc3b5e06e05c0f13587ca130fbac14 Mon Sep 17 00:00:00 2001 From: slfan1989 Date: Tue, 4 Oct 2022 14:47:03 +0800 Subject: [PATCH 12/14] YARN-11323. Fix CheckStyle. --- .../FederationStateStoreService.java | 31 ++++++++++--------- 1 file changed, 16 insertions(+), 15 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/FederationStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/FederationStateStoreService.java index 991971e9b25a6..c903e5ddef221 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/FederationStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/FederationStateStoreService.java @@ -482,22 +482,23 @@ public boolean cleanUpFinishApplicationsWithRetries(ApplicationId applicationId, DeleteApplicationHomeSubClusterRequest.newInstance(applicationId); return ((FederationActionRetry) () -> { - boolean isAppNeedClean = true; - // If we need to query the StateStore - if (isQuery) { - isAppNeedClean = isApplicationNeedClean(applicationId); - } - // When the App needs to be cleaned up, clean up the App. - if (isAppNeedClean) { - DeleteApplicationHomeSubClusterResponse response = - deleteApplicationHomeSubCluster(delRequest); - if (response != null) { - LOG.info("The applicationId ={} has been successfully cleaned up.", applicationId); - return true; - } + boolean isAppNeedClean = true; + // If we need to query the StateStore + if (isQuery) { + isAppNeedClean = isApplicationNeedClean(applicationId); + } + // When the App needs to be cleaned up, clean up the App. + if (isAppNeedClean) { + DeleteApplicationHomeSubClusterResponse response = + deleteApplicationHomeSubCluster(delRequest); + if (response != null) { + LOG.info("The applicationId ={} has been successfully cleaned up.", applicationId); + return true; } - return false; - }).runWithRetries(cleanUpRetryCountNum, cleanUpRetrySleepTime); + } + return false; + }).runWithRetries(cleanUpRetryCountNum, cleanUpRetrySleepTime); + } /** From fec1fa4fa26f5e12bb4f98d13d1e5634f3fa3709 Mon Sep 17 00:00:00 2001 From: slfan1989 Date: Thu, 6 Oct 2022 15:03:31 +0800 Subject: [PATCH 13/14] YARN-11323. Fix CheckStyle. --- .../FederationStateStoreService.java | 62 +++++++++++-------- 1 file changed, 37 insertions(+), 25 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/FederationStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/FederationStateStoreService.java index c903e5ddef221..ac018b42238a3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/FederationStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/FederationStateStoreService.java @@ -445,8 +445,7 @@ private void createCleanUpFinishApplication() { ApplicationId applicationId = applicationHomeSC.getApplicationId(); if (!rmApps.containsKey(applicationId)) { try { - Boolean cleanUpSuccess = - cleanUpFinishApplicationsWithRetries(applicationId, false); + Boolean cleanUpSuccess = cleanUpFinishApplicationsWithRetries(applicationId, false); if (cleanUpSuccess) { LOG.info("application = {} has been cleaned up successfully.", applicationId); successCleanUpAppCount++; @@ -460,7 +459,7 @@ private void createCleanUpFinishApplication() { // print app cleanup log LOG.info("cleanup finished applications size = {}, number = {} successful cleanup.", - applicationHomeSCs.size(), successCleanUpAppCount); + applicationHomeSCs.size(), successCleanUpAppCount); } catch (Exception e) { LOG.error("problem during cleanup applications.", e); } @@ -469,36 +468,49 @@ private void createCleanUpFinishApplication() { /** * Clean up the federation completed Application. * - * @param applicationId app id. + * @param appId app id. * @param isQuery true, need to query from statestore, false not query. * @throws Exception exception occurs. * @return true, successfully deleted; false, failed to delete or no need to delete */ - public boolean cleanUpFinishApplicationsWithRetries(ApplicationId applicationId, boolean isQuery) + public boolean cleanUpFinishApplicationsWithRetries(ApplicationId appId, boolean isQuery) throws Exception { // Generate a request to delete data - DeleteApplicationHomeSubClusterRequest delRequest = - DeleteApplicationHomeSubClusterRequest.newInstance(applicationId); - - return ((FederationActionRetry) () -> { - boolean isAppNeedClean = true; - // If we need to query the StateStore - if (isQuery) { - isAppNeedClean = isApplicationNeedClean(applicationId); - } - // When the App needs to be cleaned up, clean up the App. - if (isAppNeedClean) { - DeleteApplicationHomeSubClusterResponse response = - deleteApplicationHomeSubCluster(delRequest); - if (response != null) { - LOG.info("The applicationId ={} has been successfully cleaned up.", applicationId); - return true; - } - } - return false; - }).runWithRetries(cleanUpRetryCountNum, cleanUpRetrySleepTime); + DeleteApplicationHomeSubClusterRequest request = + DeleteApplicationHomeSubClusterRequest.newInstance(appId); + + // CleanUp Finish App. + return ((FederationActionRetry) () -> invokeCleanUpFinishApp(appId, isQuery, request)) + .runWithRetries(cleanUpRetryCountNum, cleanUpRetrySleepTime); + } + /** + * CleanUp Finish App. + * + * @param applicationId app id. + * @param isQuery true, need to query from statestore, false not query. + * @param delRequest delete Application Request + * @return true, successfully deleted; false, failed to delete or no need to delete + * @throws YarnException + */ + private boolean invokeCleanUpFinishApp(ApplicationId applicationId, boolean isQuery, + DeleteApplicationHomeSubClusterRequest delRequest) throws YarnException { + boolean isAppNeedClean = true; + // If we need to query the StateStore + if (isQuery) { + isAppNeedClean = isApplicationNeedClean(applicationId); + } + // When the App needs to be cleaned up, clean up the App. + if (isAppNeedClean) { + DeleteApplicationHomeSubClusterResponse response = + deleteApplicationHomeSubCluster(delRequest); + if (response != null) { + LOG.info("The applicationId = {} has been successfully cleaned up.", applicationId); + return true; + } + } + return false; } /** From bf052fe9ae88c63da863bd1dc7ee8e037c6606ab Mon Sep 17 00:00:00 2001 From: slfan1989 Date: Wed, 12 Oct 2022 00:54:27 +0800 Subject: [PATCH 14/14] YARN-11323. Fix CheckStyle. --- .../federation/FederationStateStoreService.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/FederationStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/FederationStateStoreService.java index ac018b42238a3..1d67af926d43e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/FederationStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/FederationStateStoreService.java @@ -428,9 +428,9 @@ private void createCleanUpFinishApplication() { try { // Get the current RM's App list based on subClusterId GetApplicationsHomeSubClusterRequest request = - GetApplicationsHomeSubClusterRequest.newInstance(subClusterId); + GetApplicationsHomeSubClusterRequest.newInstance(subClusterId); GetApplicationsHomeSubClusterResponse response = - getApplicationsHomeSubCluster(request); + getApplicationsHomeSubCluster(request); List applicationHomeSCs = response.getAppsHomeSubClusters(); // Traverse the app list and clean up the app.