diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index d5e120695e739..18262d51067f1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -4061,6 +4061,17 @@ public static boolean isAclEnabled(Configuration conf) { public static final int DEFAULT_FEDERATION_STATESTORE_MAX_APPLICATIONS = 1000; + public static final String FEDERATION_STATESTORE_CLEANUP_RETRY_COUNT = + FEDERATION_PREFIX + "state-store.clean-up-retry-count"; + + public static final int DEFAULT_FEDERATION_STATESTORE_CLEANUP_RETRY_COUNT = 1; + + public static final String FEDERATION_STATESTORE_CLEANUP_RETRY_SLEEP_TIME = + FEDERATION_PREFIX + "state-store.clean-up-retry-sleep-time"; + + public static final long DEFAULT_FEDERATION_STATESTORE_CLEANUP_RETRY_SLEEP_TIME = + TimeUnit.SECONDS.toMillis(1); + public static final String ROUTER_PREFIX = YARN_PREFIX + "router."; public static final String ROUTER_BIND_HOST = ROUTER_PREFIX + "bind-host"; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index 5132d4199afdf..db3da1755a61d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -3727,6 +3727,26 @@ yarnfederation/ + + + The number of retries to clear the app in the FederationStateStore, + the default value is 1, that is, after the app fails to clean up, it will retry the cleanup again. + + yarn.federation.state-store.clean-up-retry-count + 1 + + + + + Clear the sleep time of App retry in FederationStateStore. + When the app fails to clean up, + it will sleep for a period of time and then try to clean up. + The default value is 1s. + + yarn.federation.state-store.clean-up-retry-sleep-time + 1s + + diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/retry/FederationActionRetry.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/retry/FederationActionRetry.java new file mode 100644 index 0000000000000..634e76896456c --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/retry/FederationActionRetry.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.yarn.server.federation.retry; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public interface FederationActionRetry { + + Logger LOG = LoggerFactory.getLogger(FederationActionRetry.class); + + T run() throws Exception; + + default T runWithRetries(int retryCount, long retrySleepTime) throws Exception { + int retry = 0; + while (true) { + try { + return run(); + } catch (Exception e) { + LOG.info("Exception while executing an Federation operation.", e); + if (++retry > retryCount) { + LOG.info("Maxed out Federation retries. Giving up!"); + throw e; + } + LOG.info("Retrying operation on Federation. Retry no. {}", retry); + Thread.sleep(retrySleepTime); + } + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/retry/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/retry/package-info.java new file mode 100644 index 0000000000000..5d8477cfe599d --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/retry/package-info.java @@ -0,0 +1,19 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** Federation Retry Policies. **/ +package org.apache.hadoop.yarn.server.federation.retry; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java index f847152c47d76..87596e16ee6e1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java @@ -28,9 +28,11 @@ import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.conf.HAUtil; import org.apache.hadoop.yarn.security.ConfiguredYarnAuthorizer; import org.apache.hadoop.yarn.security.Permission; import org.apache.hadoop.yarn.security.PrivilegedEntity; +import org.apache.hadoop.yarn.server.resourcemanager.federation.FederationStateStoreService; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueuePath; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -114,6 +116,7 @@ public class RMAppManager implements EventHandler, private boolean nodeLabelsEnabled; private Set exclusiveEnforcedPartitions; private String amDefaultNodeLabel; + private FederationStateStoreService federationStateStoreService; private static final String USER_ID_PREFIX = "userid="; @@ -347,6 +350,7 @@ protected synchronized void checkAppNumCompletedLimit() { + ", removing app " + removeApp.getApplicationId() + " from state store."); rmContext.getStateStore().removeApplication(removeApp); + removeApplicationIdFromStateStore(removeId); completedAppsInStateStore--; } @@ -358,6 +362,7 @@ protected synchronized void checkAppNumCompletedLimit() { + this.maxCompletedAppsInMemory + ", removing app " + removeId + " from memory: "); rmContext.getRMApps().remove(removeId); + removeApplicationIdFromStateStore(removeId); this.applicationACLsManager.removeApplication(removeId); } } @@ -1054,4 +1059,42 @@ private void copyPlacementQueueToSubmissionContext( context.setQueue(placementContext.getQueue()); } } + + @VisibleForTesting + public void setFederationStateStoreService(FederationStateStoreService stateStoreService) { + this.federationStateStoreService = stateStoreService; + } + + /** + * Remove ApplicationId From StateStore. + * + * @param appId appId + */ + private void removeApplicationIdFromStateStore(ApplicationId appId) { + if (HAUtil.isFederationEnabled(conf) && federationStateStoreService != null) { + try { + boolean cleanUpResult = + federationStateStoreService.cleanUpFinishApplicationsWithRetries(appId, true); + if(cleanUpResult){ + LOG.info("applicationId = {} remove from state store success.", appId); + } else { + LOG.warn("applicationId = {} remove from state store failed.", appId); + } + } catch (Exception e) { + LOG.error("applicationId = {} remove from state store error.", appId, e); + } + } + } + + // just test using + @VisibleForTesting + public void checkAppNumCompletedLimit4Test() { + checkAppNumCompletedLimit(); + } + + // just test using + @VisibleForTesting + public void finishApplication4Test(ApplicationId applicationId) { + finishApplication(applicationId); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java index 8adcff42a695d..1bcfdbbafa862 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java @@ -917,6 +917,7 @@ protected void serviceInit(Configuration configuration) throws Exception { } federationStateStoreService = createFederationStateStoreService(); addIfService(federationStateStoreService); + rmAppManager.setFederationStateStoreService(federationStateStoreService); LOG.info("Initialized Federation membership."); } @@ -996,6 +997,13 @@ protected void serviceStart() throws Exception { RMState state = rmStore.loadState(); recover(state); LOG.info("Recovery ended"); + + // Make sure that the App is cleaned up after the RM memory is restored. + if (HAUtil.isFederationEnabled(conf)) { + federationStateStoreService. + createCleanUpFinishApplicationThread("Recovery"); + } + } catch (Exception e) { // the Exception from loadState() needs to be handled for // HA and we need to give up master status if we got fenced diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/FederationStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/FederationStateStoreService.java index 060540d01ee32..1d67af926d43e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/FederationStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/FederationStateStoreService.java @@ -20,8 +20,11 @@ import java.io.IOException; import java.net.InetSocketAddress; +import java.util.HashMap; +import java.util.Map; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.List; import org.apache.commons.lang3.NotImplementedException; import org.apache.hadoop.conf.Configuration; @@ -29,9 +32,11 @@ import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.util.concurrent.HadoopExecutors; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.server.federation.retry.FederationActionRetry; import org.apache.hadoop.yarn.server.federation.store.FederationStateStore; import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterRequest; import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterResponse; @@ -74,10 +79,12 @@ import org.apache.hadoop.yarn.server.federation.store.records.UpdateReservationHomeSubClusterResponse; import org.apache.hadoop.yarn.server.federation.store.records.RouterMasterKeyRequest; import org.apache.hadoop.yarn.server.federation.store.records.RouterMasterKeyResponse; +import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster; import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade; import org.apache.hadoop.yarn.server.records.Version; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.webapp.util.WebAppUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -102,6 +109,9 @@ public class FederationStateStoreService extends AbstractService private long heartbeatInterval; private long heartbeatInitialDelay; private RMContext rmContext; + private String cleanUpThreadNamePrefix = "FederationStateStoreService-Clean-Thread"; + private int cleanUpRetryCountNum; + private long cleanUpRetrySleepTime; public FederationStateStoreService(RMContext rmContext) { super(FederationStateStoreService.class.getName()); @@ -149,6 +159,15 @@ protected void serviceInit(Configuration conf) throws Exception { heartbeatInitialDelay = YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_HEARTBEAT_INITIAL_DELAY; } + + cleanUpRetryCountNum = conf.getInt(YarnConfiguration.FEDERATION_STATESTORE_CLEANUP_RETRY_COUNT, + YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_CLEANUP_RETRY_COUNT); + + cleanUpRetrySleepTime = conf.getTimeDuration( + YarnConfiguration.FEDERATION_STATESTORE_CLEANUP_RETRY_SLEEP_TIME, + YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_CLEANUP_RETRY_SLEEP_TIME, + TimeUnit.MILLISECONDS); + LOG.info("Initialized federation membership service."); super.serviceInit(conf); @@ -378,4 +397,160 @@ public RouterMasterKeyResponse getMasterKeyByDelegationKey(RouterMasterKeyReques throws YarnException, IOException { throw new NotImplementedException("Code is not implemented"); } + + /** + * Create a thread that cleans up the app. + * @param stage rm-start/rm-stop. + */ + public void createCleanUpFinishApplicationThread(String stage) { + String threadName = cleanUpThreadNamePrefix + "-" + stage; + Thread finishApplicationThread = new Thread(createCleanUpFinishApplicationThread()); + finishApplicationThread.setName(threadName); + finishApplicationThread.start(); + LOG.info("CleanUpFinishApplicationThread has been started {}.", threadName); + } + + /** + * Create a thread that cleans up the apps. + * + * @return thread object. + */ + private Runnable createCleanUpFinishApplicationThread() { + return () -> { + createCleanUpFinishApplication(); + }; + } + + /** + * cleans up the apps. + */ + private void createCleanUpFinishApplication() { + try { + // Get the current RM's App list based on subClusterId + GetApplicationsHomeSubClusterRequest request = + GetApplicationsHomeSubClusterRequest.newInstance(subClusterId); + GetApplicationsHomeSubClusterResponse response = + getApplicationsHomeSubCluster(request); + List applicationHomeSCs = response.getAppsHomeSubClusters(); + + // Traverse the app list and clean up the app. + long successCleanUpAppCount = 0; + + // Save a local copy of the map so that it won't change with the map + Map rmApps = new HashMap<>(this.rmContext.getRMApps()); + + // Need to make sure there is app list in RM memory. + if (rmApps != null && !rmApps.isEmpty()) { + for (ApplicationHomeSubCluster applicationHomeSC : applicationHomeSCs) { + ApplicationId applicationId = applicationHomeSC.getApplicationId(); + if (!rmApps.containsKey(applicationId)) { + try { + Boolean cleanUpSuccess = cleanUpFinishApplicationsWithRetries(applicationId, false); + if (cleanUpSuccess) { + LOG.info("application = {} has been cleaned up successfully.", applicationId); + successCleanUpAppCount++; + } + } catch (Exception e) { + LOG.error("problem during application = {} cleanup.", applicationId, e); + } + } + } + } + + // print app cleanup log + LOG.info("cleanup finished applications size = {}, number = {} successful cleanup.", + applicationHomeSCs.size(), successCleanUpAppCount); + } catch (Exception e) { + LOG.error("problem during cleanup applications.", e); + } + } + + /** + * Clean up the federation completed Application. + * + * @param appId app id. + * @param isQuery true, need to query from statestore, false not query. + * @throws Exception exception occurs. + * @return true, successfully deleted; false, failed to delete or no need to delete + */ + public boolean cleanUpFinishApplicationsWithRetries(ApplicationId appId, boolean isQuery) + throws Exception { + + // Generate a request to delete data + DeleteApplicationHomeSubClusterRequest request = + DeleteApplicationHomeSubClusterRequest.newInstance(appId); + + // CleanUp Finish App. + return ((FederationActionRetry) () -> invokeCleanUpFinishApp(appId, isQuery, request)) + .runWithRetries(cleanUpRetryCountNum, cleanUpRetrySleepTime); + } + + /** + * CleanUp Finish App. + * + * @param applicationId app id. + * @param isQuery true, need to query from statestore, false not query. + * @param delRequest delete Application Request + * @return true, successfully deleted; false, failed to delete or no need to delete + * @throws YarnException + */ + private boolean invokeCleanUpFinishApp(ApplicationId applicationId, boolean isQuery, + DeleteApplicationHomeSubClusterRequest delRequest) throws YarnException { + boolean isAppNeedClean = true; + // If we need to query the StateStore + if (isQuery) { + isAppNeedClean = isApplicationNeedClean(applicationId); + } + // When the App needs to be cleaned up, clean up the App. + if (isAppNeedClean) { + DeleteApplicationHomeSubClusterResponse response = + deleteApplicationHomeSubCluster(delRequest); + if (response != null) { + LOG.info("The applicationId = {} has been successfully cleaned up.", applicationId); + return true; + } + } + return false; + } + + /** + * Used to determine whether the Application is cleaned up. + * + * When the app in the RM is completed, + * the HomeSC corresponding to the app will be queried in the StateStore. + * If the current RM is the HomeSC, the completed app will be cleaned up. + * + * @param applicationId applicationId + * @return true, app needs to be cleaned up; + * false, app doesn't need to be cleaned up. + */ + private boolean isApplicationNeedClean(ApplicationId applicationId) { + GetApplicationHomeSubClusterRequest queryRequest = + GetApplicationHomeSubClusterRequest.newInstance(applicationId); + // Here we need to use try...catch, + // because getApplicationHomeSubCluster may throw not exist exception + try { + GetApplicationHomeSubClusterResponse queryResp = + getApplicationHomeSubCluster(queryRequest); + if (queryResp != null) { + ApplicationHomeSubCluster appHomeSC = queryResp.getApplicationHomeSubCluster(); + SubClusterId homeSubClusterId = appHomeSC.getHomeSubCluster(); + if (!subClusterId.equals(homeSubClusterId)) { + LOG.warn("The homeSubCluster of applicationId = {} belong subCluster = {}, " + + " not belong subCluster = {} and is not allowed to delete.", + applicationId, homeSubClusterId, subClusterId); + return false; + } + } else { + LOG.warn("The applicationId = {} not belong subCluster = {} " + + " and is not allowed to delete.", applicationId, subClusterId); + return false; + } + } catch (Exception e) { + LOG.warn("query applicationId = {} error.", applicationId, e); + return false; + } + return true; + } + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/federation/TestFederationRMStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/federation/TestFederationRMStateStoreService.java index e8ebdd5bedde5..b8e2ce6ef32d2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/federation/TestFederationRMStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/federation/TestFederationRMStateStoreService.java @@ -20,22 +20,46 @@ import java.io.IOException; import java.io.StringReader; import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; import javax.xml.bind.JAXBException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.ha.HAServiceProtocol; import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.test.LambdaTestUtils; +import org.apache.hadoop.util.Time; +import org.apache.hadoop.yarn.MockApps; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.federation.store.FederationStateStore; +import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreException; import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterInfoRequest; import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterInfoResponse; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterRequest; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState; +import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster; +import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterRequest; +import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterRequest; +import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterResponse; +import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsHomeSubClusterRequest; +import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsHomeSubClusterResponse; +import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService; import org.apache.hadoop.yarn.server.resourcemanager.MockRM; +import org.apache.hadoop.yarn.server.resourcemanager.RMAppManager; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ClusterMetricsInfo; import org.junit.After; import org.junit.Assert; @@ -46,6 +70,8 @@ import com.sun.jersey.api.json.JSONJAXBContext; import com.sun.jersey.api.json.JSONUnmarshaller; +import static org.mockito.Mockito.mock; + /** * Unit tests for FederationStateStoreService. */ @@ -207,4 +233,253 @@ public void testFederationStateStoreServiceInitialHeartbeatDelay() throws Except "Started federation membership heartbeat with interval: 300 and initial delay: 10")); rm.stop(); } + + @Test + public void testCleanUpApplication() throws Exception { + + // set yarn configuration + conf.setBoolean(YarnConfiguration.FEDERATION_ENABLED, true); + conf.setInt(YarnConfiguration.FEDERATION_STATESTORE_HEARTBEAT_INITIAL_DELAY, 10); + conf.set(YarnConfiguration.RM_CLUSTER_ID, subClusterId.getId()); + + // set up MockRM + final MockRM rm = new MockRM(conf); + rm.init(conf); + stateStore = rm.getFederationStateStoreService().getStateStoreClient(); + rm.start(); + + // init subCluster Heartbeat, + // and check that the subCluster is in a running state + FederationStateStoreService stateStoreService = + rm.getFederationStateStoreService(); + FederationStateStoreHeartbeat storeHeartbeat = + stateStoreService.getStateStoreHeartbeatThread(); + storeHeartbeat.run(); + checkSubClusterInfo(SubClusterState.SC_RUNNING); + + // generate an application and join the [SC-1] cluster + ApplicationId appId = ApplicationId.newInstance(Time.now(), 1); + addApplication2StateStore(appId, stateStore); + + // make sure the app can be queried in the stateStore + GetApplicationHomeSubClusterRequest appRequest = + GetApplicationHomeSubClusterRequest.newInstance(appId); + GetApplicationHomeSubClusterResponse response = + stateStore.getApplicationHomeSubCluster(appRequest); + Assert.assertNotNull(response); + ApplicationHomeSubCluster appHomeSubCluster = response.getApplicationHomeSubCluster(); + Assert.assertNotNull(appHomeSubCluster); + Assert.assertNotNull(appHomeSubCluster.getApplicationId()); + Assert.assertEquals(appId, appHomeSubCluster.getApplicationId()); + + // clean up the app. + boolean cleanUpResult = + stateStoreService.cleanUpFinishApplicationsWithRetries(appId, true); + Assert.assertTrue(cleanUpResult); + + // after clean, the app can no longer be queried from the stateStore. + LambdaTestUtils.intercept(FederationStateStoreException.class, + "Application " + appId + " does not exist", + () -> stateStore.getApplicationHomeSubCluster(appRequest)); + + } + + @Test + public void testCleanUpApplicationWhenRMStart() throws Exception { + + // We design such a test case. + // Step1. We add app01, app02, app03 to the stateStore, + // But these apps are not in RM's RMContext, they are finished apps + // Step2. We simulate RM startup, there is only app04 in RMContext. + // Step3. We wait for 5 seconds, the automatic cleanup thread should clean up finished apps. + + // set yarn configuration. + conf.setBoolean(YarnConfiguration.FEDERATION_ENABLED, true); + conf.setInt(YarnConfiguration.FEDERATION_STATESTORE_HEARTBEAT_INITIAL_DELAY, 10); + conf.set(YarnConfiguration.RM_CLUSTER_ID, subClusterId.getId()); + conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true); + + // set up MockRM. + MockRM rm = new MockRM(conf); + rm.init(conf); + stateStore = rm.getFederationStateStoreService().getStateStoreClient(); + + // generate an [app01] and join the [SC-1] cluster. + List appIds = new ArrayList<>(); + ApplicationId appId01 = ApplicationId.newInstance(Time.now(), 1); + addApplication2StateStore(appId01, stateStore); + appIds.add(appId01); + + // generate an [app02] and join the [SC-1] cluster. + ApplicationId appId02 = ApplicationId.newInstance(Time.now(), 2); + addApplication2StateStore(appId02, stateStore); + appIds.add(appId02); + + // generate an [app03] and join the [SC-1] cluster. + ApplicationId appId03 = ApplicationId.newInstance(Time.now(), 3); + addApplication2StateStore(appId03, stateStore); + appIds.add(appId03); + + // make sure the apps can be queried in the stateStore. + GetApplicationsHomeSubClusterRequest allRequest = + GetApplicationsHomeSubClusterRequest.newInstance(subClusterId); + GetApplicationsHomeSubClusterResponse allResponse = + stateStore.getApplicationsHomeSubCluster(allRequest); + Assert.assertNotNull(allResponse); + List appHomeSCLists = allResponse.getAppsHomeSubClusters(); + Assert.assertNotNull(appHomeSCLists); + Assert.assertEquals(3, appHomeSCLists.size()); + + // app04 exists in both RM memory and stateStore. + ApplicationId appId04 = ApplicationId.newInstance(Time.now(), 4); + addApplication2StateStore(appId04, stateStore); + addApplication2RMAppManager(rm, appId04); + + // start rm. + rm.start(); + + // wait 5s, wait for the thread to finish cleaning up. + GenericTestUtils.waitFor(() -> { + int appsSize = 0; + try { + List subClusters = + getApplicationsFromStateStore(); + Assert.assertNotNull(subClusters); + appsSize = subClusters.size(); + } catch (YarnException e) { + e.printStackTrace(); + } + return (appsSize == 1); + }, 100, 1000 * 5); + + // check the app to make sure the apps(app01,app02,app03) doesn't exist. + for (ApplicationId appId : appIds) { + GetApplicationHomeSubClusterRequest appRequest = + GetApplicationHomeSubClusterRequest.newInstance(appId); + LambdaTestUtils.intercept(FederationStateStoreException.class, + "Application " + appId + " does not exist", + () -> stateStore.getApplicationHomeSubCluster(appRequest)); + } + + if (rm != null) { + rm.stop(); + rm = null; + } + } + + @Test + public void testCleanUpApplicationWhenRMCompleteOneApp() throws Exception { + + // We design such a test case. + // Step1. We start RM,Set the RM memory to keep a maximum of 1 completed app. + // Step2. Register app[01-03] to RM memory & stateStore. + // Step3. We clean up app01, app02, app03, at this time, + // app01, app02 should be cleaned up from statestore, app03 should remain in statestore. + + // set yarn configuration. + conf.setBoolean(YarnConfiguration.FEDERATION_ENABLED, true); + conf.setInt(YarnConfiguration.FEDERATION_STATESTORE_HEARTBEAT_INITIAL_DELAY, 10); + conf.set(YarnConfiguration.RM_CLUSTER_ID, subClusterId.getId()); + conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true); + conf.setInt(YarnConfiguration.RM_STATE_STORE_MAX_COMPLETED_APPLICATIONS, 1); + conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName()); + + // set up MockRM. + MockRM rm = new MockRM(conf); + rm.init(conf); + stateStore = rm.getFederationStateStoreService().getStateStoreClient(); + rm.start(); + + // generate an [app01] and join the [SC-1] cluster. + List appIds = new ArrayList<>(); + ApplicationId appId01 = ApplicationId.newInstance(Time.now(), 1); + addApplication2StateStore(appId01, stateStore); + addApplication2RMAppManager(rm, appId01); + appIds.add(appId01); + + // generate an [app02] and join the [SC-1] cluster. + ApplicationId appId02 = ApplicationId.newInstance(Time.now(), 2); + addApplication2StateStore(appId02, stateStore); + addApplication2RMAppManager(rm, appId02); + appIds.add(appId02); + + // generate an [app03] and join the [SC-1] cluster. + ApplicationId appId03 = ApplicationId.newInstance(Time.now(), 3); + addApplication2StateStore(appId03, stateStore); + addApplication2RMAppManager(rm, appId03); + + // rmAppManager + RMAppManager rmAppManager = rm.getRMAppManager(); + rmAppManager.finishApplication4Test(appId01); + rmAppManager.finishApplication4Test(appId02); + rmAppManager.finishApplication4Test(appId03); + rmAppManager.checkAppNumCompletedLimit4Test(); + + // app01, app02 should be cleaned from statestore + // After the query, it should report the error not exist. + for (ApplicationId appId : appIds) { + GetApplicationHomeSubClusterRequest appRequest = + GetApplicationHomeSubClusterRequest.newInstance(appId); + LambdaTestUtils.intercept(FederationStateStoreException.class, + "Application " + appId + " does not exist", + () -> stateStore.getApplicationHomeSubCluster(appRequest)); + } + + // app03 should remain in statestore + List appHomeScList = getApplicationsFromStateStore(); + Assert.assertNotNull(appHomeScList); + Assert.assertEquals(1, appHomeScList.size()); + ApplicationHomeSubCluster homeSubCluster = appHomeScList.get(0); + Assert.assertNotNull(homeSubCluster); + Assert.assertEquals(appId03, homeSubCluster.getApplicationId()); + } + + private void addApplication2StateStore(ApplicationId appId, + FederationStateStore fedStateStore) throws YarnException { + ApplicationHomeSubCluster appHomeSC = ApplicationHomeSubCluster.newInstance( + appId, subClusterId); + AddApplicationHomeSubClusterRequest addHomeSCRequest = + AddApplicationHomeSubClusterRequest.newInstance(appHomeSC); + fedStateStore.addApplicationHomeSubCluster(addHomeSCRequest); + } + + private List getApplicationsFromStateStore() throws YarnException { + // make sure the apps can be queried in the stateStore + GetApplicationsHomeSubClusterRequest allRequest = + GetApplicationsHomeSubClusterRequest.newInstance(subClusterId); + GetApplicationsHomeSubClusterResponse allResponse = + stateStore.getApplicationsHomeSubCluster(allRequest); + Assert.assertNotNull(allResponse); + List appHomeSCLists = allResponse.getAppsHomeSubClusters(); + Assert.assertNotNull(appHomeSCLists); + return appHomeSCLists; + } + + private void addApplication2RMAppManager(MockRM rm, ApplicationId appId) { + RMContext rmContext = rm.getRMContext(); + Map rmAppMaps = rmContext.getRMApps(); + String user = MockApps.newUserName(); + String name = MockApps.newAppName(); + String queue = MockApps.newQueue(); + + YarnScheduler scheduler = mock(YarnScheduler.class); + + ApplicationMasterService masterService = + new ApplicationMasterService(rmContext, scheduler); + + ApplicationSubmissionContext submissionContext = + new ApplicationSubmissionContextPBImpl(); + + // applicationId will not be used because RMStateStore is mocked, + // but applicationId is still set for safety + submissionContext.setApplicationId(appId); + submissionContext.setPriority(Priority.newInstance(0)); + + RMApp application = new RMAppImpl(appId, rmContext, conf, name, + user, queue, submissionContext, scheduler, masterService, + System.currentTimeMillis(), "YARN", null, + new ArrayList<>()); + + rmAppMaps.putIfAbsent(application.getApplicationId(), application); + } }