From 51eed17c5dbbd1654612000ffbab48919dea2f6c Mon Sep 17 00:00:00 2001 From: zhengchenyu Date: Fri, 17 Jun 2022 17:07:42 +0800 Subject: [PATCH 1/4] YARN-11183. Federation: Remove outdated ApplicationHomeSubCluster in federation state store. --- .../MySQL/FederationStateStoreStoredProcs.sql | 5 +- .../FederationStateStoreStoreProcs.sql | 2 + .../hadoop/yarn/event/AsyncDispatcher.java | 3 +- .../hadoop/yarn/event/DrainDispatcher.java | 2 +- .../impl/MemoryFederationStateStore.java | 7 +- .../store/impl/SQLFederationStateStore.java | 6 +- .../impl/ZookeeperFederationStateStore.java | 9 +- .../GetApplicationsHomeSubClusterRequest.java | 18 ++ .../pb/ApplicationHomeSubClusterPBImpl.java | 3 + ...plicationsHomeSubClusterRequestPBImpl.java | 61 +++++++ .../proto/yarn_server_federation_protos.proto | 2 +- .../impl/FederationStateStoreBaseTest.java | 12 ++ .../impl/HSQLDBFederationStateStore.java | 7 +- .../RMActiveServiceContext.java | 15 ++ .../server/resourcemanager/RMAppManager.java | 7 + .../server/resourcemanager/RMContext.java | 3 + .../server/resourcemanager/RMContextImpl.java | 6 + .../resourcemanager/ResourceManager.java | 9 + .../federation/FederationStateStoreEvent.java | 28 +++ .../FederationStateStoreEventType.java | 23 +++ ...tateStoreRemoveAppHomeSubClusterEvent.java | 36 ++++ .../FederationStateStoreService.java | 171 +++++++++++++++++- .../TestFederationRMStateStoreService.java | 86 ++++++++- 23 files changed, 497 insertions(+), 24 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/FederationStateStoreEvent.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/FederationStateStoreEventType.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/FederationStateStoreRemoveAppHomeSubClusterEvent.java diff --git a/hadoop-yarn-project/hadoop-yarn/bin/FederationStateStore/MySQL/FederationStateStoreStoredProcs.sql b/hadoop-yarn-project/hadoop-yarn/bin/FederationStateStore/MySQL/FederationStateStoreStoredProcs.sql index eae882e4a48dd..643857c995dc4 100644 --- a/hadoop-yarn-project/hadoop-yarn/bin/FederationStateStore/MySQL/FederationStateStoreStoredProcs.sql +++ b/hadoop-yarn-project/hadoop-yarn/bin/FederationStateStore/MySQL/FederationStateStoreStoredProcs.sql @@ -122,10 +122,11 @@ BEGIN WHERE applicationId = applicationID_IN; END // -CREATE PROCEDURE sp_getApplicationsHomeSubCluster() +CREATE PROCEDURE sp_getApplicationsHomeSubCluster(IN homeSubCluster_IN varchar(256)) BEGIN SELECT applicationId, homeSubCluster - FROM applicationsHomeSubCluster; + FROM applicationsHomeSubCluster + WHERE homeSubCluster_IN='' or homeSubCluster=homeSubCluster_IN; END // CREATE PROCEDURE sp_deleteApplicationHomeSubCluster( diff --git a/hadoop-yarn-project/hadoop-yarn/bin/FederationStateStore/SQLServer/FederationStateStoreStoreProcs.sql b/hadoop-yarn-project/hadoop-yarn/bin/FederationStateStore/SQLServer/FederationStateStoreStoreProcs.sql index 66d6f0e203558..1ac7bd5761475 100644 --- a/hadoop-yarn-project/hadoop-yarn/bin/FederationStateStore/SQLServer/FederationStateStoreStoreProcs.sql +++ b/hadoop-yarn-project/hadoop-yarn/bin/FederationStateStore/SQLServer/FederationStateStoreStoreProcs.sql @@ -111,12 +111,14 @@ IF OBJECT_ID ( '[sp_getApplicationsHomeSubCluster]', 'P' ) IS NOT NULL GO CREATE PROCEDURE [dbo].[sp_getApplicationsHomeSubCluster] + @homeSubCluster VARCHAR(256) AS BEGIN DECLARE @errorMessage nvarchar(4000) BEGIN TRY SELECT [applicationId], [homeSubCluster], [createTime] FROM [dbo].[applicationsHomeSubCluster] + WHERE @homeSubCluster = '' or [homeSubCluster] = @homeSubCluster END TRY BEGIN CATCH diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java index 1c4ed24b47d78..ad25439f1dd1b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java @@ -397,7 +397,8 @@ protected boolean isEventThreadWaiting() { return eventHandlingThread.getState() == Thread.State.WAITING; } - protected boolean isDrained() { + @VisibleForTesting + public boolean isDrained() { return drained; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java index 2045eb6309200..9e6e776949e27 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java @@ -96,7 +96,7 @@ public void handle(Event event) { } @Override - protected boolean isDrained() { + public boolean isDrained() { synchronized (mutex) { return drained; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java index 7c06256a41364..76bc3d974bc1a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java @@ -242,9 +242,12 @@ public GetApplicationsHomeSubClusterResponse getApplicationsHomeSubCluster( GetApplicationsHomeSubClusterRequest request) throws YarnException { List result = new ArrayList(); + SubClusterId subClusterId = request.getSubClusterId(); for (Entry e : applications.entrySet()) { - result - .add(ApplicationHomeSubCluster.newInstance(e.getKey(), e.getValue())); + if (subClusterId == null || subClusterId.equals(e.getValue())) { + result.add( + ApplicationHomeSubCluster.newInstance(e.getKey(), e.getValue())); + } } GetApplicationsHomeSubClusterResponse.newInstance(result); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/SQLFederationStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/SQLFederationStateStore.java index 2b3fea5609b2e..d313f1c54c939 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/SQLFederationStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/SQLFederationStateStore.java @@ -119,7 +119,7 @@ public class SQLFederationStateStore implements FederationStateStore { "{call sp_getApplicationHomeSubCluster(?, ?)}"; private static final String CALL_SP_GET_APPLICATIONS_HOME_SUBCLUSTER = - "{call sp_getApplicationsHomeSubCluster()}"; + "{call sp_getApplicationsHomeSubCluster(?)}"; private static final String CALL_SP_SET_POLICY_CONFIGURATION = "{call sp_setPolicyConfiguration(?, ?, ?, ?)}"; @@ -725,6 +725,10 @@ public GetApplicationsHomeSubClusterResponse getApplicationsHomeSubCluster( try { cstmt = getCallableStatement(CALL_SP_GET_APPLICATIONS_HOME_SUBCLUSTER); + // Set the parameters for the stored procedure + SubClusterId subClusterId = request.getSubClusterId(); + cstmt.setString(1, subClusterId == null ? "" : subClusterId.getId()); + // Execute the query long startTime = clock.getTime(); rs = cstmt.executeQuery(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/ZookeeperFederationStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/ZookeeperFederationStateStore.java index c9b5849ad68d2..3a2576ace8b61 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/ZookeeperFederationStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/ZookeeperFederationStateStore.java @@ -220,14 +220,17 @@ public GetApplicationHomeSubClusterResponse getApplicationHomeSubCluster( public GetApplicationsHomeSubClusterResponse getApplicationsHomeSubCluster( GetApplicationsHomeSubClusterRequest request) throws YarnException { List result = new ArrayList<>(); + SubClusterId subClusterId = request.getSubClusterId(); try { for (String child : zkManager.getChildren(appsZNode)) { ApplicationId appId = ApplicationId.fromString(child); SubClusterId homeSubCluster = getApp(appId); - ApplicationHomeSubCluster app = - ApplicationHomeSubCluster.newInstance(appId, homeSubCluster); - result.add(app); + if (subClusterId == null || subClusterId.equals(homeSubCluster)) { + ApplicationHomeSubCluster app = + ApplicationHomeSubCluster.newInstance(appId, homeSubCluster); + result.add(app); + } } } catch (Exception e) { String errMsg = "Cannot get apps: " + e.getMessage(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/GetApplicationsHomeSubClusterRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/GetApplicationsHomeSubClusterRequest.java index 60549722093db..8301400b59351 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/GetApplicationsHomeSubClusterRequest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/GetApplicationsHomeSubClusterRequest.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.federation.store.records; import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.util.Records; @@ -37,4 +38,21 @@ public static GetApplicationsHomeSubClusterRequest newInstance() { return request; } + @Private + @Unstable + public static GetApplicationsHomeSubClusterRequest newInstance( + SubClusterId subClusterId) { + GetApplicationsHomeSubClusterRequest request = + Records.newRecord(GetApplicationsHomeSubClusterRequest.class); + request.setSubClusterId(subClusterId); + return request; + } + + @Public + @Unstable + public abstract SubClusterId getSubClusterId(); + + @Public + @Unstable + public abstract void setSubClusterId(SubClusterId subClusterId); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/impl/pb/ApplicationHomeSubClusterPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/impl/pb/ApplicationHomeSubClusterPBImpl.java index 05b0b62649f44..ad4f8e77bb443 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/impl/pb/ApplicationHomeSubClusterPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/impl/pb/ApplicationHomeSubClusterPBImpl.java @@ -110,6 +110,9 @@ public String toString() { @Override public ApplicationId getApplicationId() { ApplicationHomeSubClusterProtoOrBuilder p = viaProto ? proto : builder; + if (this.applicationId != null) { + return this.applicationId; + } if (!p.hasApplicationId()) { return null; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/impl/pb/GetApplicationsHomeSubClusterRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/impl/pb/GetApplicationsHomeSubClusterRequestPBImpl.java index a3c1c1a6bb5a1..b7d835782649c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/impl/pb/GetApplicationsHomeSubClusterRequestPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/impl/pb/GetApplicationsHomeSubClusterRequestPBImpl.java @@ -20,9 +20,12 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.federation.proto.YarnServerFederationProtos.GetApplicationsHomeSubClusterRequestProto; +import org.apache.hadoop.yarn.federation.proto.YarnServerFederationProtos.GetApplicationsHomeSubClusterRequestProtoOrBuilder; +import org.apache.hadoop.yarn.federation.proto.YarnServerFederationProtos.SubClusterIdProto; import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsHomeSubClusterRequest; import org.apache.hadoop.thirdparty.protobuf.TextFormat; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; /** * Protocol buffer based implementation of @@ -37,6 +40,7 @@ public class GetApplicationsHomeSubClusterRequestPBImpl GetApplicationsHomeSubClusterRequestProto.getDefaultInstance(); private GetApplicationsHomeSubClusterRequestProto.Builder builder = null; private boolean viaProto = false; + private SubClusterId subClusterId = null; public GetApplicationsHomeSubClusterRequestPBImpl() { builder = GetApplicationsHomeSubClusterRequestProto.newBuilder(); @@ -49,11 +53,34 @@ public GetApplicationsHomeSubClusterRequestPBImpl( } public GetApplicationsHomeSubClusterRequestProto getProto() { + mergeLocalToProto(); proto = viaProto ? proto : builder.build(); viaProto = true; return proto; } + private void mergeLocalToProto() { + if (viaProto) { + maybeInitBuilder(); + } + mergeLocalToBuilder(); + proto = builder.build(); + viaProto = true; + } + + private void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = GetApplicationsHomeSubClusterRequestProto.newBuilder(proto); + } + viaProto = false; + } + + private void mergeLocalToBuilder() { + if (this.subClusterId != null) { + builder.setSubClusterId(convertToProtoFormat(this.subClusterId)); + } + } + @Override public int hashCode() { return getProto().hashCode(); @@ -75,4 +102,38 @@ public String toString() { return TextFormat.shortDebugString(getProto()); } + @Override + public SubClusterId getSubClusterId() { + GetApplicationsHomeSubClusterRequestProtoOrBuilder p = + viaProto ? proto : builder; + if (subClusterId != null) { + return subClusterId; + } + if (!p.hasSubClusterId()) { + return null; + } + this.subClusterId = convertFromProtoFormat(p.getSubClusterId()); + + return this.subClusterId; + } + + @Override + public void setSubClusterId(SubClusterId subClusterId) { + maybeInitBuilder(); + if (subClusterId == null) { + builder.clearSubClusterId(); + return; + } + this.subClusterId = subClusterId; + } + + private SubClusterId convertFromProtoFormat( + SubClusterIdProto subClusterIdProto) { + return new SubClusterIdPBImpl(subClusterIdProto); + } + + private SubClusterIdProto convertToProtoFormat(SubClusterId appId) { + return ((SubClusterIdPBImpl) appId).getProto(); + } + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_federation_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_federation_protos.proto index 114a60df87b6a..6722500fa98ec 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_federation_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_federation_protos.proto @@ -122,7 +122,7 @@ message GetApplicationHomeSubClusterResponseProto { } message GetApplicationsHomeSubClusterRequestProto { - + optional SubClusterIdProto sub_cluster_id = 1; } message GetApplicationsHomeSubClusterResponseProto { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/FederationStateStoreBaseTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/FederationStateStoreBaseTest.java index d0e6485b02883..f74d068b4a0de 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/FederationStateStoreBaseTest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/FederationStateStoreBaseTest.java @@ -396,6 +396,18 @@ public void testGetApplicationsHomeSubCluster() throws Exception { Assert.assertEquals(2, result.getAppsHomeSubClusters().size()); Assert.assertTrue(result.getAppsHomeSubClusters().contains(ahsc1)); Assert.assertTrue(result.getAppsHomeSubClusters().contains(ahsc2)); + + getRequest = GetApplicationsHomeSubClusterRequest + .newInstance(subClusterId1); + result = stateStore.getApplicationsHomeSubCluster(getRequest); + Assert.assertEquals(1, result.getAppsHomeSubClusters().size()); + Assert.assertTrue(result.getAppsHomeSubClusters().contains(ahsc1)); + + getRequest = GetApplicationsHomeSubClusterRequest + .newInstance(subClusterId2); + result = stateStore.getApplicationsHomeSubCluster(getRequest); + Assert.assertEquals(1, result.getAppsHomeSubClusters().size()); + Assert.assertTrue(result.getAppsHomeSubClusters().contains(ahsc2)); } @Test diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/HSQLDBFederationStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/HSQLDBFederationStateStore.java index c3d0a9e1bbd53..7d4c2d1aa54e8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/HSQLDBFederationStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/HSQLDBFederationStateStore.java @@ -164,11 +164,14 @@ public class HSQLDBFederationStateStore extends SQLFederationStateStore { + " WHERE applicationId = applicationID_IN; END"; private static final String SP_GETAPPLICATIONSHOMESUBCLUSTER = - "CREATE PROCEDURE sp_getApplicationsHomeSubCluster()" + "CREATE PROCEDURE sp_getApplicationsHomeSubCluster(" + + " IN homeSubCluster_IN varchar(256))" + " MODIFIES SQL DATA DYNAMIC RESULT SETS 1 BEGIN ATOMIC" + " DECLARE result CURSOR FOR" + " SELECT applicationId, homeSubCluster" - + " FROM applicationsHomeSubCluster; OPEN result; END"; + + " FROM applicationsHomeSubCluster" + + " WHERE homeSubCluster_IN='' or homeSubCluster=homeSubCluster_IN;" + + " OPEN result; END"; private static final String SP_DELETEAPPLICATIONHOMESUBCLUSTER = "CREATE PROCEDURE sp_deleteApplicationHomeSubCluster(" diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java index f1b0c794031e5..c0a783b8a3b16 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java @@ -32,6 +32,7 @@ import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.nodelabels.NodeAttributesManager; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.SystemCredentialsForAppsProto; +import org.apache.hadoop.yarn.server.resourcemanager.federation.FederationStateStoreService; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMDelegatedNodeLabelsUpdater; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementManager; @@ -90,6 +91,7 @@ public class RMActiveServiceContext { private AMLivelinessMonitor amLivelinessMonitor; private AMLivelinessMonitor amFinishingMonitor; private RMStateStore stateStore = null; + private FederationStateStoreService fedStateStoreService = null; private ContainerAllocationExpirer containerAllocationExpirer; private DelegationTokenRenewer delegationTokenRenewer; private AMRMTokenSecretManager amRMTokenSecretManager; @@ -170,6 +172,13 @@ public void setStateStore(RMStateStore store) { stateStore = store; } + @Private + @Unstable + public void setFederationStateStoreService( + FederationStateStoreService federationStateStoreService) { + fedStateStoreService = federationStateStoreService; + } + @Private @Unstable public ClientRMService getClientRMService() { @@ -194,6 +203,12 @@ public RMStateStore getStateStore() { return stateStore; } + @Private + @Unstable + public FederationStateStoreService getFederationStateStoreService() { + return fedStateStoreService; + } + @Private @Unstable public ConcurrentMap getRMApps() { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java index f847152c47d76..b4e4574bbfcee 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java @@ -28,9 +28,11 @@ import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.conf.HAUtil; import org.apache.hadoop.yarn.security.ConfiguredYarnAuthorizer; import org.apache.hadoop.yarn.security.Permission; import org.apache.hadoop.yarn.security.PrivilegedEntity; +import org.apache.hadoop.yarn.server.resourcemanager.federation.FederationStateStoreRemoveAppHomeSubClusterEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueuePath; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -359,6 +361,11 @@ protected synchronized void checkAppNumCompletedLimit() { + " from memory: "); rmContext.getRMApps().remove(removeId); this.applicationACLsManager.removeApplication(removeId); + // Remove application from federation state store + if (HAUtil.isFederationEnabled(conf)) { + rmContext.getFederationStateStoreService().getEventHandler().handle( + new FederationStateStoreRemoveAppHomeSubClusterEvent(removeId)); + } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java index 55420bd92707e..4b34fe1b9af97 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java @@ -30,6 +30,7 @@ import org.apache.hadoop.yarn.nodelabels.NodeAttributesManager; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.SystemCredentialsForAppsProto; import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter; +import org.apache.hadoop.yarn.server.resourcemanager.federation.FederationStateStoreService; import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMDelegatedNodeLabelsUpdater; @@ -203,4 +204,6 @@ void setMultiNodeSortingManager( long getTokenSequenceNo(); void incrTokenSequenceNo(); + + FederationStateStoreService getFederationStateStoreService(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java index ede7561b847f6..5c55b8ab60d5a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java @@ -36,6 +36,7 @@ import org.apache.hadoop.yarn.nodelabels.NodeAttributesManager; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.SystemCredentialsForAppsProto; import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter; +import org.apache.hadoop.yarn.server.resourcemanager.federation.FederationStateStoreService; import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMDelegatedNodeLabelsUpdater; @@ -678,4 +679,9 @@ public long getTokenSequenceNo() { public void incrTokenSequenceNo() { this.activeServiceContext.incrTokenSequenceNo(); } + + @Override + public FederationStateStoreService getFederationStateStoreService() { + return activeServiceContext.getFederationStateStoreService(); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java index 8adcff42a695d..e0b79c1f0d497 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java @@ -23,6 +23,8 @@ import com.sun.jersey.spi.container.servlet.ServletContainer; import org.apache.hadoop.yarn.metrics.GenericEventTypeMetrics; +import org.apache.hadoop.yarn.server.resourcemanager.federation.FederationStateStoreEvent; +import org.apache.hadoop.yarn.server.resourcemanager.federation.FederationStateStoreEventType; import org.apache.hadoop.yarn.webapp.WebAppException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -917,6 +919,8 @@ protected void serviceInit(Configuration configuration) throws Exception { } federationStateStoreService = createFederationStateStoreService(); addIfService(federationStateStoreService); + rmContext.getActiveServiceContext() + .setFederationStateStoreService(federationStateStoreService); LOG.info("Initialized Federation membership."); } @@ -1010,6 +1014,11 @@ protected void serviceStart() throws Exception { LOG.info("Epoch set for Federation: " + epoch); } } + if (HAUtil.isFederationEnabled(conf)) { + federationStateStoreService.getEventHandler().handle( + new FederationStateStoreEvent( + FederationStateStoreEventType.CHECK_APPS_HOME_SUBCLUSTER)); + } super.serviceStart(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/FederationStateStoreEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/FederationStateStoreEvent.java new file mode 100644 index 0000000000000..d0cd2cfa48f3d --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/FederationStateStoreEvent.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.federation; + +import org.apache.hadoop.yarn.event.AbstractEvent; + +public class FederationStateStoreEvent extends + AbstractEvent { + + public FederationStateStoreEvent(FederationStateStoreEventType type) { + super(type); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/FederationStateStoreEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/FederationStateStoreEventType.java new file mode 100644 index 0000000000000..722538ab6caa8 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/FederationStateStoreEventType.java @@ -0,0 +1,23 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.federation; + +public enum FederationStateStoreEventType { + CHECK_APPS_HOME_SUBCLUSTER, + REMOVE_APP_HOME_SUBCLUSTER, +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/FederationStateStoreRemoveAppHomeSubClusterEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/FederationStateStoreRemoveAppHomeSubClusterEvent.java new file mode 100644 index 0000000000000..dfa2bc9c4835c --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/FederationStateStoreRemoveAppHomeSubClusterEvent.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.federation; + +import org.apache.hadoop.yarn.api.records.ApplicationId; + +public class FederationStateStoreRemoveAppHomeSubClusterEvent extends + FederationStateStoreEvent { + + private ApplicationId applicationId; + + public FederationStateStoreRemoveAppHomeSubClusterEvent( + ApplicationId applicationId) { + super(FederationStateStoreEventType.REMOVE_APP_HOME_SUBCLUSTER); + this.applicationId = applicationId; + } + + public ApplicationId getApplicationId() { + return applicationId; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/FederationStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/FederationStateStoreService.java index c4dae7d4f7d3f..7506dc2e980af 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/FederationStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/FederationStateStoreService.java @@ -22,17 +22,24 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; +import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.retry.RetryPolicy; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.util.concurrent.HadoopExecutors; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.event.AsyncDispatcher; +import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.server.federation.store.FederationStateStore; import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterRequest; import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterResponse; +import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster; import org.apache.hadoop.yarn.server.federation.store.records.DeleteApplicationHomeSubClusterRequest; import org.apache.hadoop.yarn.server.federation.store.records.DeleteApplicationHomeSubClusterResponse; import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterRequest; @@ -64,6 +71,10 @@ import org.apache.hadoop.yarn.server.records.Version; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; +import org.apache.hadoop.yarn.state.InvalidStateTransitionException; +import org.apache.hadoop.yarn.state.SingleArcTransition; +import org.apache.hadoop.yarn.state.StateMachine; +import org.apache.hadoop.yarn.state.StateMachineFactory; import org.apache.hadoop.yarn.webapp.util.WebAppUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -88,10 +99,46 @@ public class FederationStateStoreService extends AbstractService private long heartbeatInterval; private RMContext rmContext; + private AsyncDispatcher dispatcher; + private final ReadLock readLock; + private final WriteLock writeLock; + + private enum FederationStateStoreState { + ACTIVE, + }; + + private static final StateMachineFactory stateMachineFactory = + new StateMachineFactory(FederationStateStoreState.ACTIVE) + .addTransition(FederationStateStoreState.ACTIVE, + FederationStateStoreState.ACTIVE, + FederationStateStoreEventType.REMOVE_APP_HOME_SUBCLUSTER, + new RemoveAppHomeSubClusterTransition()) + .addTransition(FederationStateStoreState.ACTIVE, + FederationStateStoreState.ACTIVE, + FederationStateStoreEventType.CHECK_APPS_HOME_SUBCLUSTER, + new CheckAppsHomeSubClusterTransition()); + + private final StateMachine stateMachine; + + private EventHandler fedStateStoreEventHandler; + public FederationStateStoreService(RMContext rmContext) { super(FederationStateStoreService.class.getName()); LOG.info("FederationStateStoreService initialized"); this.rmContext = rmContext; + + ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); + this.readLock = lock.readLock(); + this.writeLock = lock.writeLock(); + stateMachine = stateMachineFactory.make(this); } @Override @@ -120,6 +167,15 @@ protected void serviceInit(Configuration conf) throws Exception { heartbeatInterval = YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_HEARTBEAT_INTERVAL_SECS; } + + // Create async handler + dispatcher = new AsyncDispatcher("Federation StateStore dispatcher"); + dispatcher.init(conf); + fedStateStoreEventHandler = new FederationStateStoreEventHandler(); + dispatcher.register(FederationStateStoreEventType.class, + fedStateStoreEventHandler); + dispatcher.setDrainEventsOnStop(); + LOG.info("Initialized federation membership service."); super.serviceInit(conf); @@ -127,9 +183,8 @@ protected void serviceInit(Configuration conf) throws Exception { @Override protected void serviceStart() throws Exception { - registerAndInitializeHeartbeat(); - + dispatcher.start(); super.serviceStart(); } @@ -301,4 +356,116 @@ public DeleteApplicationHomeSubClusterResponse deleteApplicationHomeSubCluster( DeleteApplicationHomeSubClusterRequest request) throws YarnException { return stateStoreClient.deleteApplicationHomeSubCluster(request); } + + private final class FederationStateStoreEventHandler + implements EventHandler { + + @Override + public void handle(FederationStateStoreEvent event) { + handleEvent(event); + } + } + + private void handleEvent(FederationStateStoreEvent event) { + this.writeLock.lock(); + try { + if (LOG.isDebugEnabled()) { + LOG.debug("Processing event of type {}", event.getType()); + } + final FederationStateStoreState oldState = getFederationStateStoreState(); + this.stateMachine.doTransition(event.getType(), event); + if (oldState != getFederationStateStoreState()) { + LOG.info("FederatioinStateStore state change from {} to {}", oldState, + getFederationStateStoreState()); + } + } catch (InvalidStateTransitionException e) { + LOG.error("Can't handle this event at current state", e); + } finally { + this.writeLock.unlock(); + } + } + + public FederationStateStoreState getFederationStateStoreState() { + this.readLock.lock(); + try { + return this.stateMachine.getCurrentState(); + } finally { + this.readLock.unlock(); + } + } + + private static class RemoveAppHomeSubClusterTransition implements + SingleArcTransition { + + @Override + public void transition( + FederationStateStoreService federationStateStoreService, + FederationStateStoreEvent event) { + if (!(event instanceof FederationStateStoreRemoveAppHomeSubClusterEvent)) { + // should never happen + LOG.error("Illegal event type: {}", event.getClass()); + return; + } + ApplicationId id = ((FederationStateStoreRemoveAppHomeSubClusterEvent) event) + .getApplicationId(); + try { + federationStateStoreService.deleteApplicationHomeSubCluster( + DeleteApplicationHomeSubClusterRequest.newInstance(id)); + if (LOG.isDebugEnabled()) { + LOG.debug("Application {} has removed from federation state store", + id); + } + } catch (Throwable t) { + // You should remove app manually when throw exception + LOG.error("Delete application {} home subcluster failed, caused by {}.", + id, t.getMessage()); + } + } + } + + private static class CheckAppsHomeSubClusterTransition implements + SingleArcTransition { + + @Override + public void transition( + FederationStateStoreService store, + FederationStateStoreEvent event) { + if (event.getType() != FederationStateStoreEventType.CHECK_APPS_HOME_SUBCLUSTER) { + // should never happen + LOG.error("Illegal event type: {} ", event.getClass()); + return; + } + try { + GetApplicationsHomeSubClusterResponse response = store + .getApplicationsHomeSubCluster(GetApplicationsHomeSubClusterRequest + .newInstance(store.subClusterId)); + for (ApplicationHomeSubCluster app : response + .getAppsHomeSubClusters()) { + if (LOG.isDebugEnabled()) { + LOG.debug("Check application {}.", app.getApplicationId()); + } + if (!store.rmContext.getRMApps() + .containsKey(app.getApplicationId())) { + store.getEventHandler().handle( + new FederationStateStoreRemoveAppHomeSubClusterEvent( + app.getApplicationId())); + if (LOG.isDebugEnabled()) { + LOG.debug("Check application {} which will be removed.", app); + } + } + } + } catch (Throwable t) { + /// You should remove app manually when throw exception + LOG.error("Fetch applications home subcluster failed.", t); + } + } + } + + public EventHandler getEventHandler() { + return dispatcher.getEventHandler(); + } + + public AsyncDispatcher getDispatcher() { + return dispatcher; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/federation/TestFederationRMStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/federation/TestFederationRMStateStoreService.java index e5e156dcf76c6..4774553ebe099 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/federation/TestFederationRMStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/federation/TestFederationRMStateStoreService.java @@ -17,17 +17,27 @@ package org.apache.hadoop.yarn.server.resourcemanager.federation; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + import java.io.IOException; import java.io.StringReader; import java.net.UnknownHostException; +import java.util.List; import javax.xml.bind.JAXBException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.ha.HAServiceProtocol; +import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.federation.store.FederationStateStore; +import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterRequest; +import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster; +import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsHomeSubClusterRequest; +import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsHomeSubClusterResponse; import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterInfoRequest; import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterInfoResponse; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterRequest; @@ -35,6 +45,7 @@ import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState; import org.apache.hadoop.yarn.server.resourcemanager.MockRM; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ClusterMetricsInfo; import org.junit.After; import org.junit.Assert; @@ -53,15 +64,18 @@ public class TestFederationRMStateStoreService { private final HAServiceProtocol.StateChangeRequestInfo requestInfo = new HAServiceProtocol.StateChangeRequestInfo( HAServiceProtocol.RequestSource.REQUEST_BY_USER); - private final SubClusterId subClusterId = SubClusterId.newInstance("SC-1"); + private final SubClusterId subClusterId1 = SubClusterId.newInstance("SC-1"); + private final SubClusterId subClusterId2 = SubClusterId.newInstance("SC-2"); private final GetSubClusterInfoRequest request = - GetSubClusterInfoRequest.newInstance(subClusterId); + GetSubClusterInfoRequest.newInstance(subClusterId1); private Configuration conf; + private FederationStateStoreService service; private FederationStateStore stateStore; private long lastHearbeatTS = 0; private JSONJAXBContext jc; private JSONUnmarshaller unmarshaller; + private MockRM rm; @Before public void setUp() throws IOException, YarnException, JAXBException { @@ -70,20 +84,24 @@ public void setUp() throws IOException, YarnException, JAXBException { JSONConfiguration.mapped().rootUnwrapping(false).build(), ClusterMetricsInfo.class); unmarshaller = jc.createJSONUnmarshaller(); + conf.setBoolean(YarnConfiguration.FEDERATION_ENABLED, true); + conf.set(YarnConfiguration.RM_CLUSTER_ID, subClusterId1.getId()); + //conf.setInt(YarnConfiguration.FEDERATION_CACHE_TIME_TO_LIVE_SECS, 0); + rm = new MockRM(conf); } @After public void tearDown() throws Exception { unmarshaller = null; jc = null; + if (rm != null) { + rm.stop(); + rm = null; + } } @Test public void testFederationStateStoreService() throws Exception { - conf.setBoolean(YarnConfiguration.FEDERATION_ENABLED, true); - conf.set(YarnConfiguration.RM_CLUSTER_ID, subClusterId.getId()); - final MockRM rm = new MockRM(conf); - // Initially there should be no entry for the sub-cluster rm.init(conf); stateStore = rm.getFederationStateStoreService().getStateStoreClient(); @@ -111,7 +129,7 @@ public void testFederationStateStoreService() throws Exception { // Validate sub-cluster deregistration rm.getFederationStateStoreService() .deregisterSubCluster(SubClusterDeregisterRequest - .newInstance(subClusterId, SubClusterState.SC_UNREGISTERED)); + .newInstance(subClusterId1, SubClusterState.SC_UNREGISTERED)); checkSubClusterInfo(SubClusterState.SC_UNREGISTERED); // check after failover @@ -132,8 +150,6 @@ public void testFederationStateStoreService() throws Exception { storeHeartbeat.run(); capability = checkSubClusterInfo(SubClusterState.SC_RUNNING); checkClusterMetricsInfo(capability, 1); - - rm.stop(); } private void explicitFailover(MockRM rm) throws IOException { @@ -173,4 +189,56 @@ private String checkSubClusterInfo(SubClusterState state) return response.getCapability(); } + @Test(timeout=30000) + public void testDeleteApplicationHomeSubCluster() throws Exception { + rm.init(conf); + service = rm.getFederationStateStoreService(); + stateStore = service.getStateStoreClient(); + + ApplicationId appId1SubCluster1 = ApplicationId.newInstance(1l, 1); + ApplicationId appId2SubCluster1 = ApplicationId.newInstance(1l, 2); + ApplicationId appId1SubCluster2 = ApplicationId.newInstance(2l, 3); + + ApplicationHomeSubCluster app1SubCluster1 = ApplicationHomeSubCluster + .newInstance(appId1SubCluster1, subClusterId1); + stateStore.addApplicationHomeSubCluster(AddApplicationHomeSubClusterRequest + .newInstance(app1SubCluster1)); + ApplicationHomeSubCluster app2SubCluster1 = ApplicationHomeSubCluster + .newInstance(appId2SubCluster1, subClusterId1); + stateStore.addApplicationHomeSubCluster(AddApplicationHomeSubClusterRequest + .newInstance(app2SubCluster1)); + ApplicationHomeSubCluster app1SubCluster2 = ApplicationHomeSubCluster + .newInstance(appId1SubCluster2, subClusterId2); + stateStore.addApplicationHomeSubCluster(AddApplicationHomeSubClusterRequest + .newInstance(app1SubCluster2)); + + RMAppImpl rmApp1SubCluster1 = mock(RMAppImpl.class); + when(rmApp1SubCluster1.getApplicationId()).thenReturn(appId1SubCluster1); + rm.getRMContext().getRMApps().putIfAbsent(appId1SubCluster1, rmApp1SubCluster1); + + GetApplicationsHomeSubClusterResponse response = stateStore + .getApplicationsHomeSubCluster( + GetApplicationsHomeSubClusterRequest.newInstance()); + List appsHomeSubClusters = response + .getAppsHomeSubClusters(); + Assert.assertEquals(3, appsHomeSubClusters.size()); + Assert.assertTrue(appsHomeSubClusters.contains(app1SubCluster1)); + Assert.assertTrue(appsHomeSubClusters.contains(app2SubCluster1)); + Assert.assertTrue(appsHomeSubClusters.contains(app1SubCluster2)); + + rm.start(); + + // wait drain all event + GenericTestUtils.waitFor( + () -> rm.getFederationStateStoreService().getDispatcher().isDrained(), + 1000, 20000); + + response = stateStore.getApplicationsHomeSubCluster( + GetApplicationsHomeSubClusterRequest.newInstance()); + appsHomeSubClusters = response.getAppsHomeSubClusters(); + Assert.assertEquals(2, appsHomeSubClusters.size()); + Assert.assertTrue(appsHomeSubClusters.contains(app1SubCluster1)); + Assert.assertTrue(appsHomeSubClusters.contains(app1SubCluster2)); + } + } From 44bf46f1c96db4e1cdfc80f93782c258845bdd18 Mon Sep 17 00:00:00 2001 From: zhengchenyu Date: Wed, 22 Jun 2022 13:00:33 +0800 Subject: [PATCH 2/4] fix compile error and warning --- .../federation/FederationStateStoreService.java | 5 +++-- .../federation/TestFederationRMStateStoreService.java | 6 +++--- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/FederationStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/FederationStateStoreService.java index 7506dc2e980af..3bb1e3479ba74 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/FederationStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/FederationStateStoreService.java @@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.AsyncDispatcher; +import org.apache.hadoop.yarn.event.Event; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; @@ -107,7 +108,7 @@ private enum FederationStateStoreState { ACTIVE, }; - private static final StateMachineFactory stateMachineFactory = @@ -461,7 +462,7 @@ public void transition( } } - public EventHandler getEventHandler() { + public EventHandler getEventHandler() { return dispatcher.getEventHandler(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/federation/TestFederationRMStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/federation/TestFederationRMStateStoreService.java index 4774553ebe099..8c83b1fa628d9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/federation/TestFederationRMStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/federation/TestFederationRMStateStoreService.java @@ -195,9 +195,9 @@ public void testDeleteApplicationHomeSubCluster() throws Exception { service = rm.getFederationStateStoreService(); stateStore = service.getStateStoreClient(); - ApplicationId appId1SubCluster1 = ApplicationId.newInstance(1l, 1); - ApplicationId appId2SubCluster1 = ApplicationId.newInstance(1l, 2); - ApplicationId appId1SubCluster2 = ApplicationId.newInstance(2l, 3); + ApplicationId appId1SubCluster1 = ApplicationId.newInstance(1L, 1); + ApplicationId appId2SubCluster1 = ApplicationId.newInstance(1L, 2); + ApplicationId appId1SubCluster2 = ApplicationId.newInstance(2L, 3); ApplicationHomeSubCluster app1SubCluster1 = ApplicationHomeSubCluster .newInstance(appId1SubCluster1, subClusterId1); From 9b0166e3a3f55e6b66179ca0d64e688229509631 Mon Sep 17 00:00:00 2001 From: zhengchenyu Date: Thu, 23 Jun 2022 15:59:25 +0800 Subject: [PATCH 3/4] trigger ci From f75fb108b91685f308ccd568a6c36d0dbc44a1a5 Mon Sep 17 00:00:00 2001 From: zhengchenyu Date: Mon, 27 Jun 2022 10:59:02 +0800 Subject: [PATCH 4/4] fix ut --- .../router/clientrm/TestFederationClientInterceptor.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java index 3037738240266..fbe3d189f28fb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java @@ -30,6 +30,7 @@ import java.util.Set; import java.util.stream.Collectors; +import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.test.LambdaTestUtils; import org.apache.hadoop.yarn.MockApps; import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportRequest; @@ -117,6 +118,10 @@ public class TestFederationClientInterceptor extends BaseRouterClientRMTest { private static final Logger LOG = LoggerFactory.getLogger(TestFederationClientInterceptor.class); + static { + DefaultMetricsSystem.setMiniClusterMode(true); + } + private TestableFederationClientInterceptor interceptor; private MemoryFederationStateStore stateStore; private FederationStateStoreTestUtil stateStoreUtil;