diff --git a/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/util/RecoverLeaseFSUtils.java b/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/util/RecoverLeaseFSUtils.java
index ff457cb5074e..bc0657fc0d82 100644
--- a/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/util/RecoverLeaseFSUtils.java
+++ b/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/util/RecoverLeaseFSUtils.java
@@ -20,6 +20,7 @@
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InterruptedIOException;
+import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -39,6 +40,40 @@ public final class RecoverLeaseFSUtils {
 
   private static final Logger LOG = LoggerFactory.getLogger(RecoverLeaseFSUtils.class);
 
+  private static Class<?> leaseRecoverableClazz = null;
+  private static Method recoverLeaseMethod = null;
+  public static final String LEASE_RECOVERABLE_CLASS_NAME = "org.apache.hadoop.fs.LeaseRecoverable";
+  static {
+    LOG.debug("Initialize RecoverLeaseFSUtils");
+    initializeRecoverLeaseMethod(LEASE_RECOVERABLE_CLASS_NAME);
+  }
+
+  /**
+   * Initialize reflection classes and methods. If the LeaseRecoverable interface is not found,
+   * fall back to the DistributedFileSystem#recoverLease method.
+   */
+  static void initializeRecoverLeaseMethod(String className) {
+    try {
+      leaseRecoverableClazz = Class.forName(className);
+      recoverLeaseMethod = leaseRecoverableClazz.getMethod("recoverLease", Path.class);
+      LOG.debug("Set recoverLeaseMethod to {}.recoverLease()", className);
+    } catch (ClassNotFoundException e) {
+      LOG.debug(
+        "LeaseRecoverable interface not in the classpath; this means Hadoop 3.3.5 or below.");
+      try {
+        recoverLeaseMethod = DistributedFileSystem.class.getMethod("recoverLease", Path.class);
+      } catch (NoSuchMethodException ex) {
+        LOG.error("Cannot find recoverLease method in DistributedFileSystem class. "
+          + "This should never happen. Abort.", ex);
+        throw new RuntimeException(ex);
+      }
+    } catch (NoSuchMethodException e) {
+      LOG.error("Cannot find recoverLease method in LeaseRecoverable class. "
+        + "This should never happen. Abort.", e);
+      throw new RuntimeException(e);
+    }
+  }
+
   private RecoverLeaseFSUtils() {
   }
 
@@ -48,18 +83,31 @@ public static void recoverFileLease(FileSystem fs, Path p, Configuration conf)
   }
 
   /**
-   * Recover the lease from HDFS, retrying multiple times.
+   * Recover the lease from the Hadoop file system, retrying multiple times.
    */
   public static void recoverFileLease(FileSystem fs, Path p, Configuration conf,
     CancelableProgressable reporter) throws IOException {
     if (fs instanceof FilterFileSystem) {
       fs = ((FilterFileSystem) fs).getRawFileSystem();
     }
+    // Lease recovery is not needed for the local file system case.
-    if (!(fs instanceof DistributedFileSystem)) {
-      return;
+    if (isLeaseRecoverable(fs)) {
+      recoverDFSFileLease(fs, p, conf, reporter);
     }
-    recoverDFSFileLease((DistributedFileSystem) fs, p, conf, reporter);
+  }
+
+  public static boolean isLeaseRecoverable(FileSystem fs) {
+    // return true if HDFS.
+    if (fs instanceof DistributedFileSystem) {
+      return true;
+    }
+    // return true if the file system implements the LeaseRecoverable interface.
+    if (leaseRecoverableClazz != null) {
+      return leaseRecoverableClazz.isAssignableFrom(fs.getClass());
+    }
+    // return false if the file system is not HDFS and does not implement LeaseRecoverable.
+    return false;
   }
 
   /*
@@ -81,7 +129,7 @@ public static void recoverFileLease(FileSystem fs, Path p, Configuration conf,
    * false, repeat starting at step 5. above. If HDFS-4525 is available, call it every second, and
    * we might be able to exit early.
    */
-  private static boolean recoverDFSFileLease(final DistributedFileSystem dfs, final Path p,
+  private static boolean recoverDFSFileLease(final FileSystem dfs, final Path p,
     final Configuration conf, final CancelableProgressable reporter) throws IOException {
     LOG.info("Recover lease on dfs file " + p);
     long startWaiting = EnvironmentEdgeManager.currentTime();
@@ -167,14 +215,15 @@ private static boolean checkIfTimedout(final Configuration conf, final long reco
    * Try to recover the lease.
    * @return True if dfs#recoverLease came by true.
    */
-  private static boolean recoverLease(final DistributedFileSystem dfs, final int nbAttempt,
-    final Path p, final long startWaiting) throws FileNotFoundException {
+  private static boolean recoverLease(final FileSystem dfs, final int nbAttempt, final Path p,
+    final long startWaiting) throws FileNotFoundException {
     boolean recovered = false;
     try {
-      recovered = dfs.recoverLease(p);
+      recovered = (Boolean) recoverLeaseMethod.invoke(dfs, p);
       LOG.info((recovered ? "Recovered lease, " : "Failed to recover lease, ")
         + getLogMessageDetail(nbAttempt, p, startWaiting));
-    } catch (IOException e) {
+    } catch (InvocationTargetException ite) {
+      final Throwable e = ite.getCause();
       if (e instanceof LeaseExpiredException && e.getMessage().contains("File does not exist")) {
         // This exception comes out instead of FNFE, fix it
         throw new FileNotFoundException("The given WAL wasn't found at " + p);
@@ -182,6 +231,9 @@ private static boolean recoverLease(final DistributedFileSystem dfs, final int n
         throw (FileNotFoundException) e;
       }
       LOG.warn(getLogMessageDetail(nbAttempt, p, startWaiting), e);
+    } catch (IllegalAccessException e) {
+      LOG.error("Failed to call recoverLease on {}. Abort.", dfs, e);
+      throw new RuntimeException(e);
     }
     return recovered;
   }
@@ -197,8 +249,7 @@ private static String getLogMessageDetail(final int nbAttempt, final Path p,
    * Call HDFS-4525 isFileClosed if it is available.
    * @return True if file is closed.
   */
-  private static boolean isFileClosed(final DistributedFileSystem dfs, final Method m,
-    final Path p) {
+  private static boolean isFileClosed(final FileSystem dfs, final Method m, final Path p) {
     try {
       return (Boolean) m.invoke(dfs, p);
     } catch (SecurityException e) {
diff --git a/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/util/TestRecoverLeaseFSUtils.java b/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/util/TestRecoverLeaseFSUtils.java
index 953d66b3fa45..354095d15ff5 100644
--- a/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/util/TestRecoverLeaseFSUtils.java
+++ b/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/util/TestRecoverLeaseFSUtils.java
@@ -17,10 +17,18 @@
  */
 package org.apache.hadoop.hbase.util;
 
+import static org.apache.hadoop.hbase.util.RecoverLeaseFSUtils.LEASE_RECOVERABLE_CLASS_NAME;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 import java.io.IOException;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
@@ -30,7 +38,6 @@
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
-import org.mockito.Mockito;
 
 /**
  * Test our recoverLease loop against mocked up filesystem.
@@ -58,20 +65,60 @@ public class TestRecoverLeaseFSUtils {
   public void testRecoverLease() throws IOException {
     long startTime = EnvironmentEdgeManager.currentTime();
     HTU.getConfiguration().setInt("hbase.lease.recovery.dfs.timeout", 1000);
-    CancelableProgressable reporter = Mockito.mock(CancelableProgressable.class);
-    Mockito.when(reporter.progress()).thenReturn(true);
-    DistributedFileSystem dfs = Mockito.mock(DistributedFileSystem.class);
+    CancelableProgressable reporter = mock(CancelableProgressable.class);
+    when(reporter.progress()).thenReturn(true);
+    DistributedFileSystem dfs = mock(DistributedFileSystem.class);
     // Fail four times and pass on the fifth.
-    Mockito.when(dfs.recoverLease(FILE)).thenReturn(false).thenReturn(false).thenReturn(false)
+    when(dfs.recoverLease(FILE)).thenReturn(false).thenReturn(false).thenReturn(false)
       .thenReturn(false).thenReturn(true);
     RecoverLeaseFSUtils.recoverFileLease(dfs, FILE, HTU.getConfiguration(), reporter);
-    Mockito.verify(dfs, Mockito.times(5)).recoverLease(FILE);
+    verify(dfs, times(5)).recoverLease(FILE);
     // Make sure we waited at least hbase.lease.recovery.dfs.timeout * 3 (the first two
     // invocations will happen pretty fast... then we fall into the longer wait loop).
     assertTrue((EnvironmentEdgeManager.currentTime() - startTime)
       > (3 * HTU.getConfiguration().getInt("hbase.lease.recovery.dfs.timeout", 61000)));
   }
 
+  private interface FakeLeaseRecoverable {
+    @SuppressWarnings("unused")
+    boolean recoverLease(Path p) throws IOException;
+
+    @SuppressWarnings("unused")
+    boolean isFileClosed(Path p) throws IOException;
+  }
+
+  private abstract static class RecoverableFileSystem extends FileSystem
+    implements FakeLeaseRecoverable {
+    @Override
+    public boolean recoverLease(Path p) throws IOException {
+      return true;
+    }
+
+    @Override
+    public boolean isFileClosed(Path p) throws IOException {
+      return true;
+    }
+  }
+
+  /**
+   * Test that we can use reflection to access LeaseRecoverable methods.
+   */
+  @Test
+  public void testLeaseRecoverable() throws IOException {
+    try {
+      // Point the reflection lookup at FakeLeaseRecoverable for testing.
+      RecoverLeaseFSUtils.initializeRecoverLeaseMethod(FakeLeaseRecoverable.class.getName());
+      RecoverableFileSystem mockFS = mock(RecoverableFileSystem.class);
+      when(mockFS.recoverLease(FILE)).thenReturn(true);
+      RecoverLeaseFSUtils.recoverFileLease(mockFS, FILE, HTU.getConfiguration());
+      verify(mockFS, times(1)).recoverLease(FILE);
+
+      assertTrue(RecoverLeaseFSUtils.isLeaseRecoverable(mock(RecoverableFileSystem.class)));
+    } finally {
+      RecoverLeaseFSUtils.initializeRecoverLeaseMethod(LEASE_RECOVERABLE_CLASS_NAME);
+    }
+  }
+
   /**
    * Test that isFileClosed makes us recover lease faster.
    */
@@ -79,17 +126,23 @@ public void testRecoverLease() throws IOException {
   public void testIsFileClosed() throws IOException {
     // Make this time long so it is plain we broke out because of the isFileClosed invocation.
     HTU.getConfiguration().setInt("hbase.lease.recovery.dfs.timeout", 100000);
-    CancelableProgressable reporter = Mockito.mock(CancelableProgressable.class);
-    Mockito.when(reporter.progress()).thenReturn(true);
-    IsFileClosedDistributedFileSystem dfs = Mockito.mock(IsFileClosedDistributedFileSystem.class);
+    CancelableProgressable reporter = mock(CancelableProgressable.class);
+    when(reporter.progress()).thenReturn(true);
+    IsFileClosedDistributedFileSystem dfs = mock(IsFileClosedDistributedFileSystem.class);
     // Now make it so we fail the first two times -- the two fast invocations, then we fall into
     // the long loop during which we will call isFileClosed.... the next invocation should
     // therefore return true if we are to break the loop.
-    Mockito.when(dfs.recoverLease(FILE)).thenReturn(false).thenReturn(false).thenReturn(true);
-    Mockito.when(dfs.isFileClosed(FILE)).thenReturn(true);
+    when(dfs.recoverLease(FILE)).thenReturn(false).thenReturn(false).thenReturn(true);
+    when(dfs.isFileClosed(FILE)).thenReturn(true);
     RecoverLeaseFSUtils.recoverFileLease(dfs, FILE, HTU.getConfiguration(), reporter);
-    Mockito.verify(dfs, Mockito.times(2)).recoverLease(FILE);
-    Mockito.verify(dfs, Mockito.times(1)).isFileClosed(FILE);
+    verify(dfs, times(2)).recoverLease(FILE);
+    verify(dfs, times(1)).isFileClosed(FILE);
+  }
+
+  @Test
+  public void testIsLeaseRecoverable() {
+    assertTrue(RecoverLeaseFSUtils.isLeaseRecoverable(new DistributedFileSystem()));
+    assertFalse(RecoverLeaseFSUtils.isLeaseRecoverable(new LocalFileSystem()));
   }
 
   /**
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
index 4b553de4174a..ba2010554660 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
@@ -114,6 +114,23 @@ public final class FSUtils {
   // currently only used in testing. TODO refactor into a test class
   public static final boolean WINDOWS = System.getProperty("os.name").startsWith("Windows");
 
+  private static Class<?> safeModeClazz = null;
+  private static Class<?> safeModeActionClazz = null;
+  private static Object safeModeGet = null;
+  static {
+    try {
+      safeModeClazz = Class.forName("org.apache.hadoop.fs.SafeMode");
+      safeModeActionClazz = Class.forName("org.apache.hadoop.fs.SafeModeAction");
+      safeModeGet = safeModeActionClazz.getField("GET").get(null);
+    } catch (ClassNotFoundException | NoSuchFieldException e) {
+      LOG.debug("SafeMode interface not in the classpath; this means Hadoop 3.3.5 or below.");
+    } catch (IllegalAccessException e) {
+      LOG.error("SafeModeAction.GET is not accessible. "
+        + "Unexpected Hadoop version or messy classpath?", e);
+      throw new RuntimeException(e);
+    }
+  }
+
   private FSUtils() {
   }
 
@@ -248,8 +265,20 @@ public static void checkFileSystemAvailable(final FileSystem fs) throws IOExcept
    * @param dfs A DistributedFileSystem object representing the underlying HDFS.
    * @return whether we're in safe mode
    */
-  private static boolean isInSafeMode(DistributedFileSystem dfs) throws IOException {
-    return dfs.setSafeMode(SAFEMODE_GET, true);
+  private static boolean isInSafeMode(FileSystem dfs) throws IOException {
+    if (isDistributedFileSystem(dfs)) {
+      return ((DistributedFileSystem) dfs).setSafeMode(SAFEMODE_GET, true);
+    } else {
+      try {
+        Object ret = dfs.getClass()
+          .getMethod("setSafeMode", new Class<?>[] { safeModeActionClazz, boolean.class })
+          .invoke(dfs, safeModeGet, true);
+        return (Boolean) ret;
+      } catch (InvocationTargetException | IllegalAccessException | NoSuchMethodException e) {
+        LOG.error("The file system does not support setSafeMode(). 
Abort.", e);
+        throw new RuntimeException(e);
+      }
+    }
   }
 
   /**
@@ -258,9 +287,8 @@ private static boolean isInSafeMode(DistributedFileSystem dfs) throws IOExceptio
   public static void checkDfsSafeMode(final Configuration conf) throws IOException {
     boolean isInSafeMode = false;
     FileSystem fs = FileSystem.get(conf);
-    if (fs instanceof DistributedFileSystem) {
-      DistributedFileSystem dfs = (DistributedFileSystem) fs;
-      isInSafeMode = isInSafeMode(dfs);
+    if (supportSafeMode(fs)) {
+      isInSafeMode = isInSafeMode(fs);
     }
     if (isInSafeMode) {
       throw new IOException("File system is in safemode, it can't be written now");
@@ -635,10 +663,11 @@ public static void setClusterId(FileSystem fs, Path rootdir, ClusterId clusterId
    */
   public static void waitOnSafeMode(final Configuration conf, final long wait) throws IOException {
     FileSystem fs = FileSystem.get(conf);
-    if (!(fs instanceof DistributedFileSystem)) return;
-    DistributedFileSystem dfs = (DistributedFileSystem) fs;
+    if (!supportSafeMode(fs)) {
+      return;
+    }
     // Make sure dfs is not in safe mode
-    while (isInSafeMode(dfs)) {
+    while (isInSafeMode(fs)) {
       LOG.info("Waiting for dfs to exit safe mode...");
       try {
         Thread.sleep(wait);
@@ -649,6 +678,19 @@ public static void waitOnSafeMode(final Configuration conf, final long wait) thr
     }
   }
 
+  public static boolean supportSafeMode(FileSystem fs) {
+    // return true if HDFS.
+    if (fs instanceof DistributedFileSystem) {
+      return true;
+    }
+    // return true if the file system implements the SafeMode interface.
+    if (safeModeClazz != null) {
+      return safeModeClazz.isAssignableFrom(fs.getClass());
+    }
+    // return false if the file system is not HDFS and does not implement SafeMode.
+    return false;
+  }
+
   /**
    * Checks if meta region exists
    * @param fs file system
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFSUtils.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFSUtils.java
index ebac18c153ca..ce0e4bcea66b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFSUtils.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFSUtils.java
@@ -33,6 +33,7 @@
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.StreamCapabilities;
 import org.apache.hadoop.fs.permission.FsPermission;
@@ -90,6 +91,8 @@ public void testIsHDFS() throws Exception {
     try {
       cluster = htu.startMiniDFSCluster(1);
       assertTrue(CommonFSUtils.isHDFS(conf));
+      assertTrue(FSUtils.supportSafeMode(cluster.getFileSystem()));
+      FSUtils.checkDfsSafeMode(conf);
     } finally {
       if (cluster != null) {
         cluster.shutdown();
@@ -97,6 +100,14 @@ public void testIsHDFS() throws Exception {
     }
   }
 
+  @Test
+  public void testLocalFileSystemSafeMode() throws Exception {
+    conf.setClass("fs.file.impl", LocalFileSystem.class, FileSystem.class);
+    assertFalse(CommonFSUtils.isHDFS(conf));
+    assertFalse(FSUtils.supportSafeMode(FileSystem.get(conf)));
+    FSUtils.checkDfsSafeMode(conf);
+  }
+
   private void WriteDataToHDFS(FileSystem fs, Path file, int dataSize) throws Exception {
     FSDataOutputStream out = fs.create(file);
     byte[] data = new byte[dataSize];
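
Reviewer note: the reflective dispatch introduced in RecoverLeaseFSUtils can be hard to follow when read as a diff. The following standalone sketch is not part of the patch; the class name and the exact fallback structure are illustrative only. It condenses the same idea: prefer the org.apache.hadoop.fs.LeaseRecoverable interface when it is on the classpath (Hadoop 3.3.6+), and fall back to DistributedFileSystem#recoverLease on older Hadoop releases.

import java.lang.reflect.Method;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;

public class LeaseRecoveryDispatchSketch {

  // Resolve recoverLease(Path) from LeaseRecoverable when the interface exists
  // (Hadoop 3.3.6+), otherwise from DistributedFileSystem (Hadoop 3.3.5 and below).
  static Method resolveRecoverLease() throws NoSuchMethodException {
    try {
      return Class.forName("org.apache.hadoop.fs.LeaseRecoverable").getMethod("recoverLease",
        Path.class);
    } catch (ClassNotFoundException e) {
      return DistributedFileSystem.class.getMethod("recoverLease", Path.class);
    }
  }

  // One recovery attempt: invoke only when fs is an instance of the method's
  // declaring type, mirroring the isLeaseRecoverable() guard in the patch.
  static boolean tryRecoverLease(FileSystem fs, Path p) throws Exception {
    Method recoverLease = resolveRecoverLease();
    if (!recoverLease.getDeclaringClass().isAssignableFrom(fs.getClass())) {
      return true; // e.g. LocalFileSystem: nothing to recover
    }
    return (Boolean) recoverLease.invoke(fs, p);
  }
}

Caching the resolved Method once in a static initializer, as the patch does, avoids repeating the Class.forName lookup on every recovery attempt.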
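
A related subtlety in the FSUtils change: Class#getMethod matches parameter types exactly, so the setSafeMode lookup must pass boolean.class for the primitive second parameter of SafeMode#setSafeMode(SafeModeAction, boolean); passing the Boolean.class wrapper throws NoSuchMethodException. A minimal, self-contained demonstration follows; FakeSafeMode is a stand-in interface, not the real Hadoop type.

import java.lang.reflect.Method;

public class PrimitiveLookupSketch {

  // Stand-in for org.apache.hadoop.fs.SafeMode, which declares
  // setSafeMode(SafeModeAction, boolean) with a primitive second parameter.
  interface FakeSafeMode {
    boolean setSafeMode(String action, boolean isChecked);
  }

  public static void main(String[] args) throws Exception {
    // Succeeds: boolean.class matches the primitive parameter exactly.
    Method m = FakeSafeMode.class.getMethod("setSafeMode", String.class, boolean.class);
    System.out.println("resolved: " + m);

    try {
      // Fails: Boolean.class is the wrapper type, and getMethod does no unboxing.
      FakeSafeMode.class.getMethod("setSafeMode", String.class, Boolean.class);
    } catch (NoSuchMethodException expected) {
      System.out.println("wrapper lookup failed as expected");
    }
  }
}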