diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java index b9a43749294ff..34b94d6556fb4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java @@ -29,6 +29,7 @@ import java.util.HashMap; import java.util.List; +import com.google.protobuf.InvalidProtocolBufferException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -298,17 +299,33 @@ private void processDirectoriesOfFiles( String dirName = dir.getPath().getName(); for (FileStatus fileNodeStatus : listStatusWithRetries(dir.getPath())) { assert fileNodeStatus.isFile(); - String fileName = fileNodeStatus.getPath().getName(); - if (checkAndRemovePartialRecordWithRetries(fileNodeStatus.getPath())) { - continue; + try { + String fileName = fileNodeStatus.getPath().getName(); + if (checkAndRemovePartialRecordWithRetries( + fileNodeStatus.getPath())) { + continue; + } + byte[] fileData = readFileWithRetries(fileNodeStatus.getPath(), + fileNodeStatus.getLen()); + // Set attribute if not already set + setUnreadableBySuperuserXattrib(fileNodeStatus.getPath()); + + rmAppStateFileProcessor.processChildNode(dirName, fileName, + fileData); + } catch (InvalidProtocolBufferException ex) { + LOG.warn("Removing broken " + dir.getPath() + " app's directory " + + "as its data is invalid, to prevent RM restart failure.", ex); + // removing directory with broken data + if (existsWithRetries(dir.getPath())) { + deleteFileWithRetries(dir.getPath()); + } + break; + } catch (Exception ex) { + LOG.warn("Skipping state loading for " + dir.getPath() + + " app's directory because of exception" + + " to prevent RM restart failure", ex); + break; } - byte[] fileData = readFileWithRetries(fileNodeStatus.getPath(), - fileNodeStatus.getLen()); - // Set attribute if not already set - setUnreadableBySuperuserXattrib(fileNodeStatus.getPath()); - - rmAppStateFileProcessor.processChildNode(dirName, fileName, - fileData); } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java index 6f0d53f243dfe..582629851a029 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java @@ -207,6 +207,41 @@ public void testFSRMStateStore() throws Exception { } } + @Test(timeout = 60000) + public void testInvalidAppDataLoad() throws Exception { + HdfsConfiguration conf = new HdfsConfiguration(); + MiniDFSCluster cluster = + new MiniDFSCluster.Builder(conf).numDataNodes(1).build(); + try { + fsTester = new TestFSRMStateStoreTester(cluster, false); + // If the state store is FileSystemRMStateStore + // then add invalid application data. + // It should discard the entry and remove application + // with broken data from the file system. + FileSystemRMStateStore fileSystemRMStateStore = + (FileSystemRMStateStore) fsTester.getRMStateStore(); + String appIdStr = "application_1477986176766_0134"; + ApplicationId appId = + ApplicationId.fromString(appIdStr); + Path appDir = + fsTester.store.getAppDir(appId.toString()); + Path tempAppFile = + new Path(appDir, appId.toString()); + + // write invalid data + try (FSDataOutputStream fsOut = + fileSystemRMStateStore.fs.create(tempAppFile, false)) { + fsOut.write("\\00\\00\\00\\00\\00".getBytes()); + } + + Assert.assertTrue(fileSystemRMStateStore.fs.exists(tempAppFile)); + fsTester.getRMStateStore().loadState(); + Assert.assertFalse(fileSystemRMStateStore.fs.exists(tempAppFile)); + } finally { + cluster.shutdown(); + } + } + @Test(timeout = 60000) public void testHDFSRMStateStore() throws Exception { final HdfsConfiguration conf = new HdfsConfiguration();