2525import java .util .Collection ;
2626import java .util .Iterator ;
2727import java .util .List ;
28+ import java .util .concurrent .CopyOnWriteArrayList ;
29+ import java .util .concurrent .CountDownLatch ;
30+ import java .util .concurrent .ExecutorService ;
31+ import java .util .concurrent .atomic .AtomicInteger ;
32+ import java .util .concurrent .locks .ReentrantLock ;
2833
2934import org .slf4j .Logger ;
3035import org .slf4j .LoggerFactory ;
@@ -90,6 +95,8 @@ public final class FSImageFormatPBINode {
9095 private static final Logger LOG =
9196 LoggerFactory .getLogger (FSImageFormatPBINode .class );
9297
98+ private static final int DIRECTORY_ENTRY_BATCH_SIZE = 1000 ;
99+
93100 // the loader must decode all fields referencing serial number based fields
94101 // via to<Item> methods with the string table.
95102 public final static class Loader {
@@ -197,16 +204,66 @@ public static void updateBlocksMap(INodeFile file, BlockManager bm) {
197204 private final FSDirectory dir ;
198205 private final FSNamesystem fsn ;
199206 private final FSImageFormatProtobuf .Loader parent ;
207+ private ReentrantLock cacheNameMapLock ;
208+ private ReentrantLock blockMapLock ;
200209
201210 Loader (FSNamesystem fsn , final FSImageFormatProtobuf .Loader parent ) {
202211 this .fsn = fsn ;
203212 this .dir = fsn .dir ;
204213 this .parent = parent ;
214+ cacheNameMapLock = new ReentrantLock (true );
215+ blockMapLock = new ReentrantLock (true );
216+ }
217+
218+ void loadINodeDirectorySectionInParallel (ExecutorService service ,
219+ ArrayList <FileSummary .Section > sections , String compressionCodec )
220+ throws IOException {
221+ LOG .info ("Loading the INodeDirectory section in parallel with {} sub-" +
222+ "sections" , sections .size ());
223+ CountDownLatch latch = new CountDownLatch (sections .size ());
224+ final CopyOnWriteArrayList <IOException > exceptions =
225+ new CopyOnWriteArrayList <>();
226+ for (FileSummary .Section s : sections ) {
227+ service .submit (() -> {
228+ InputStream ins = null ;
229+ try {
230+ ins = parent .getInputStreamForSection (s ,
231+ compressionCodec );
232+ loadINodeDirectorySection (ins );
233+ } catch (Exception e ) {
234+ LOG .error ("An exception occurred loading INodeDirectories in " +
235+ "parallel" , e );
236+ exceptions .add (new IOException (e ));
237+ } finally {
238+ latch .countDown ();
239+ try {
240+ if (ins != null ) {
241+ ins .close ();
242+ }
243+ } catch (IOException ioe ) {
244+ LOG .warn ("Failed to close the input stream, ignoring" , ioe );
245+ }
246+ }
247+ });
248+ }
249+ try {
250+ latch .await ();
251+ } catch (InterruptedException e ) {
252+ LOG .error ("Interrupted waiting for countdown latch" , e );
253+ throw new IOException (e );
254+ }
255+ if (exceptions .size () != 0 ) {
256+ LOG .error ("{} exceptions occurred loading INodeDirectories" ,
257+ exceptions .size ());
258+ throw exceptions .get (0 );
259+ }
260+ LOG .info ("Completed loading all INodeDirectory sub-sections" );
205261 }
206262
207263 void loadINodeDirectorySection (InputStream in ) throws IOException {
208264 final List <INodeReference > refList = parent .getLoaderContext ()
209265 .getRefList ();
266+ ArrayList <INode > inodeList = new ArrayList <>();
210267 while (true ) {
211268 INodeDirectorySection .DirEntry e = INodeDirectorySection .DirEntry
212269 .parseDelimitedFrom (in );
@@ -217,33 +274,159 @@ void loadINodeDirectorySection(InputStream in) throws IOException {
217274 INodeDirectory p = dir .getInode (e .getParent ()).asDirectory ();
218275 for (long id : e .getChildrenList ()) {
219276 INode child = dir .getInode (id );
220- addToParent (p , child );
277+ if (addToParent (p , child )) {
278+ if (child .isFile ()) {
279+ inodeList .add (child );
280+ }
281+ if (inodeList .size () >= DIRECTORY_ENTRY_BATCH_SIZE ) {
282+ addToCacheAndBlockMap (inodeList );
283+ inodeList .clear ();
284+ }
285+ } else {
286+ LOG .warn ("Failed to add the inode {} to the directory {}" ,
287+ child .getId (), p .getId ());
288+ }
221289 }
290+
222291 for (int refId : e .getRefChildrenList ()) {
223292 INodeReference ref = refList .get (refId );
224- addToParent (p , ref );
293+ if (addToParent (p , ref )) {
294+ if (ref .isFile ()) {
295+ inodeList .add (ref );
296+ }
297+ if (inodeList .size () >= DIRECTORY_ENTRY_BATCH_SIZE ) {
298+ addToCacheAndBlockMap (inodeList );
299+ inodeList .clear ();
300+ }
301+ } else {
302+ LOG .warn ("Failed to add the inode reference {} to the directory {}" ,
303+ ref .getId (), p .getId ());
304+ }
305+ }
306+ }
307+ addToCacheAndBlockMap (inodeList );
308+ }
309+
310+ private void addToCacheAndBlockMap (ArrayList <INode > inodeList ) {
311+ try {
312+ cacheNameMapLock .lock ();
313+ for (INode i : inodeList ) {
314+ dir .cacheName (i );
315+ }
316+ } finally {
317+ cacheNameMapLock .unlock ();
318+ }
319+
320+ try {
321+ blockMapLock .lock ();
322+ for (INode i : inodeList ) {
323+ updateBlocksMap (i .asFile (), fsn .getBlockManager ());
225324 }
325+ } finally {
326+ blockMapLock .unlock ();
226327 }
227328 }
228329
229330 void loadINodeSection (InputStream in , StartupProgress prog ,
230331 Step currentStep ) throws IOException {
231- INodeSection s = INodeSection .parseDelimitedFrom (in );
232- fsn .dir .resetLastInodeId (s .getLastInodeId ());
233- long numInodes = s .getNumInodes ();
234- LOG .info ("Loading " + numInodes + " INodes." );
235- prog .setTotal (Phase .LOADING_FSIMAGE , currentStep , numInodes );
332+ loadINodeSectionHeader (in , prog , currentStep );
236333 Counter counter = prog .getCounter (Phase .LOADING_FSIMAGE , currentStep );
237- for (int i = 0 ; i < numInodes ; ++i ) {
334+ int totalLoaded = loadINodesInSection (in , counter );
335+ LOG .info ("Successfully loaded {} inodes" , totalLoaded );
336+ }
337+
338+ private int loadINodesInSection (InputStream in , Counter counter )
339+ throws IOException {
340+ // As the input stream is a LimitInputStream, the reading will stop when
341+ // EOF is encountered at the end of the stream.
342+ int cntr = 0 ;
343+ while (true ) {
238344 INodeSection .INode p = INodeSection .INode .parseDelimitedFrom (in );
345+ if (p == null ) {
346+ break ;
347+ }
239348 if (p .getId () == INodeId .ROOT_INODE_ID ) {
240- loadRootINode (p );
349+ synchronized (this ) {
350+ loadRootINode (p );
351+ }
241352 } else {
242353 INode n = loadINode (p );
243- dir .addToInodeMap (n );
354+ synchronized (this ) {
355+ dir .addToInodeMap (n );
356+ }
357+ }
358+ cntr ++;
359+ if (counter != null ) {
360+ counter .increment ();
361+ }
362+ }
363+ return cntr ;
364+ }
365+
366+
367+ private long loadINodeSectionHeader (InputStream in , StartupProgress prog ,
368+ Step currentStep ) throws IOException {
369+ INodeSection s = INodeSection .parseDelimitedFrom (in );
370+ fsn .dir .resetLastInodeId (s .getLastInodeId ());
371+ long numInodes = s .getNumInodes ();
372+ LOG .info ("Loading " + numInodes + " INodes." );
373+ prog .setTotal (Phase .LOADING_FSIMAGE , currentStep , numInodes );
374+ return numInodes ;
375+ }
376+
377+ void loadINodeSectionInParallel (ExecutorService service ,
378+ ArrayList <FileSummary .Section > sections ,
379+ String compressionCodec , StartupProgress prog ,
380+ Step currentStep ) throws IOException {
381+ LOG .info ("Loading the INode section in parallel with {} sub-sections" ,
382+ sections .size ());
383+ long expectedInodes = 0 ;
384+ CountDownLatch latch = new CountDownLatch (sections .size ());
385+ AtomicInteger totalLoaded = new AtomicInteger (0 );
386+ final CopyOnWriteArrayList <IOException > exceptions =
387+ new CopyOnWriteArrayList <>();
388+
389+ for (int i =0 ; i < sections .size (); i ++) {
390+ FileSummary .Section s = sections .get (i );
391+ InputStream ins = parent .getInputStreamForSection (s , compressionCodec );
392+ if (i == 0 ) {
393+ // The first inode section has a header which must be processed first
394+ expectedInodes = loadINodeSectionHeader (ins , prog , currentStep );
244395 }
245- counter .increment ();
396+ service .submit (() -> {
397+ try {
398+ totalLoaded .addAndGet (loadINodesInSection (ins , null ));
399+ prog .setCount (Phase .LOADING_FSIMAGE , currentStep ,
400+ totalLoaded .get ());
401+ } catch (Exception e ) {
402+ LOG .error ("An exception occurred loading INodes in parallel" , e );
403+ exceptions .add (new IOException (e ));
404+ } finally {
405+ latch .countDown ();
406+ try {
407+ ins .close ();
408+ } catch (IOException ioe ) {
409+ LOG .warn ("Failed to close the input stream, ignoring" , ioe );
410+ }
411+ }
412+ });
413+ }
414+ try {
415+ latch .await ();
416+ } catch (InterruptedException e ) {
417+ LOG .info ("Interrupted waiting for countdown latch" );
246418 }
419+ if (exceptions .size () != 0 ) {
420+ LOG .error ("{} exceptions occurred loading INodes" , exceptions .size ());
421+ throw exceptions .get (0 );
422+ }
423+ if (totalLoaded .get () != expectedInodes ) {
424+ throw new IOException ("Expected to load " +expectedInodes +" in " +
425+ "parallel, but loaded " +totalLoaded .get ()+". The image may " +
426+ "be corrupt." );
427+ }
428+ LOG .info ("Completed loading all INode sections. Loaded {} inodes." ,
429+ totalLoaded .get ());
247430 }
248431
249432 /**
@@ -261,22 +444,18 @@ void loadFilesUnderConstructionSection(InputStream in) throws IOException {
261444 }
262445 }
263446
264- private void addToParent (INodeDirectory parent , INode child ) {
265- if (parent == dir .rootDir && FSDirectory .isReservedName (child )) {
447+ private boolean addToParent (INodeDirectory parentDir , INode child ) {
448+ if (parentDir == dir .rootDir && FSDirectory .isReservedName (child )) {
266449 throw new HadoopIllegalArgumentException ("File name \" "
267450 + child .getLocalName () + "\" is reserved. Please "
268451 + " change the name of the existing file or directory to another "
269452 + "name before upgrading to this release." );
270453 }
271454 // NOTE: This does not update space counts for parents
272- if (!parent .addChildAtLoading (child )) {
273- return ;
274- }
275- dir .cacheName (child );
276-
277- if (child .isFile ()) {
278- updateBlocksMap (child .asFile (), fsn .getBlockManager ());
455+ if (!parentDir .addChildAtLoading (child )) {
456+ return false ;
279457 }
458+ return true ;
280459 }
281460
282461 private INode loadINode (INodeSection .INode n ) {
@@ -527,6 +706,7 @@ void serializeINodeDirectorySection(OutputStream out) throws IOException {
527706 final ArrayList <INodeReference > refList = parent .getSaverContext ()
528707 .getRefList ();
529708 int i = 0 ;
709+ int outputInodes = 0 ;
530710 while (iter .hasNext ()) {
531711 INodeWithAdditionalFields n = iter .next ();
532712 if (!n .isDirectory ()) {
@@ -558,6 +738,7 @@ void serializeINodeDirectorySection(OutputStream out) throws IOException {
558738 refList .add (inode .asReference ());
559739 b .addRefChildren (refList .size () - 1 );
560740 }
741+ outputInodes ++;
561742 }
562743 INodeDirectorySection .DirEntry e = b .build ();
563744 e .writeDelimitedTo (out );
@@ -567,9 +748,15 @@ void serializeINodeDirectorySection(OutputStream out) throws IOException {
567748 if (i % FSImageFormatProtobuf .Saver .CHECK_CANCEL_INTERVAL == 0 ) {
568749 context .checkCancelled ();
569750 }
751+ if (outputInodes >= parent .getInodesPerSubSection ()) {
752+ outputInodes = 0 ;
753+ parent .commitSubSection (summary ,
754+ FSImageFormatProtobuf .SectionName .INODE_DIR_SUB );
755+ }
570756 }
571- parent .commitSection (summary ,
572- FSImageFormatProtobuf .SectionName .INODE_DIR );
757+ parent .commitSectionAndSubSection (summary ,
758+ FSImageFormatProtobuf .SectionName .INODE_DIR ,
759+ FSImageFormatProtobuf .SectionName .INODE_DIR_SUB );
573760 }
574761
575762 void serializeINodeSection (OutputStream out ) throws IOException {
@@ -589,8 +776,14 @@ void serializeINodeSection(OutputStream out) throws IOException {
589776 if (i % FSImageFormatProtobuf .Saver .CHECK_CANCEL_INTERVAL == 0 ) {
590777 context .checkCancelled ();
591778 }
779+ if (i % parent .getInodesPerSubSection () == 0 ) {
780+ parent .commitSubSection (summary ,
781+ FSImageFormatProtobuf .SectionName .INODE_SUB );
782+ }
592783 }
593- parent .commitSection (summary , FSImageFormatProtobuf .SectionName .INODE );
784+ parent .commitSectionAndSubSection (summary ,
785+ FSImageFormatProtobuf .SectionName .INODE ,
786+ FSImageFormatProtobuf .SectionName .INODE_SUB );
594787 }
595788
596789 void serializeFilesUCSection (OutputStream out ) throws IOException {
0 commit comments