Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
ba324a5
YARN-11290. Improve Query Condition of FederationStateStore#getApplic…
Sep 2, 2022
a1629b7
YARN-11290. Fix CheckStyle.
Sep 3, 2022
9c499ca
YARN-11290. Fix CheckStyle.
Sep 4, 2022
354b8b3
YARN-11290. Modify the sqlserver stored procedure script.
Sep 4, 2022
eecc588
YARN-11290. Add Mysql sp_getApplicationsHomeSubCluster Script And Fix…
Sep 5, 2022
499986a
YARN-11290. Fix CheckStyle.
Sep 5, 2022
e1ee2a3
YARN-11290. Fix submitReservation First Add
Sep 6, 2022
023bb5c
YARN-11290. Fix SqlServer Script.
Sep 6, 2022
d309ac5
YARN-11290. Fix CheckStyle.
Sep 6, 2022
e96f796
YARN-11290. Fix CheckStyle.
Sep 6, 2022
7de66bf
Merge branch 'apache:trunk' into YARN-11290
slfan1989 Sep 7, 2022
40f6d99
YARN-11290. Fix CodeStyle.
Sep 7, 2022
f6df901
YARN-11290. Fix CheckStyle.
Sep 8, 2022
2a09fa7
Merge branch 'trunk' into YARN-11290
slfan1989 Sep 9, 2022
152300c
YARN-11290. Modify SQL code.
Sep 9, 2022
c7620af
YARN-11290. Modify SQL code.
Sep 9, 2022
358977b
Merge branch 'trunk' into YARN-11290
slfan1989 Sep 11, 2022
a536e30
YARN-11290. Improve the code.
Sep 12, 2022
1d00020
YARN-11290. Fix CheckStyle.
Sep 12, 2022
411d4d2
YARN-11290. Improve Mysql Script.
Sep 13, 2022
a12ea7e
YARN-11290. Fix CheckStyle.
Sep 13, 2022
923622c
YARN-11290. Improve Code Style.
Sep 14, 2022
490cf3c
YARN-11290. Fix CheckStyle.
Sep 14, 2022
32042dc
YARN-11290. Fix CodeStyle.
Sep 15, 2022
bd195f2
YARN-11290. Fix CheckStyle.
Sep 17, 2022
b948fab
YARN-11290. Merge Trunk Branch.
Sep 23, 2022
d3ecfec
YARN-11290. Merge Trunk Branch.
Sep 23, 2022
27be255
Merge branch 'trunk' into YARN-11290
slfan1989 Sep 23, 2022
da64b19
YARN-11290. Fix CheckStyle.
Sep 23, 2022
794f79b
Merge branch 'apache:trunk' into YARN-11290
slfan1989 Sep 26, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -122,10 +122,21 @@ BEGIN
WHERE applicationId = applicationID_IN;
END //

CREATE PROCEDURE sp_getApplicationsHomeSubCluster()
BEGIN
SELECT applicationId, homeSubCluster
FROM applicationsHomeSubCluster;
CREATE PROCEDURE sp_getApplicationsHomeSubCluster(IN limit_IN int, IN homeSubCluster_IN varchar(256))
BEGIN
SELECT
applicationId,
homeSubCluster,
createTime
FROM (SELECT
applicationId,
homeSubCluster,
createTime,
@rownum := 0
FROM applicationshomesubcluster
ORDER BY createTime DESC) AS applicationshomesubcluster
WHERE (homeSubCluster_IN = '' OR homeSubCluster = homeSubCluster_IN)
AND (@rownum := @rownum + 1) <= limit_IN;
END //

CREATE PROCEDURE sp_deleteApplicationHomeSubCluster(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ USE FederationStateStore

CREATE TABLE applicationsHomeSubCluster(
applicationId varchar(64) NOT NULL,
homeSubCluster varchar(256) NULL,
homeSubCluster varchar(256) NOT NULL,
createTime datetime NOT NULL,
CONSTRAINT pk_applicationId PRIMARY KEY (applicationId)
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,12 +111,26 @@ IF OBJECT_ID ( '[sp_getApplicationsHomeSubCluster]', 'P' ) IS NOT NULL
GO

CREATE PROCEDURE [dbo].[sp_getApplicationsHomeSubCluster]
@limit int,
@homeSubCluster VARCHAR(256)
AS BEGIN
DECLARE @errorMessage nvarchar(4000)

BEGIN TRY
SELECT [applicationId], [homeSubCluster], [createTime]
FROM [dbo].[applicationsHomeSubCluster]

SELECT
[applicationId],
[homeSubCluster],
[createTime]
FROM(SELECT
[applicationId],
[homeSubCluster],
[createTime],
row_number() over(order by [createTime] desc) AS app_rank
FROM [dbo].[applicationsHomeSubCluster]
WHERE [homeSubCluster] = @homeSubCluster OR @homeSubCluster = '') AS applicationsHomeSubCluster
WHERE app_rank <= @limit;

END TRY

BEGIN CATCH
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4056,6 +4056,11 @@ public static boolean isAclEnabled(Configuration conf) {

public static final int DEFAULT_FEDERATION_STATESTORE_SQL_MAXCONNECTIONS = 1;

public static final String FEDERATION_STATESTORE_MAX_APPLICATIONS =
FEDERATION_PREFIX + "state-store.max-applications";

public static final int DEFAULT_FEDERATION_STATESTORE_MAX_APPLICATIONS = 1000;

public static final String ROUTER_PREFIX = YARN_PREFIX + "router.";

public static final String ROUTER_BIND_HOST = ROUTER_PREFIX + "bind-host";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5007,4 +5007,13 @@
</description>
</property>

<property>
<name>yarn.federation.state-store.max-applications</name>
<value>1000</value>
<description>
Yarn federation state-store supports querying the maximum number of apps.
Default is 1000.
</description>
</property>

</configuration>
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,16 @@
import java.util.Set;
import java.util.TimeZone;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import java.util.Comparator;

import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.token.delegation.DelegationKey;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterRequest;
Expand Down Expand Up @@ -90,6 +94,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.hadoop.yarn.server.federation.store.utils.FederationStateStoreUtils.filterHomeSubCluster;

/**
* In-memory implementation of {@link FederationStateStore}.
*/
Expand All @@ -100,6 +106,7 @@ public class MemoryFederationStateStore implements FederationStateStore {
private Map<ReservationId, SubClusterId> reservations;
private Map<String, SubClusterPolicyConfiguration> policies;
private RouterRMDTSecretManagerState routerRMSecretManagerState;
private int maxAppsInStateStore;

private final MonotonicClock clock = new MonotonicClock();

Expand All @@ -113,6 +120,9 @@ public void init(Configuration conf) {
reservations = new ConcurrentHashMap<ReservationId, SubClusterId>();
policies = new ConcurrentHashMap<String, SubClusterPolicyConfiguration>();
routerRMSecretManagerState = new RouterRMDTSecretManagerState();
maxAppsInStateStore = conf.getInt(
YarnConfiguration.FEDERATION_STATESTORE_MAX_APPLICATIONS,
YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_MAX_APPLICATIONS);
}

@Override
Expand Down Expand Up @@ -266,17 +276,28 @@ public GetApplicationHomeSubClusterResponse getApplicationHomeSubCluster(
@Override
public GetApplicationsHomeSubClusterResponse getApplicationsHomeSubCluster(
GetApplicationsHomeSubClusterRequest request) throws YarnException {
List<ApplicationHomeSubCluster> result =
new ArrayList<ApplicationHomeSubCluster>();
for (Entry<ApplicationId, SubClusterId> e : applications.entrySet()) {
result
.add(ApplicationHomeSubCluster.newInstance(e.getKey(), e.getValue()));

if (request == null) {
throw new YarnException("Missing getApplicationsHomeSubCluster request");
}

GetApplicationsHomeSubClusterResponse.newInstance(result);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was just overlooked?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This line of code should be an extra line of code, which has no practical significance. The following is directly constructed and returned.

SubClusterId requestSC = request.getSubClusterId();
List<ApplicationHomeSubCluster> result = applications.keySet().stream()
.map(applicationId -> generateAppHomeSC(applicationId))
.sorted(Comparator.comparing(ApplicationHomeSubCluster::getCreateTime).reversed())
.filter(appHomeSC -> filterHomeSubCluster(requestSC, appHomeSC.getHomeSubCluster()))
.limit(maxAppsInStateStore)
.collect(Collectors.toList());

LOG.info("filterSubClusterId = {}, appCount = {}.", requestSC, result.size());
return GetApplicationsHomeSubClusterResponse.newInstance(result);
}

private ApplicationHomeSubCluster generateAppHomeSC(ApplicationId applicationId) {
SubClusterId subClusterId = applications.get(applicationId);
return ApplicationHomeSubCluster.newInstance(applicationId, Time.now(), subClusterId);
}

@Override
public DeleteApplicationHomeSubClusterResponse deleteApplicationHomeSubCluster(
DeleteApplicationHomeSubClusterRequest request) throws YarnException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ public class SQLFederationStateStore implements FederationStateStore {
"{call sp_getApplicationHomeSubCluster(?, ?)}";

private static final String CALL_SP_GET_APPLICATIONS_HOME_SUBCLUSTER =
"{call sp_getApplicationsHomeSubCluster()}";
"{call sp_getApplicationsHomeSubCluster(?, ?)}";

private static final String CALL_SP_SET_POLICY_CONFIGURATION =
"{call sp_setPolicyConfiguration(?, ?, ?, ?)}";
Expand Down Expand Up @@ -176,6 +176,7 @@ public class SQLFederationStateStore implements FederationStateStore {
private final Clock clock = new MonotonicClock();
@VisibleForTesting
Connection conn = null;
private int maxAppsInStateStore;

@Override
public void init(Configuration conf) throws YarnException {
Expand Down Expand Up @@ -215,6 +216,10 @@ public void init(Configuration conf) throws YarnException {
FederationStateStoreUtils.logAndThrowRetriableException(LOG,
"Not able to get Connection", e);
}

maxAppsInStateStore = conf.getInt(
YarnConfiguration.FEDERATION_STATESTORE_MAX_APPLICATIONS,
YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_MAX_APPLICATIONS);
}

@Override
Expand Down Expand Up @@ -748,24 +753,35 @@ public GetApplicationHomeSubClusterResponse getApplicationHomeSubCluster(
@Override
public GetApplicationsHomeSubClusterResponse getApplicationsHomeSubCluster(
GetApplicationsHomeSubClusterRequest request) throws YarnException {

if (request == null) {
throw new YarnException("Missing getApplicationsHomeSubCluster request");
}

CallableStatement cstmt = null;
ResultSet rs = null;
List<ApplicationHomeSubCluster> appsHomeSubClusters =
new ArrayList<ApplicationHomeSubCluster>();
List<ApplicationHomeSubCluster> appsHomeSubClusters = new ArrayList<>();

try {
cstmt = getCallableStatement(CALL_SP_GET_APPLICATIONS_HOME_SUBCLUSTER);
cstmt.setInt("limit_IN", maxAppsInStateStore);
String homeSubClusterIN = StringUtils.EMPTY;
SubClusterId subClusterId = request.getSubClusterId();
if (subClusterId != null) {
homeSubClusterIN = subClusterId.toString();
}
cstmt.setString("homeSubCluster_IN", homeSubClusterIN);

// Execute the query
long startTime = clock.getTime();
rs = cstmt.executeQuery();
long stopTime = clock.getTime();

while (rs.next()) {
while (rs.next() && appsHomeSubClusters.size() <= maxAppsInStateStore) {

// Extract the output for each tuple
String applicationId = rs.getString(1);
String homeSubCluster = rs.getString(2);
String applicationId = rs.getString("applicationId");
String homeSubCluster = rs.getString("homeSubCluster");

appsHomeSubClusters.add(ApplicationHomeSubCluster.newInstance(
ApplicationId.fromString(applicationId),
Expand All @@ -783,8 +799,8 @@ public GetApplicationsHomeSubClusterResponse getApplicationsHomeSubCluster(
// Return to the pool the CallableStatement
FederationStateStoreUtils.returnToPool(LOG, cstmt, null, rs);
}
return GetApplicationsHomeSubClusterResponse
.newInstance(appsHomeSubClusters);

return GetApplicationsHomeSubClusterResponse.newInstance(appsHomeSubClusters);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,13 @@
import java.util.Calendar;
import java.util.List;
import java.util.TimeZone;
import java.util.Comparator;
import java.util.stream.Collectors;

import org.apache.commons.lang3.NotImplementedException;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.util.curator.ZKCuratorManager;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
Expand Down Expand Up @@ -98,6 +101,8 @@

import org.apache.hadoop.thirdparty.protobuf.InvalidProtocolBufferException;

import static org.apache.hadoop.yarn.server.federation.store.utils.FederationStateStoreUtils.filterHomeSubCluster;

/**
* ZooKeeper implementation of {@link FederationStateStore}.
*
Expand Down Expand Up @@ -136,6 +141,7 @@ public class ZookeeperFederationStateStore implements FederationStateStore {
private String membershipZNode;
private String policiesZNode;
private String reservationsZNode;
private int maxAppsInStateStore;

private volatile Clock clock = SystemClock.getInstance();

Expand All @@ -147,6 +153,10 @@ public class ZookeeperFederationStateStore implements FederationStateStore {
public void init(Configuration conf) throws YarnException {
LOG.info("Initializing ZooKeeper connection");

maxAppsInStateStore = conf.getInt(
YarnConfiguration.FEDERATION_STATESTORE_MAX_APPLICATIONS,
YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_MAX_APPLICATIONS);

baseZNode = conf.get(
YarnConfiguration.FEDERATION_STATESTORE_ZK_PARENT_PATH,
YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_ZK_PARENT_PATH);
Expand Down Expand Up @@ -258,24 +268,44 @@ public GetApplicationHomeSubClusterResponse getApplicationHomeSubCluster(
@Override
public GetApplicationsHomeSubClusterResponse getApplicationsHomeSubCluster(
GetApplicationsHomeSubClusterRequest request) throws YarnException {
long start = clock.getTime();
List<ApplicationHomeSubCluster> result = new ArrayList<>();

if (request == null) {
throw new YarnException("Missing getApplicationsHomeSubCluster request");
}

try {
for (String child : zkManager.getChildren(appsZNode)) {
ApplicationId appId = ApplicationId.fromString(child);
SubClusterId homeSubCluster = getApp(appId);
ApplicationHomeSubCluster app =
ApplicationHomeSubCluster.newInstance(appId, homeSubCluster);
result.add(app);
}
long start = clock.getTime();
SubClusterId requestSC = request.getSubClusterId();
List<String> children = zkManager.getChildren(appsZNode);
List<ApplicationHomeSubCluster> result = children.stream()
.map(child -> generateAppHomeSC(child))
.sorted(Comparator.comparing(ApplicationHomeSubCluster::getCreateTime).reversed())
.filter(appHomeSC -> filterHomeSubCluster(requestSC, appHomeSC.getHomeSubCluster()))
.limit(maxAppsInStateStore)
.collect(Collectors.toList());
long end = clock.getTime();
opDurations.addGetAppsHomeSubClusterDuration(start, end);
LOG.info("filterSubClusterId = {}, appCount = {}.", requestSC, result.size());
return GetApplicationsHomeSubClusterResponse.newInstance(result);
} catch (Exception e) {
String errMsg = "Cannot get apps: " + e.getMessage();
FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
}
long end = clock.getTime();
opDurations.addGetAppsHomeSubClusterDuration(start, end);
return GetApplicationsHomeSubClusterResponse.newInstance(result);

throw new YarnException("Cannot get app by request");
}

private ApplicationHomeSubCluster generateAppHomeSC(String appId) {
try {
ApplicationId applicationId = ApplicationId.fromString(appId);
SubClusterId homeSubCluster = getApp(applicationId);
ApplicationHomeSubCluster app =
ApplicationHomeSubCluster.newInstance(applicationId, Time.now(), homeSubCluster);
return app;
} catch (Exception ex) {
LOG.error("get homeSubCluster by appId = {}.", appId);
}
return null;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,17 @@ public static ApplicationHomeSubCluster newInstance(ApplicationId appId,
return appMapping;
}

@Private
@Unstable
public static ApplicationHomeSubCluster newInstance(ApplicationId appId, long createTime,
SubClusterId homeSubCluster) {
ApplicationHomeSubCluster appMapping = Records.newRecord(ApplicationHomeSubCluster.class);
appMapping.setApplicationId(appId);
appMapping.setHomeSubCluster(homeSubCluster);
appMapping.setCreateTime(createTime);
return appMapping;
}

/**
* Get the {@link ApplicationId} representing the unique identifier of the
* application.
Expand Down Expand Up @@ -91,6 +102,25 @@ public static ApplicationHomeSubCluster newInstance(ApplicationId appId,
@Unstable
public abstract void setHomeSubCluster(SubClusterId homeSubCluster);

/**
* Get the create time of the subcluster.
*
* @return the state of the subcluster
*/
@Public
@Unstable
public abstract long getCreateTime();

/**
* Set the create time of the subcluster.
*
* @param time the last heartbeat time of the subcluster
*/
@Private
@Unstable
public abstract void setCreateTime(long time);


@Override
public boolean equals(Object obj) {
if (this == obj) {
Expand Down
Loading