Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4061,6 +4061,17 @@ public static boolean isAclEnabled(Configuration conf) {

public static final int DEFAULT_FEDERATION_STATESTORE_MAX_APPLICATIONS = 1000;

public static final String FEDERATION_STATESTORE_CLEANUP_RETRY_COUNT =
FEDERATION_PREFIX + "state-store.clean-up-retry-count";

public static final int DEFAULT_FEDERATION_STATESTORE_CLEANUP_RETRY_COUNT = 1;

public static final String FEDERATION_STATESTORE_CLEANUP_RETRY_SLEEP_TIME =
FEDERATION_PREFIX + "state-store.clean-up-retry-sleep-time";

public static final long DEFAULT_FEDERATION_STATESTORE_CLEANUP_RETRY_SLEEP_TIME =
TimeUnit.SECONDS.toMillis(1);

public static final String ROUTER_PREFIX = YARN_PREFIX + "router.";

public static final String ROUTER_BIND_HOST = ROUTER_PREFIX + "bind-host";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3727,6 +3727,26 @@
<value>yarnfederation/</value>
</property>

<property>
<description>
The number of retries to clear the app in the FederationStateStore,
the default value is 1, that is, after the app fails to clean up, it will retry the cleanup again.
</description>
<name>yarn.federation.state-store.clean-up-retry-count</name>
<value>1</value>
</property>

<property>
<description>
Clear the sleep time of App retry in FederationStateStore.
When the app fails to clean up,
it will sleep for a period of time and then try to clean up.
The default value is 1s.
</description>
<name>yarn.federation.state-store.clean-up-retry-sleep-time</name>
<value>1s</value>
</property>

<!-- Other Configuration -->

<property>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package org.apache.hadoop.yarn.server.federation.retry;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public interface FederationActionRetry<T> {

Logger LOG = LoggerFactory.getLogger(FederationActionRetry.class);

T run() throws Exception;

default T runWithRetries(int retryCount, long retrySleepTime) throws Exception {
int retry = 0;
while (true) {
try {
return run();
} catch (Exception e) {
LOG.info("Exception while executing an Federation operation.", e);
if (++retry > retryCount) {
LOG.info("Maxed out Federation retries. Giving up!");
throw e;
}
LOG.info("Retrying operation on Federation. Retry no. {}", retry);
Thread.sleep(retrySleepTime);
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/** Federation Retry Policies. **/
package org.apache.hadoop.yarn.server.federation.retry;
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,11 @@

import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.conf.HAUtil;
import org.apache.hadoop.yarn.security.ConfiguredYarnAuthorizer;
import org.apache.hadoop.yarn.security.Permission;
import org.apache.hadoop.yarn.security.PrivilegedEntity;
import org.apache.hadoop.yarn.server.resourcemanager.federation.FederationStateStoreService;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueuePath;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -114,6 +116,7 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
private boolean nodeLabelsEnabled;
private Set<String> exclusiveEnforcedPartitions;
private String amDefaultNodeLabel;
private FederationStateStoreService federationStateStoreService;

private static final String USER_ID_PREFIX = "userid=";

Expand Down Expand Up @@ -347,6 +350,7 @@ protected synchronized void checkAppNumCompletedLimit() {
+ ", removing app " + removeApp.getApplicationId()
+ " from state store.");
rmContext.getStateStore().removeApplication(removeApp);
removeApplicationIdFromStateStore(removeId);
completedAppsInStateStore--;
}

Expand All @@ -358,6 +362,7 @@ protected synchronized void checkAppNumCompletedLimit() {
+ this.maxCompletedAppsInMemory + ", removing app " + removeId
+ " from memory: ");
rmContext.getRMApps().remove(removeId);
removeApplicationIdFromStateStore(removeId);
this.applicationACLsManager.removeApplication(removeId);
}
}
Expand Down Expand Up @@ -1054,4 +1059,42 @@ private void copyPlacementQueueToSubmissionContext(
context.setQueue(placementContext.getQueue());
}
}

@VisibleForTesting
public void setFederationStateStoreService(FederationStateStoreService stateStoreService) {
this.federationStateStoreService = stateStoreService;
}

/**
* Remove ApplicationId From StateStore.
*
* @param appId appId
*/
private void removeApplicationIdFromStateStore(ApplicationId appId) {
if (HAUtil.isFederationEnabled(conf) && federationStateStoreService != null) {
try {
boolean cleanUpResult =
federationStateStoreService.cleanUpFinishApplicationsWithRetries(appId, true);
if(cleanUpResult){
LOG.info("applicationId = {} remove from state store success.", appId);
} else {
LOG.warn("applicationId = {} remove from state store failed.", appId);
}
} catch (Exception e) {
LOG.error("applicationId = {} remove from state store error.", appId, e);
}
}
}

// just test using
@VisibleForTesting
public void checkAppNumCompletedLimit4Test() {
checkAppNumCompletedLimit();
}

// just test using
@VisibleForTesting
public void finishApplication4Test(ApplicationId applicationId) {
finishApplication(applicationId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -917,6 +917,7 @@ protected void serviceInit(Configuration configuration) throws Exception {
}
federationStateStoreService = createFederationStateStoreService();
addIfService(federationStateStoreService);
rmAppManager.setFederationStateStoreService(federationStateStoreService);
LOG.info("Initialized Federation membership.");
}

Expand Down Expand Up @@ -996,6 +997,13 @@ protected void serviceStart() throws Exception {
RMState state = rmStore.loadState();
recover(state);
LOG.info("Recovery ended");

// Make sure that the App is cleaned up after the RM memory is restored.
if (HAUtil.isFederationEnabled(conf)) {
federationStateStoreService.
createCleanUpFinishApplicationThread("Recovery");
}

} catch (Exception e) {
// the Exception from loadState() needs to be handled for
// HA and we need to give up master status if we got fenced
Expand Down
Loading