HDFS-14617 - Improve fsimage load time by writing sub-sections to the fsimage index #1028
Changes from all commits: 09646d0, e365370, 06ee8c4, 7713b7a, 30bfeea, 23efe96, 2aef690, fb27e28, 997f6a2, a895756, 654be03
DFSConfigKeys.java

@@ -883,6 +883,22 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final String DFS_IMAGE_TRANSFER_CHUNKSIZE_KEY = "dfs.image.transfer.chunksize";
   public static final int DFS_IMAGE_TRANSFER_CHUNKSIZE_DEFAULT = 64 * 1024;

+  public static final String DFS_IMAGE_PARALLEL_LOAD_KEY =
+      "dfs.image.parallel.load";
+  public static final boolean DFS_IMAGE_PARALLEL_LOAD_DEFAULT = true;
+
+  public static final String DFS_IMAGE_PARALLEL_TARGET_SECTIONS_KEY =
+      "dfs.image.parallel.target.sections";
+  public static final int DFS_IMAGE_PARALLEL_TARGET_SECTIONS_DEFAULT = 12;
+
+  public static final String DFS_IMAGE_PARALLEL_INODE_THRESHOLD_KEY =
+      "dfs.image.parallel.inode.threshold";
+  public static final int DFS_IMAGE_PARALLEL_INODE_THRESHOLD_DEFAULT = 1000000;
+
+  public static final String DFS_IMAGE_PARALLEL_THREADS_KEY =
+      "dfs.image.parallel.threads";
+  public static final int DFS_IMAGE_PARALLEL_THREADS_DEFAULT = 4;
+
   // Edit Log segment transfer timeout
   public static final String DFS_EDIT_LOG_TRANSFER_TIMEOUT_KEY =
       "dfs.edit.log.transfer.timeout";
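These four keys gate the feature. For reference, here is a minimal sketch (not part of this diff) of reading them through Hadoop's standard `Configuration` API; the comments reflect how this change appears to use each key, and the class name is only illustrative:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;

public class ParallelLoadSettings {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Whether parallel fsimage saving/loading is enabled at all.
    boolean enabled = conf.getBoolean(
        DFSConfigKeys.DFS_IMAGE_PARALLEL_LOAD_KEY,
        DFSConfigKeys.DFS_IMAGE_PARALLEL_LOAD_DEFAULT);
    // Roughly how many sub-sections the saver should split a section into.
    int targetSections = conf.getInt(
        DFSConfigKeys.DFS_IMAGE_PARALLEL_TARGET_SECTIONS_KEY,
        DFSConfigKeys.DFS_IMAGE_PARALLEL_TARGET_SECTIONS_DEFAULT);
    // Below this inode count, sub-sections are presumably not worth the overhead.
    int inodeThreshold = conf.getInt(
        DFSConfigKeys.DFS_IMAGE_PARALLEL_INODE_THRESHOLD_KEY,
        DFSConfigKeys.DFS_IMAGE_PARALLEL_INODE_THRESHOLD_DEFAULT);
    // Size of the thread pool used when loading sub-sections in parallel.
    int threads = conf.getInt(
        DFSConfigKeys.DFS_IMAGE_PARALLEL_THREADS_KEY,
        DFSConfigKeys.DFS_IMAGE_PARALLEL_THREADS_DEFAULT);
    System.out.printf("parallel=%b sections=%d threshold=%d threads=%d%n",
        enabled, targetSections, inodeThreshold, threads);
  }
}
```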
FSImageFormatPBINode.java

@@ -25,6 +25,11 @@
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantLock;

 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -90,6 +95,8 @@ public final class FSImageFormatPBINode {
   private static final Logger LOG =
       LoggerFactory.getLogger(FSImageFormatPBINode.class);

+  private static final int DIRECTORY_ENTRY_BATCH_SIZE = 1000;
+
   // the loader must decode all fields referencing serial number based fields
   // via to<Item> methods with the string table.
   public final static class Loader {
@@ -197,16 +204,66 @@ public static void updateBlocksMap(INodeFile file, BlockManager bm) {
     private final FSDirectory dir;
     private final FSNamesystem fsn;
     private final FSImageFormatProtobuf.Loader parent;
+    private ReentrantLock cacheNameMapLock;
+    private ReentrantLock blockMapLock;

     Loader(FSNamesystem fsn, final FSImageFormatProtobuf.Loader parent) {
       this.fsn = fsn;
       this.dir = fsn.dir;
       this.parent = parent;
+      cacheNameMapLock = new ReentrantLock(true);
+      blockMapLock = new ReentrantLock(true);
     }

+    void loadINodeDirectorySectionInParallel(ExecutorService service,
+        ArrayList<FileSummary.Section> sections, String compressionCodec)
+        throws IOException {
+      LOG.info("Loading the INodeDirectory section in parallel with {} sub-" +
+          "sections", sections.size());
+      CountDownLatch latch = new CountDownLatch(sections.size());
+      final CopyOnWriteArrayList<IOException> exceptions =
+          new CopyOnWriteArrayList<>();
+      for (FileSummary.Section s : sections) {
+        service.submit(() -> {
+          InputStream ins = null;
+          try {
+            ins = parent.getInputStreamForSection(s, compressionCodec);
+            loadINodeDirectorySection(ins);
+          } catch (Exception e) {
+            LOG.error("An exception occurred loading INodeDirectories in " +
+                "parallel", e);
+            exceptions.add(new IOException(e));
+          } finally {
+            latch.countDown();
+            try {
+              if (ins != null) {
+                ins.close();
+              }
+            } catch (IOException ioe) {
+              LOG.warn("Failed to close the input stream, ignoring", ioe);
+            }
+          }
+        });
+      }
+      try {
+        latch.await();
+      } catch (InterruptedException e) {
+        LOG.error("Interrupted waiting for countdown latch", e);
+        throw new IOException(e);
+      }
+      if (exceptions.size() != 0) {
+        LOG.error("{} exceptions occurred loading INodeDirectories",
+            exceptions.size());
+        throw exceptions.get(0);
+      }
+      LOG.info("Completed loading all INodeDirectory sub-sections");
+    }
+
     void loadINodeDirectorySection(InputStream in) throws IOException {
       final List<INodeReference> refList = parent.getLoaderContext()
           .getRefList();
+      ArrayList<INode> inodeList = new ArrayList<>();
       while (true) {
         INodeDirectorySection.DirEntry e = INodeDirectorySection.DirEntry
             .parseDelimitedFrom(in);
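`loadINodeDirectorySectionInParallel` is a standard scatter-gather: one task per sub-section, failures collected in a thread-safe list, and a latch awaited before the first failure is rethrown. A self-contained sketch of the same pattern; the section names and `loadSection` are hypothetical stand-ins for the HDFS types:

```java
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ScatterGatherLoad {

  /** Load every named sub-section on the pool; rethrow the first failure. */
  static void loadAllInParallel(ExecutorService service, List<String> sections)
      throws IOException {
    CountDownLatch latch = new CountDownLatch(sections.size());
    // Worker threads append failures without extra locking; the list is
    // only read after the latch has been released.
    final CopyOnWriteArrayList<IOException> exceptions =
        new CopyOnWriteArrayList<>();
    for (String s : sections) {
      service.submit(() -> {
        try {
          loadSection(s); // stand-in for loadINodeDirectorySection(ins)
        } catch (Exception e) {
          exceptions.add(new IOException("Failed loading " + s, e));
        } finally {
          latch.countDown(); // always count down, or await() hangs forever
        }
      });
    }
    try {
      latch.await();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IOException(e);
    }
    if (!exceptions.isEmpty()) {
      throw exceptions.get(0); // surface the first of any recorded failures
    }
  }

  // Hypothetical stand-in for parsing one sub-section.
  static void loadSection(String name) {
    System.out.println("loaded " + name);
  }

  public static void main(String[] args) throws IOException {
    ExecutorService pool = Executors.newFixedThreadPool(4);
    try {
      loadAllInParallel(pool, Arrays.asList("inode-dir-sub-1", "inode-dir-sub-2"));
    } finally {
      pool.shutdown();
    }
  }
}
```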
@@ -217,33 +274,159 @@ void loadINodeDirectorySection(InputStream in) throws IOException {
         INodeDirectory p = dir.getInode(e.getParent()).asDirectory();
         for (long id : e.getChildrenList()) {
           INode child = dir.getInode(id);
-          addToParent(p, child);
+          if (addToParent(p, child)) {
+            if (child.isFile()) {
+              inodeList.add(child);
+            }
+            if (inodeList.size() >= DIRECTORY_ENTRY_BATCH_SIZE) {
+              addToCacheAndBlockMap(inodeList);
+              inodeList.clear();
+            }
+          } else {
+            LOG.warn("Failed to add the inode {} to the directory {}",
+                child.getId(), p.getId());
+          }
         }

         for (int refId : e.getRefChildrenList()) {
           INodeReference ref = refList.get(refId);
-          addToParent(p, ref);
+          if (addToParent(p, ref)) {
+            if (ref.isFile()) {
+              inodeList.add(ref);
+            }
+            if (inodeList.size() >= DIRECTORY_ENTRY_BATCH_SIZE) {
+              addToCacheAndBlockMap(inodeList);
+              inodeList.clear();
+            }
+          } else {
+            LOG.warn("Failed to add the inode reference {} to the directory {}",
+                ref.getId(), p.getId());
+          }
         }
       }
+      addToCacheAndBlockMap(inodeList);
     }

+    private void addToCacheAndBlockMap(ArrayList<INode> inodeList) {
+      try {
+        cacheNameMapLock.lock();
+        for (INode i : inodeList) {
+          dir.cacheName(i);
+        }
+      } finally {
+        cacheNameMapLock.unlock();
+      }
+
+      try {
+        blockMapLock.lock();
+        for (INode i : inodeList) {
+          updateBlocksMap(i.asFile(), fsn.getBlockManager());
+        }
+      } finally {
+        blockMapLock.unlock();
+      }
+    }
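`addToCacheAndBlockMap` exists because the name cache and block map are shared across loader threads: buffering `DIRECTORY_ENTRY_BATCH_SIZE` (1000) inodes and taking each fair lock once per batch amortises the contention that per-inode locking would create. A toy sketch of the batch-then-lock idea, assuming one instance per loader thread; `applyToSharedStructure` is a hypothetical stand-in for `dir.cacheName` / `updateBlocksMap`:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;

public class BatchedUpdater<T> {
  private static final int BATCH_SIZE = 1000; // mirrors DIRECTORY_ENTRY_BATCH_SIZE
  // A fair lock keeps any one loader thread from starving the others.
  private final ReentrantLock lock = new ReentrantLock(true);
  private final List<T> batch = new ArrayList<>();

  /** Buffer items locally; only take the shared lock once per full batch. */
  public void add(T item) {
    batch.add(item);
    if (batch.size() >= BATCH_SIZE) {
      flushBatch();
    }
  }

  /** Flush the final, possibly partial, batch. */
  public void close() {
    flushBatch();
  }

  private void flushBatch() {
    lock.lock();
    try {
      for (T item : batch) {
        applyToSharedStructure(item);
      }
    } finally {
      lock.unlock();
    }
    batch.clear();
  }

  // Hypothetical consumer standing in for the shared name cache / block map.
  private void applyToSharedStructure(T item) { }
}
```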
     void loadINodeSection(InputStream in, StartupProgress prog,
         Step currentStep) throws IOException {
-      INodeSection s = INodeSection.parseDelimitedFrom(in);
-      fsn.dir.resetLastInodeId(s.getLastInodeId());
-      long numInodes = s.getNumInodes();
-      LOG.info("Loading " + numInodes + " INodes.");
-      prog.setTotal(Phase.LOADING_FSIMAGE, currentStep, numInodes);
+      loadINodeSectionHeader(in, prog, currentStep);
       Counter counter = prog.getCounter(Phase.LOADING_FSIMAGE, currentStep);
-      for (int i = 0; i < numInodes; ++i) {
-        INodeSection.INode p = INodeSection.INode.parseDelimitedFrom(in);
-        if (p.getId() == INodeId.ROOT_INODE_ID) {
-          loadRootINode(p);
-        } else {
-          INode n = loadINode(p);
-          dir.addToInodeMap(n);
-        }
-        counter.increment();
-      }
+      int totalLoaded = loadINodesInSection(in, counter);
+      LOG.info("Successfully loaded {} inodes", totalLoaded);
     }

+    private int loadINodesInSection(InputStream in, Counter counter)
+        throws IOException {
+      // As the input stream is a LimitInputStream, the reading will stop when
+      // EOF is encountered at the end of the stream.
+      int cntr = 0;
+      while (true) {
+        INodeSection.INode p = INodeSection.INode.parseDelimitedFrom(in);
+        if (p == null) {
+          break;
+        }
+        if (p.getId() == INodeId.ROOT_INODE_ID) {
+          synchronized(this) {
+            loadRootINode(p);
+          }
+        } else {
+          INode n = loadINode(p);
+          synchronized(this) {
+            dir.addToInodeMap(n);
+          }
+        }
+        cntr++;
+        if (counter != null) {
+          counter.increment();
+        }
+      }
+      return cntr;
+    }
+
+    private long loadINodeSectionHeader(InputStream in, StartupProgress prog,
+        Step currentStep) throws IOException {
+      INodeSection s = INodeSection.parseDelimitedFrom(in);
+      fsn.dir.resetLastInodeId(s.getLastInodeId());
+      long numInodes = s.getNumInodes();
+      LOG.info("Loading " + numInodes + " INodes.");
+      prog.setTotal(Phase.LOADING_FSIMAGE, currentStep, numInodes);
+      return numInodes;
+    }
+
+    void loadINodeSectionInParallel(ExecutorService service,
+        ArrayList<FileSummary.Section> sections,
+        String compressionCodec, StartupProgress prog,
+        Step currentStep) throws IOException {
+      LOG.info("Loading the INode section in parallel with {} sub-sections",
+          sections.size());
+      long expectedInodes = 0;
+      CountDownLatch latch = new CountDownLatch(sections.size());
+      AtomicInteger totalLoaded = new AtomicInteger(0);
+      final CopyOnWriteArrayList<IOException> exceptions =
+          new CopyOnWriteArrayList<>();
+
+      for (int i=0; i < sections.size(); i++) {
+        FileSummary.Section s = sections.get(i);
+        InputStream ins = parent.getInputStreamForSection(s, compressionCodec);
+        if (i == 0) {
+          // The first inode section has a header which must be processed first
+          expectedInodes = loadINodeSectionHeader(ins, prog, currentStep);
+        }
+        service.submit(() -> {
+          try {
+            totalLoaded.addAndGet(loadINodesInSection(ins, null));
+            prog.setCount(Phase.LOADING_FSIMAGE, currentStep,
+                totalLoaded.get());
+          } catch (Exception e) {
+            LOG.error("An exception occurred loading INodes in parallel", e);
+            exceptions.add(new IOException(e));
+          } finally {
+            latch.countDown();
+            try {
+              ins.close();
+            } catch (IOException ioe) {
+              LOG.warn("Failed to close the input stream, ignoring", ioe);
+            }
+          }
+        });
+      }
+      try {
+        latch.await();
+      } catch (InterruptedException e) {
+        LOG.info("Interrupted waiting for countdown latch");
+      }
+      if (exceptions.size() != 0) {
+        LOG.error("{} exceptions occurred loading INodes", exceptions.size());
+        throw exceptions.get(0);
+      }
+      if (totalLoaded.get() != expectedInodes) {
+        throw new IOException("Expected to load "+expectedInodes+" in " +
+            "parallel, but loaded "+totalLoaded.get()+". The image may " +
+            "be corrupt.");
+      }
+      LOG.info("Completed loading all INode sections. Loaded {} inodes.",
+          totalLoaded.get());
+    }

     /**
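Two details in `loadINodeSectionInParallel` are worth calling out: the inode-count header prefixes only the first sub-section, so it is consumed on the calling thread before any task is submitted, and the `AtomicInteger` total is checked against that header afterwards as a cheap corruption guard. A distilled, self-contained sketch of that shape; `readHeader` and `readRecords` are hypothetical stand-ins for `loadINodeSectionHeader` and `loadINodesInSection`:

```java
import java.io.IOException;
import java.io.InputStream;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicLong;

public class VerifiedParallelLoad {

  /** Read all sub-section streams on the pool and verify the grand total. */
  static long loadAll(ExecutorService service, List<InputStream> streams)
      throws IOException, InterruptedException {
    CountDownLatch latch = new CountDownLatch(streams.size());
    AtomicLong totalLoaded = new AtomicLong();
    long expected = -1;
    for (int i = 0; i < streams.size(); i++) {
      InputStream in = streams.get(i);
      if (i == 0) {
        // The record-count header prefixes the first sub-section only, so
        // consume it here before handing the stream to a worker thread.
        expected = readHeader(in);
      }
      service.submit(() -> {
        try {
          totalLoaded.addAndGet(readRecords(in));
        } finally {
          latch.countDown();
        }
      });
    }
    latch.await();
    if (totalLoaded.get() != expected) {
      throw new IOException("Expected " + expected + " records but loaded "
          + totalLoaded.get() + "; the image may be corrupt.");
    }
    return totalLoaded.get();
  }

  // Hypothetical stand-ins for loadINodeSectionHeader / loadINodesInSection.
  static long readHeader(InputStream in) { return 0; }
  static long readRecords(InputStream in) { return 0; }
}
```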
@@ -261,22 +444,18 @@ void loadFilesUnderConstructionSection(InputStream in) throws IOException {
       }
     }

-    private void addToParent(INodeDirectory parent, INode child) {
-      if (parent == dir.rootDir && FSDirectory.isReservedName(child)) {
+    private boolean addToParent(INodeDirectory parentDir, INode child) {
+      if (parentDir == dir.rootDir && FSDirectory.isReservedName(child)) {
         throw new HadoopIllegalArgumentException("File name \""
             + child.getLocalName() + "\" is reserved. Please "
             + " change the name of the existing file or directory to another "
             + "name before upgrading to this release.");
       }
       // NOTE: This does not update space counts for parents
-      if (!parent.addChildAtLoading(child)) {
-        return;
-      }
-      dir.cacheName(child);
-
-      if (child.isFile()) {
-        updateBlocksMap(child.asFile(), fsn.getBlockManager());
+      if (!parentDir.addChildAtLoading(child)) {
+        return false;
       }
+      return true;
     }

     private INode loadINode(INodeSection.INode n) {
@@ -527,6 +706,7 @@ void serializeINodeDirectorySection(OutputStream out) throws IOException {
     final ArrayList<INodeReference> refList = parent.getSaverContext()
         .getRefList();
     int i = 0;
+    int outputInodes = 0;
     while (iter.hasNext()) {
       INodeWithAdditionalFields n = iter.next();
       if (!n.isDirectory()) {
@@ -558,6 +738,7 @@ void serializeINodeDirectorySection(OutputStream out) throws IOException {
           refList.add(inode.asReference());
           b.addRefChildren(refList.size() - 1);
         }
+        outputInodes++;
       }
       INodeDirectorySection.DirEntry e = b.build();
       e.writeDelimitedTo(out);
@@ -567,9 +748,15 @@ void serializeINodeDirectorySection(OutputStream out) throws IOException {
       if (i % FSImageFormatProtobuf.Saver.CHECK_CANCEL_INTERVAL == 0) {
         context.checkCancelled();
       }
+      if (outputInodes >= parent.getInodesPerSubSection()) {
+        outputInodes = 0;
+        parent.commitSubSection(summary,
+            FSImageFormatProtobuf.SectionName.INODE_DIR_SUB);
+      }
     }
-    parent.commitSection(summary,
-        FSImageFormatProtobuf.SectionName.INODE_DIR);
+    parent.commitSectionAndSubSection(summary,
+        FSImageFormatProtobuf.SectionName.INODE_DIR,
+        FSImageFormatProtobuf.SectionName.INODE_DIR_SUB);
   }

   void serializeINodeSection(OutputStream out) throws IOException {
@@ -589,8 +776,14 @@ void serializeINodeSection(OutputStream out) throws IOException {
       if (i % FSImageFormatProtobuf.Saver.CHECK_CANCEL_INTERVAL == 0) {
         context.checkCancelled();
       }
+      if (i % parent.getInodesPerSubSection() == 0) {
+        parent.commitSubSection(summary,
+            FSImageFormatProtobuf.SectionName.INODE_SUB);
+      }
     }
-    parent.commitSection(summary, FSImageFormatProtobuf.SectionName.INODE);
+    parent.commitSectionAndSubSection(summary,
+        FSImageFormatProtobuf.SectionName.INODE,
+        FSImageFormatProtobuf.SectionName.INODE_SUB);
   }

   void serializeFilesUCSection(OutputStream out) throws IOException {
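`commitSubSection` and `commitSectionAndSubSection` live in `FSImageFormatProtobuf`, which is not among the hunks shown here. Conceptually they record where each sub-section starts and ends in the FileSummary index so a loader can later seek straight to it. A simplified sketch of that bookkeeping under stated assumptions: `Entry` is a hypothetical stand-in for `FileSummary.Section` (name plus byte range), and `write` stands in for the real image output stream:

```java
import java.util.ArrayList;
import java.util.List;

public class SubSectionIndex {

  /** Hypothetical index entry: a section name plus the byte range the
   *  (sub-)section occupies in the image file. */
  public static final class Entry {
    final String name;
    final long offset;
    final long length;
    Entry(String name, long offset, long length) {
      this.name = name;
      this.offset = offset;
      this.length = length;
    }
  }

  private final List<Entry> entries = new ArrayList<>();
  private long subSectionStart = 0; // where the current sub-section began
  private long position = 0;        // total bytes written so far

  /** Stand-in for writing serialized entries to the image stream. */
  public void write(byte[] bytes) {
    position += bytes.length;
  }

  /** Close the current sub-section at the current offset and start the next
   *  one, analogous to committing an INODE_SUB / INODE_DIR_SUB entry. */
  public void commitSubSection(String name) {
    entries.add(new Entry(name, subSectionStart, position - subSectionStart));
    subSectionStart = position;
  }

  public List<Entry> index() {
    return entries;
  }
}
```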
Review comment: can an annotation be added here? e.g. `// NameNode fsimage start parallel`