diff --git a/hadoop-yarn-project/hadoop-yarn/bin/FederationStateStore/MySQL/FederationStateStoreStoredProcs.sql b/hadoop-yarn-project/hadoop-yarn/bin/FederationStateStore/MySQL/FederationStateStoreStoredProcs.sql index 9434ed3848805..6461cf2bd75e4 100644 --- a/hadoop-yarn-project/hadoop-yarn/bin/FederationStateStore/MySQL/FederationStateStoreStoredProcs.sql +++ b/hadoop-yarn-project/hadoop-yarn/bin/FederationStateStore/MySQL/FederationStateStoreStoredProcs.sql @@ -122,10 +122,21 @@ BEGIN WHERE applicationId = applicationID_IN; END // -CREATE PROCEDURE sp_getApplicationsHomeSubCluster() -BEGIN - SELECT applicationId, homeSubCluster - FROM applicationsHomeSubCluster; +CREATE PROCEDURE sp_getApplicationsHomeSubCluster(IN limit_IN int, IN homeSubCluster_IN varchar(256)) +BEGIN + SELECT + applicationId, + homeSubCluster, + createTime + FROM (SELECT + applicationId, + homeSubCluster, + createTime, + @rownum := 0 + FROM applicationshomesubcluster + ORDER BY createTime DESC) AS applicationshomesubcluster + WHERE (homeSubCluster_IN = '' OR homeSubCluster = homeSubCluster_IN) + AND (@rownum := @rownum + 1) <= limit_IN; END // CREATE PROCEDURE sp_deleteApplicationHomeSubCluster( diff --git a/hadoop-yarn-project/hadoop-yarn/bin/FederationStateStore/MySQL/FederationStateStoreTables.sql b/hadoop-yarn-project/hadoop-yarn/bin/FederationStateStore/MySQL/FederationStateStoreTables.sql index 61f47d4bba6ae..d61a10f998b3e 100644 --- a/hadoop-yarn-project/hadoop-yarn/bin/FederationStateStore/MySQL/FederationStateStoreTables.sql +++ b/hadoop-yarn-project/hadoop-yarn/bin/FederationStateStore/MySQL/FederationStateStoreTables.sql @@ -22,7 +22,8 @@ USE FederationStateStore CREATE TABLE applicationsHomeSubCluster( applicationId varchar(64) NOT NULL, - homeSubCluster varchar(256) NULL, + homeSubCluster varchar(256) NOT NULL, + createTime datetime NOT NULL, CONSTRAINT pk_applicationId PRIMARY KEY (applicationId) ); diff --git a/hadoop-yarn-project/hadoop-yarn/bin/FederationStateStore/SQLServer/FederationStateStoreStoredProcs.sql b/hadoop-yarn-project/hadoop-yarn/bin/FederationStateStore/SQLServer/FederationStateStoreStoredProcs.sql index ab17aae4f88d9..17f9e96909c52 100644 --- a/hadoop-yarn-project/hadoop-yarn/bin/FederationStateStore/SQLServer/FederationStateStoreStoredProcs.sql +++ b/hadoop-yarn-project/hadoop-yarn/bin/FederationStateStore/SQLServer/FederationStateStoreStoredProcs.sql @@ -111,12 +111,26 @@ IF OBJECT_ID ( '[sp_getApplicationsHomeSubCluster]', 'P' ) IS NOT NULL GO CREATE PROCEDURE [dbo].[sp_getApplicationsHomeSubCluster] + @limit int, + @homeSubCluster VARCHAR(256) AS BEGIN DECLARE @errorMessage nvarchar(4000) BEGIN TRY - SELECT [applicationId], [homeSubCluster], [createTime] - FROM [dbo].[applicationsHomeSubCluster] + + SELECT + [applicationId], + [homeSubCluster], + [createTime] + FROM(SELECT + [applicationId], + [homeSubCluster], + [createTime], + row_number() over(order by [createTime] desc) AS app_rank + FROM [dbo].[applicationsHomeSubCluster] + WHERE [homeSubCluster] = @homeSubCluster OR @homeSubCluster = '') AS applicationsHomeSubCluster + WHERE app_rank <= @limit; + END TRY BEGIN CATCH diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index fd4cb0c5ec9d2..d5e120695e739 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -4056,6 +4056,11 @@ public static boolean isAclEnabled(Configuration conf) { public static final int DEFAULT_FEDERATION_STATESTORE_SQL_MAXCONNECTIONS = 1; + public static final String FEDERATION_STATESTORE_MAX_APPLICATIONS = + FEDERATION_PREFIX + "state-store.max-applications"; + + public static final int DEFAULT_FEDERATION_STATESTORE_MAX_APPLICATIONS = 1000; + public static final String ROUTER_PREFIX = YARN_PREFIX + "router."; public static final String ROUTER_BIND_HOST = ROUTER_PREFIX + "bind-host"; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index 92a7298373879..5132d4199afdf 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -5007,4 +5007,13 @@ + + yarn.federation.state-store.max-applications + 1000 + + Yarn federation state-store supports querying the maximum number of apps. + Default is 1000. + + + diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java index b4c99bab9d5d1..4d545fb808068 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java @@ -27,12 +27,16 @@ import java.util.Set; import java.util.TimeZone; import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; +import java.util.Comparator; import org.apache.hadoop.classification.VisibleForTesting; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.token.delegation.DelegationKey; +import org.apache.hadoop.util.Time; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ReservationId; +import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.federation.store.FederationStateStore; import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterRequest; @@ -90,6 +94,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.hadoop.yarn.server.federation.store.utils.FederationStateStoreUtils.filterHomeSubCluster; + /** * In-memory implementation of {@link FederationStateStore}. */ @@ -100,6 +106,7 @@ public class MemoryFederationStateStore implements FederationStateStore { private Map reservations; private Map policies; private RouterRMDTSecretManagerState routerRMSecretManagerState; + private int maxAppsInStateStore; private final MonotonicClock clock = new MonotonicClock(); @@ -113,6 +120,9 @@ public void init(Configuration conf) { reservations = new ConcurrentHashMap(); policies = new ConcurrentHashMap(); routerRMSecretManagerState = new RouterRMDTSecretManagerState(); + maxAppsInStateStore = conf.getInt( + YarnConfiguration.FEDERATION_STATESTORE_MAX_APPLICATIONS, + YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_MAX_APPLICATIONS); } @Override @@ -266,17 +276,28 @@ public GetApplicationHomeSubClusterResponse getApplicationHomeSubCluster( @Override public GetApplicationsHomeSubClusterResponse getApplicationsHomeSubCluster( GetApplicationsHomeSubClusterRequest request) throws YarnException { - List result = - new ArrayList(); - for (Entry e : applications.entrySet()) { - result - .add(ApplicationHomeSubCluster.newInstance(e.getKey(), e.getValue())); + + if (request == null) { + throw new YarnException("Missing getApplicationsHomeSubCluster request"); } - GetApplicationsHomeSubClusterResponse.newInstance(result); + SubClusterId requestSC = request.getSubClusterId(); + List result = applications.keySet().stream() + .map(applicationId -> generateAppHomeSC(applicationId)) + .sorted(Comparator.comparing(ApplicationHomeSubCluster::getCreateTime).reversed()) + .filter(appHomeSC -> filterHomeSubCluster(requestSC, appHomeSC.getHomeSubCluster())) + .limit(maxAppsInStateStore) + .collect(Collectors.toList()); + + LOG.info("filterSubClusterId = {}, appCount = {}.", requestSC, result.size()); return GetApplicationsHomeSubClusterResponse.newInstance(result); } + private ApplicationHomeSubCluster generateAppHomeSC(ApplicationId applicationId) { + SubClusterId subClusterId = applications.get(applicationId); + return ApplicationHomeSubCluster.newInstance(applicationId, Time.now(), subClusterId); + } + @Override public DeleteApplicationHomeSubClusterResponse deleteApplicationHomeSubCluster( DeleteApplicationHomeSubClusterRequest request) throws YarnException { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/SQLFederationStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/SQLFederationStateStore.java index 22a10f5f669b9..889c1e06413f4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/SQLFederationStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/SQLFederationStateStore.java @@ -136,7 +136,7 @@ public class SQLFederationStateStore implements FederationStateStore { "{call sp_getApplicationHomeSubCluster(?, ?)}"; private static final String CALL_SP_GET_APPLICATIONS_HOME_SUBCLUSTER = - "{call sp_getApplicationsHomeSubCluster()}"; + "{call sp_getApplicationsHomeSubCluster(?, ?)}"; private static final String CALL_SP_SET_POLICY_CONFIGURATION = "{call sp_setPolicyConfiguration(?, ?, ?, ?)}"; @@ -176,6 +176,7 @@ public class SQLFederationStateStore implements FederationStateStore { private final Clock clock = new MonotonicClock(); @VisibleForTesting Connection conn = null; + private int maxAppsInStateStore; @Override public void init(Configuration conf) throws YarnException { @@ -215,6 +216,10 @@ public void init(Configuration conf) throws YarnException { FederationStateStoreUtils.logAndThrowRetriableException(LOG, "Not able to get Connection", e); } + + maxAppsInStateStore = conf.getInt( + YarnConfiguration.FEDERATION_STATESTORE_MAX_APPLICATIONS, + YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_MAX_APPLICATIONS); } @Override @@ -748,24 +753,35 @@ public GetApplicationHomeSubClusterResponse getApplicationHomeSubCluster( @Override public GetApplicationsHomeSubClusterResponse getApplicationsHomeSubCluster( GetApplicationsHomeSubClusterRequest request) throws YarnException { + + if (request == null) { + throw new YarnException("Missing getApplicationsHomeSubCluster request"); + } + CallableStatement cstmt = null; ResultSet rs = null; - List appsHomeSubClusters = - new ArrayList(); + List appsHomeSubClusters = new ArrayList<>(); try { cstmt = getCallableStatement(CALL_SP_GET_APPLICATIONS_HOME_SUBCLUSTER); + cstmt.setInt("limit_IN", maxAppsInStateStore); + String homeSubClusterIN = StringUtils.EMPTY; + SubClusterId subClusterId = request.getSubClusterId(); + if (subClusterId != null) { + homeSubClusterIN = subClusterId.toString(); + } + cstmt.setString("homeSubCluster_IN", homeSubClusterIN); // Execute the query long startTime = clock.getTime(); rs = cstmt.executeQuery(); long stopTime = clock.getTime(); - while (rs.next()) { + while (rs.next() && appsHomeSubClusters.size() <= maxAppsInStateStore) { // Extract the output for each tuple - String applicationId = rs.getString(1); - String homeSubCluster = rs.getString(2); + String applicationId = rs.getString("applicationId"); + String homeSubCluster = rs.getString("homeSubCluster"); appsHomeSubClusters.add(ApplicationHomeSubCluster.newInstance( ApplicationId.fromString(applicationId), @@ -783,8 +799,8 @@ public GetApplicationsHomeSubClusterResponse getApplicationsHomeSubCluster( // Return to the pool the CallableStatement FederationStateStoreUtils.returnToPool(LOG, cstmt, null, rs); } - return GetApplicationsHomeSubClusterResponse - .newInstance(appsHomeSubClusters); + + return GetApplicationsHomeSubClusterResponse.newInstance(appsHomeSubClusters); } @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/ZookeeperFederationStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/ZookeeperFederationStateStore.java index 18dfdc27d86fa..affd4ced4f174 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/ZookeeperFederationStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/ZookeeperFederationStateStore.java @@ -24,10 +24,13 @@ import java.util.Calendar; import java.util.List; import java.util.TimeZone; +import java.util.Comparator; +import java.util.stream.Collectors; import org.apache.commons.lang3.NotImplementedException; import org.apache.hadoop.classification.VisibleForTesting; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.util.Time; import org.apache.hadoop.util.curator.ZKCuratorManager; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.conf.YarnConfiguration; @@ -98,6 +101,8 @@ import org.apache.hadoop.thirdparty.protobuf.InvalidProtocolBufferException; +import static org.apache.hadoop.yarn.server.federation.store.utils.FederationStateStoreUtils.filterHomeSubCluster; + /** * ZooKeeper implementation of {@link FederationStateStore}. * @@ -136,6 +141,7 @@ public class ZookeeperFederationStateStore implements FederationStateStore { private String membershipZNode; private String policiesZNode; private String reservationsZNode; + private int maxAppsInStateStore; private volatile Clock clock = SystemClock.getInstance(); @@ -147,6 +153,10 @@ public class ZookeeperFederationStateStore implements FederationStateStore { public void init(Configuration conf) throws YarnException { LOG.info("Initializing ZooKeeper connection"); + maxAppsInStateStore = conf.getInt( + YarnConfiguration.FEDERATION_STATESTORE_MAX_APPLICATIONS, + YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_MAX_APPLICATIONS); + baseZNode = conf.get( YarnConfiguration.FEDERATION_STATESTORE_ZK_PARENT_PATH, YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_ZK_PARENT_PATH); @@ -258,24 +268,44 @@ public GetApplicationHomeSubClusterResponse getApplicationHomeSubCluster( @Override public GetApplicationsHomeSubClusterResponse getApplicationsHomeSubCluster( GetApplicationsHomeSubClusterRequest request) throws YarnException { - long start = clock.getTime(); - List result = new ArrayList<>(); + + if (request == null) { + throw new YarnException("Missing getApplicationsHomeSubCluster request"); + } try { - for (String child : zkManager.getChildren(appsZNode)) { - ApplicationId appId = ApplicationId.fromString(child); - SubClusterId homeSubCluster = getApp(appId); - ApplicationHomeSubCluster app = - ApplicationHomeSubCluster.newInstance(appId, homeSubCluster); - result.add(app); - } + long start = clock.getTime(); + SubClusterId requestSC = request.getSubClusterId(); + List children = zkManager.getChildren(appsZNode); + List result = children.stream() + .map(child -> generateAppHomeSC(child)) + .sorted(Comparator.comparing(ApplicationHomeSubCluster::getCreateTime).reversed()) + .filter(appHomeSC -> filterHomeSubCluster(requestSC, appHomeSC.getHomeSubCluster())) + .limit(maxAppsInStateStore) + .collect(Collectors.toList()); + long end = clock.getTime(); + opDurations.addGetAppsHomeSubClusterDuration(start, end); + LOG.info("filterSubClusterId = {}, appCount = {}.", requestSC, result.size()); + return GetApplicationsHomeSubClusterResponse.newInstance(result); } catch (Exception e) { String errMsg = "Cannot get apps: " + e.getMessage(); FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg); } - long end = clock.getTime(); - opDurations.addGetAppsHomeSubClusterDuration(start, end); - return GetApplicationsHomeSubClusterResponse.newInstance(result); + + throw new YarnException("Cannot get app by request"); + } + + private ApplicationHomeSubCluster generateAppHomeSC(String appId) { + try { + ApplicationId applicationId = ApplicationId.fromString(appId); + SubClusterId homeSubCluster = getApp(applicationId); + ApplicationHomeSubCluster app = + ApplicationHomeSubCluster.newInstance(applicationId, Time.now(), homeSubCluster); + return app; + } catch (Exception ex) { + LOG.error("get homeSubCluster by appId = {}.", appId); + } + return null; } @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/ApplicationHomeSubCluster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/ApplicationHomeSubCluster.java index 5e4c7ccf4ef9e..898e11f182015 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/ApplicationHomeSubCluster.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/ApplicationHomeSubCluster.java @@ -51,6 +51,17 @@ public static ApplicationHomeSubCluster newInstance(ApplicationId appId, return appMapping; } + @Private + @Unstable + public static ApplicationHomeSubCluster newInstance(ApplicationId appId, long createTime, + SubClusterId homeSubCluster) { + ApplicationHomeSubCluster appMapping = Records.newRecord(ApplicationHomeSubCluster.class); + appMapping.setApplicationId(appId); + appMapping.setHomeSubCluster(homeSubCluster); + appMapping.setCreateTime(createTime); + return appMapping; + } + /** * Get the {@link ApplicationId} representing the unique identifier of the * application. @@ -91,6 +102,25 @@ public static ApplicationHomeSubCluster newInstance(ApplicationId appId, @Unstable public abstract void setHomeSubCluster(SubClusterId homeSubCluster); + /** + * Get the create time of the subcluster. + * + * @return the state of the subcluster + */ + @Public + @Unstable + public abstract long getCreateTime(); + + /** + * Set the create time of the subcluster. + * + * @param time the last heartbeat time of the subcluster + */ + @Private + @Unstable + public abstract void setCreateTime(long time); + + @Override public boolean equals(Object obj) { if (this == obj) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/GetApplicationsHomeSubClusterRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/GetApplicationsHomeSubClusterRequest.java index 60549722093db..06b6987dcbafb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/GetApplicationsHomeSubClusterRequest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/GetApplicationsHomeSubClusterRequest.java @@ -17,6 +17,7 @@ package org.apache.hadoop.yarn.server.federation.store.records; +import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.util.Records; @@ -37,4 +38,33 @@ public static GetApplicationsHomeSubClusterRequest newInstance() { return request; } + @Private + @Unstable + public static GetApplicationsHomeSubClusterRequest + newInstance(SubClusterId subClusterId) { + GetApplicationsHomeSubClusterRequest request = + Records.newRecord(GetApplicationsHomeSubClusterRequest.class); + request.setSubClusterId(subClusterId); + return request; + } + + /** + * Get the {@link SubClusterId} representing the unique identifier of the + * subcluster. + * + * @return the subcluster identifier + */ + @Public + @Unstable + public abstract SubClusterId getSubClusterId(); + + /** + * Set the {@link SubClusterId} representing the unique identifier of the + * subcluster. + * + * @param subClusterId the subcluster identifier + */ + @Public + @Unstable + public abstract void setSubClusterId(SubClusterId subClusterId); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/impl/pb/ApplicationHomeSubClusterPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/impl/pb/ApplicationHomeSubClusterPBImpl.java index 05b0b62649f44..a72a431430d5b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/impl/pb/ApplicationHomeSubClusterPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/impl/pb/ApplicationHomeSubClusterPBImpl.java @@ -149,6 +149,16 @@ public void setHomeSubCluster(SubClusterId homeSubCluster) { this.homeSubCluster = homeSubCluster; } + @Override + public long getCreateTime() { + return 0; + } + + @Override + public void setCreateTime(long time) { + + } + private SubClusterId convertFromProtoFormat(SubClusterIdProto subClusterId) { return new SubClusterIdPBImpl(subClusterId); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/impl/pb/GetApplicationsHomeSubClusterRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/impl/pb/GetApplicationsHomeSubClusterRequestPBImpl.java index a3c1c1a6bb5a1..1a75044cff3b5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/impl/pb/GetApplicationsHomeSubClusterRequestPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/impl/pb/GetApplicationsHomeSubClusterRequestPBImpl.java @@ -19,10 +19,13 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.federation.proto.YarnServerFederationProtos; import org.apache.hadoop.yarn.federation.proto.YarnServerFederationProtos.GetApplicationsHomeSubClusterRequestProto; +import org.apache.hadoop.yarn.federation.proto.YarnServerFederationProtos.GetApplicationsHomeSubClusterRequestProtoOrBuilder; import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsHomeSubClusterRequest; import org.apache.hadoop.thirdparty.protobuf.TextFormat; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; /** * Protocol buffer based implementation of @@ -75,4 +78,37 @@ public String toString() { return TextFormat.shortDebugString(getProto()); } + private void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = GetApplicationsHomeSubClusterRequestProto.newBuilder(proto); + } + viaProto = false; + } + + @Override + public SubClusterId getSubClusterId() { + GetApplicationsHomeSubClusterRequestProtoOrBuilder p = viaProto ? proto : builder; + if (!p.hasSubClusterId()) { + return null; + } + return convertFromProtoFormat(p.getSubClusterId()); + } + + @Override + public void setSubClusterId(SubClusterId subClusterId) { + maybeInitBuilder(); + if (subClusterId == null) { + builder.clearSubClusterId(); + return; + } + builder.setSubClusterId(convertToProtoFormat(subClusterId)); + } + + private SubClusterId convertFromProtoFormat(YarnServerFederationProtos.SubClusterIdProto sc) { + return new SubClusterIdPBImpl(sc); + } + + private YarnServerFederationProtos.SubClusterIdProto convertToProtoFormat(SubClusterId sc) { + return ((SubClusterIdPBImpl) sc).getProto(); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationStateStoreUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationStateStoreUtils.java index 7dc53f8e0acfc..52ef725fb2b94 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationStateStoreUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationStateStoreUtils.java @@ -28,6 +28,7 @@ import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreInvalidInputException; import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreRetriableException; import org.apache.hadoop.yarn.server.federation.store.metrics.FederationStateStoreClientMetrics; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -279,4 +280,30 @@ public static void setPassword(HikariDataSource dataSource, String password) { LOG.debug("NULL Credentials specified for Store connection, so ignoring"); } } + + /** + * Filter HomeSubCluster based on Filter SubCluster. + * + * @param filterSubCluster filter query conditions + * @param homeSubCluster homeSubCluster + * @return return true, if match filter conditions, + * return false, if not match filter conditions. + */ + public static boolean filterHomeSubCluster(SubClusterId filterSubCluster, + SubClusterId homeSubCluster) { + + // If the filter condition is empty, + // it means that homeSubCluster needs to be added + if (filterSubCluster == null) { + return true; + } + + // If the filter condition filterSubCluster is not empty, + // and filterSubCluster is equal to homeSubCluster, it needs to be added + if (filterSubCluster.equals(homeSubCluster)) { + return true; + } + + return false; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_federation_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_federation_protos.proto index ff2b97091bf43..0544a26e4c5a2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_federation_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_federation_protos.proto @@ -97,6 +97,7 @@ message GetSubClustersInfoResponseProto { message ApplicationHomeSubClusterProto { optional ApplicationIdProto application_id = 1; optional SubClusterIdProto home_sub_cluster = 2; + optional int64 create_time = 3; } message AddApplicationHomeSubClusterRequestProto { @@ -123,7 +124,7 @@ message GetApplicationHomeSubClusterResponseProto { } message GetApplicationsHomeSubClusterRequestProto { - + optional SubClusterIdProto sub_cluster_id = 1; } message GetApplicationsHomeSubClusterResponseProto { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/FederationStateStoreBaseTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/FederationStateStoreBaseTest.java index 258762682dfb8..d5493f6614f89 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/FederationStateStoreBaseTest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/FederationStateStoreBaseTest.java @@ -87,6 +87,8 @@ public abstract class FederationStateStoreBaseTest { private static final MonotonicClock CLOCK = new MonotonicClock(); private FederationStateStore stateStore; + private static final int NUM_APPS_10 = 10; + private static final int NUM_APPS_20 = 20; protected abstract FederationStateStore createStateStore(); @@ -416,6 +418,89 @@ public void testGetApplicationsHomeSubCluster() throws Exception { Assert.assertTrue(result.getAppsHomeSubClusters().contains(ahsc2)); } + @Test + public void testGetApplicationsHomeSubClusterEmpty() throws Exception { + LambdaTestUtils.intercept(YarnException.class, + "Missing getApplicationsHomeSubCluster request", + () -> stateStore.getApplicationsHomeSubCluster(null)); + } + + @Test + public void testGetApplicationsHomeSubClusterFilter() throws Exception { + // Add ApplicationHomeSC - SC1 + long now = Time.now(); + + Set appHomeSubClusters = new HashSet<>(); + + for (int i = 0; i < NUM_APPS_10; i++) { + ApplicationId appId = ApplicationId.newInstance(now, i); + SubClusterId subClusterId = SubClusterId.newInstance("SC1"); + addApplicationHomeSC(appId, subClusterId); + ApplicationHomeSubCluster ahsc = + ApplicationHomeSubCluster.newInstance(appId, subClusterId); + appHomeSubClusters.add(ahsc); + } + + // Add ApplicationHomeSC - SC2 + for (int i = 10; i < NUM_APPS_20; i++) { + ApplicationId appId = ApplicationId.newInstance(now, i); + SubClusterId subClusterId = SubClusterId.newInstance("SC2"); + addApplicationHomeSC(appId, subClusterId); + } + + GetApplicationsHomeSubClusterRequest getRequest = + GetApplicationsHomeSubClusterRequest.newInstance(); + getRequest.setSubClusterId(SubClusterId.newInstance("SC1")); + + GetApplicationsHomeSubClusterResponse result = + stateStore.getApplicationsHomeSubCluster(getRequest); + Assert.assertNotNull(result); + + List items = result.getAppsHomeSubClusters(); + Assert.assertNotNull(items); + Assert.assertEquals(10, items.size()); + + for (ApplicationHomeSubCluster item : items) { + Assert.assertTrue(appHomeSubClusters.contains(item)); + } + } + + @Test + public void testGetApplicationsHomeSubClusterLimit() throws Exception { + // Add ApplicationHomeSC - SC1 + long now = Time.now(); + + for (int i = 0; i < 50; i++) { + ApplicationId appId = ApplicationId.newInstance(now, i); + SubClusterId subClusterId = SubClusterId.newInstance("SC1"); + addApplicationHomeSC(appId, subClusterId); + } + + GetApplicationsHomeSubClusterRequest getRequest = + GetApplicationsHomeSubClusterRequest.newInstance(); + getRequest.setSubClusterId(SubClusterId.newInstance("SC1")); + GetApplicationsHomeSubClusterResponse result = + stateStore.getApplicationsHomeSubCluster(getRequest); + Assert.assertNotNull(result); + + // Write 50 records, but get 10 records because the maximum number is limited to 10 + List items = result.getAppsHomeSubClusters(); + Assert.assertNotNull(items); + Assert.assertEquals(10, items.size()); + + GetApplicationsHomeSubClusterRequest getRequest1 = + GetApplicationsHomeSubClusterRequest.newInstance(); + getRequest1.setSubClusterId(SubClusterId.newInstance("SC2")); + GetApplicationsHomeSubClusterResponse result1 = + stateStore.getApplicationsHomeSubCluster(getRequest1); + Assert.assertNotNull(result1); + + // SC2 data does not exist, so the number of returned records is 0 + List items1 = result1.getAppsHomeSubClusters(); + Assert.assertNotNull(items1); + Assert.assertEquals(0, items1.size()); + } + @Test public void testUpdateApplicationHomeSubCluster() throws Exception { ApplicationId appId = ApplicationId.newInstance(1, 1); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/HSQLDBFederationStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/HSQLDBFederationStateStore.java index f11a2599202b5..e90f1dc099ea4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/HSQLDBFederationStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/HSQLDBFederationStateStore.java @@ -31,6 +31,7 @@ import org.apache.commons.io.FileUtils; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.federation.store.FederationStateStore; import org.slf4j.Logger; @@ -50,6 +51,7 @@ public class HSQLDBFederationStateStore extends SQLFederationStateStore { " CREATE TABLE applicationsHomeSubCluster (" + " applicationId varchar(64) NOT NULL," + " homeSubCluster varchar(256) NOT NULL," + + " createTime datetime NOT NULL," + " CONSTRAINT pk_applicationId PRIMARY KEY (applicationId))"; private static final String TABLE_MEMBERSHIP = @@ -149,8 +151,9 @@ public class HSQLDBFederationStateStore extends SQLFederationStateStore { + " OUT storedHomeSubCluster_OUT varchar(256), OUT rowCount_OUT int)" + " MODIFIES SQL DATA BEGIN ATOMIC" + " INSERT INTO applicationsHomeSubCluster " - + " (applicationId,homeSubCluster) " - + " (SELECT applicationId_IN, homeSubCluster_IN" + + " (applicationId,homeSubCluster,createTime) " + + " (SELECT applicationId_IN, homeSubCluster_IN, " + + " NOW() AT TIME ZONE INTERVAL '0:00' HOUR TO MINUTE" + " FROM applicationsHomeSubCluster" + " WHERE applicationId = applicationId_IN" + " HAVING COUNT(*) = 0 );" @@ -179,11 +182,16 @@ public class HSQLDBFederationStateStore extends SQLFederationStateStore { + " WHERE applicationId = applicationID_IN; END"; private static final String SP_GETAPPLICATIONSHOMESUBCLUSTER = - "CREATE PROCEDURE sp_getApplicationsHomeSubCluster()" + "CREATE PROCEDURE sp_getApplicationsHomeSubCluster(" + + "IN limit_IN int, IN homeSubCluster_IN varchar(256))" + " MODIFIES SQL DATA DYNAMIC RESULT SETS 1 BEGIN ATOMIC" + " DECLARE result CURSOR FOR" - + " SELECT applicationId, homeSubCluster" - + " FROM applicationsHomeSubCluster; OPEN result; END"; + + " SELECT applicationId, homeSubCluster, createTime" + + " FROM applicationsHomeSubCluster " + + " WHERE ROWNUM() <= limit_IN AND " + + " (homeSubCluster_IN = '' OR homeSubCluster = homeSubCluster_IN) " + + " ORDER BY createTime desc; " + + " OPEN result; END"; private static final String SP_DELETEAPPLICATIONHOMESUBCLUSTER = "CREATE PROCEDURE sp_deleteApplicationHomeSubCluster(" @@ -315,6 +323,7 @@ public class HSQLDBFederationStateStore extends SQLFederationStateStore { @Override public void init(Configuration conf) { try { + conf.setInt(YarnConfiguration.FEDERATION_STATESTORE_MAX_APPLICATIONS, 10); super.init(conf); conn = super.conn; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestMemoryFederationStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestMemoryFederationStateStore.java index c29fc03c5b698..70dda2227d0d2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestMemoryFederationStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestMemoryFederationStateStore.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.federation.store.impl; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.federation.store.FederationStateStore; /** @@ -29,6 +30,7 @@ public class TestMemoryFederationStateStore @Override protected FederationStateStore createStateStore() { Configuration conf = new Configuration(); + conf.setInt(YarnConfiguration.FEDERATION_STATESTORE_MAX_APPLICATIONS, 10); super.setConf(conf); return new MemoryFederationStateStore(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestSQLFederationStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestSQLFederationStateStore.java index d0dec2603dd9e..6f5f19877c047 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestSQLFederationStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestSQLFederationStateStore.java @@ -84,6 +84,7 @@ protected FederationStateStore createStateStore() { DATABASE_PASSWORD); conf.set(YarnConfiguration.FEDERATION_STATESTORE_SQL_URL, DATABASE_URL + System.currentTimeMillis()); + conf.setInt(YarnConfiguration.FEDERATION_STATESTORE_MAX_APPLICATIONS, 10); super.setConf(conf); return new HSQLDBFederationStateStore(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestZookeeperFederationStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestZookeeperFederationStateStore.java index 788adef371b77..4571371eb6d8b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestZookeeperFederationStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestZookeeperFederationStateStore.java @@ -68,6 +68,7 @@ public void before() throws IOException, YarnException { Configuration conf = new YarnConfiguration(); conf.set(CommonConfigurationKeys.ZK_ADDRESS, connectString); + conf.setInt(YarnConfiguration.FEDERATION_STATESTORE_MAX_APPLICATIONS, 10); setConf(conf); } catch (Exception e) { LOG.error("Cannot initialize ZooKeeper store", e);