1919
2020import  static  org .apache .hadoop .util .Time .monotonicNow ;
2121
22+ import  com .google .common .collect .Lists ;
2223import  java .io .IOException ;
2324import  java .net .URI ;
2425import  java .net .URL ;
2526import  java .security .PrivilegedAction ;
2627import  java .util .ArrayList ;
28+ import  java .util .HashMap ;
2729import  java .util .List ;
30+ import  java .util .Map ;
2831import  java .util .concurrent .*;
29- import  java .util .concurrent .atomic .AtomicInteger ;
3032
3133import  org .apache .hadoop .classification .InterfaceAudience ;
3234import  org .apache .hadoop .conf .Configuration ;
@@ -67,19 +69,20 @@ public class StandbyCheckpointer {
6769  private  final  Configuration  conf ;
6870  private  final  FSNamesystem  namesystem ;
6971  private  long  lastCheckpointTime ;
70-   private  long  lastUploadTime ;
7172  private  final  CheckpointerThread  thread ;
7273  private  final  ThreadFactory  uploadThreadFactory ;
7374  private  List <URL > activeNNAddresses ;
7475  private  URL  myNNAddress ;
7576
7677  private  final  Object  cancelLock  = new  Object ();
7778  private  Canceler  canceler ;
78-   private  boolean  isPrimaryCheckPointer  = true ;
7979
8080  // Keep track of how many checkpoints were canceled. 
8181  // This is for use in tests. 
8282  private  static  int  canceledCount  = 0 ;
83+ 
84+   // A map from NN url to the most recent image upload time. 
85+   private  final  HashMap <String , CheckpointReceiverEntry > checkpointReceivers ;
8386
8487  public  StandbyCheckpointer (Configuration  conf , FSNamesystem  ns )
8588      throws  IOException  {
@@ -89,8 +92,38 @@ public StandbyCheckpointer(Configuration conf, FSNamesystem ns)
8992    this .thread  = new  CheckpointerThread ();
9093    this .uploadThreadFactory  = new  ThreadFactoryBuilder ().setDaemon (true )
9194        .setNameFormat ("TransferFsImageUpload-%d" ).build ();
92- 
9395    setNameNodeAddresses (conf );
96+     this .checkpointReceivers  = new  HashMap <>();
97+     for  (URL  address  : activeNNAddresses ) {
98+       this .checkpointReceivers .put (address .toString (),
99+           new  CheckpointReceiverEntry ());
100+     }
101+   }
102+ 
103+   private  static  final  class  CheckpointReceiverEntry  {
104+     private  long  lastUploadTime ;
105+     private  boolean  isPrimary ;
106+ 
107+     CheckpointReceiverEntry () {
108+       this .lastUploadTime  = 0L ;
109+       this .isPrimary  = true ;
110+     }
111+ 
112+     void  setLastUploadTime (long  lastUploadTime ) {
113+       this .lastUploadTime  = lastUploadTime ;
114+     }
115+ 
116+     void  setIsPrimary (boolean  isPrimaryFor ) {
117+       this .isPrimary  = isPrimaryFor ;
118+     }
119+ 
120+     long  getLastUploadTime () {
121+       return  lastUploadTime ;
122+     }
123+ 
124+     boolean  isPrimary () {
125+       return  isPrimary ;
126+     }
94127  }
95128
96129  /** 
@@ -158,7 +191,7 @@ public void triggerRollbackCheckpoint() {
158191    thread .interrupt ();
159192  }
160193
161-   private  void  doCheckpoint (boolean   sendCheckpoint ) throws  InterruptedException , IOException  {
194+   private  void  doCheckpoint () throws  InterruptedException , IOException  {
162195    assert  canceler  != null ;
163196    final  long  txid ;
164197    final  NameNodeFile  imageType ;
@@ -210,11 +243,6 @@ private void doCheckpoint(boolean sendCheckpoint) throws InterruptedException, I
210243      namesystem .cpUnlock ();
211244    }
212245
213-     //early exit if we shouldn't actually send the checkpoint to the ANN 
214-     if (!sendCheckpoint ){
215-       return ;
216-     }
217- 
218246    // Upload the saved checkpoint back to the active 
219247    // Do this in a separate thread to avoid blocking transition to active, but don't allow more 
220248    // than the expected number of tasks to run or queue up 
@@ -224,56 +252,70 @@ private void doCheckpoint(boolean sendCheckpoint) throws InterruptedException, I
224252        uploadThreadFactory );
225253    // for right now, just match the upload to the nn address by convention. There is no need to 
226254    // directly tie them together by adding a pair class. 
227-     List < Future <TransferFsImage .TransferResult >> uploads  =
228-         new  ArrayList < Future < TransferFsImage . TransferResult > >();
255+     HashMap < String ,  Future <TransferFsImage .TransferResult >> uploads  =
256+         new  HashMap < >();
229257    for  (final  URL  activeNNAddress  : activeNNAddresses ) {
230-       Future <TransferFsImage .TransferResult > upload  =
231-           executor .submit (new  Callable <TransferFsImage .TransferResult >() {
232-             @ Override 
233-             public  TransferFsImage .TransferResult  call ()
234-                 throws  IOException , InterruptedException  {
235-               CheckpointFaultInjector .getInstance ().duringUploadInProgess ();
236-               return  TransferFsImage .uploadImageFromStorage (activeNNAddress , conf , namesystem 
237-                   .getFSImage ().getStorage (), imageType , txid , canceler );
238-             }
239-           });
240-       uploads .add (upload );
258+       // Upload image if at least 1 of 2 following conditions met: 
259+       // 1. has been quiet for long enough, try to contact the node. 
260+       // 2. this standby IS the primary checkpointer of target NN. 
261+       String  addressString  = activeNNAddress .toString ();
262+       assert  checkpointReceivers .containsKey (addressString );
263+       CheckpointReceiverEntry  receiverEntry  =
264+           checkpointReceivers .get (addressString );
265+       long  secsSinceLastUpload  =
266+           TimeUnit .MILLISECONDS .toSeconds (
267+               monotonicNow () - receiverEntry .getLastUploadTime ());
268+       boolean  shouldUpload  = receiverEntry .isPrimary () ||
269+           secsSinceLastUpload  >= checkpointConf .getQuietPeriod ();
270+       if  (shouldUpload ) {
271+         Future <TransferFsImage .TransferResult > upload  =
272+             executor .submit (new  Callable <TransferFsImage .TransferResult >() {
273+               @ Override 
274+               public  TransferFsImage .TransferResult  call ()
275+                   throws  IOException , InterruptedException  {
276+                 CheckpointFaultInjector .getInstance ().duringUploadInProgess ();
277+                 return  TransferFsImage .uploadImageFromStorage (activeNNAddress ,
278+                     conf , namesystem .getFSImage ().getStorage (), imageType , txid ,
279+                     canceler );
280+               }
281+             });
282+         uploads .put (addressString , upload );
283+       }
241284    }
242285    InterruptedException  ie  = null ;
243-     IOException   ioe =  null ;
244-     int   i  =  0 ; 
245-     boolean   success  =  false ; 
246-     for  (;  i  <  uploads . size ();  i ++) { 
247-       Future <TransferFsImage .TransferResult > upload  = uploads . get ( i );
286+     List < IOException >  ioes  =  Lists . newArrayList () ;
287+     for  ( Map . Entry < String ,  Future < TransferFsImage . TransferResult >>  entry  : 
288+          uploads . entrySet ()) { 
289+        String   url  =  entry . getKey ();
290+       Future <TransferFsImage .TransferResult > upload  = entry . getValue ( );
248291      try  {
249-         // TODO should there be some smarts here about retries nodes that are not the active NN? 
292+         // TODO should there be some smarts here about retries nodes that 
293+         //  are not the active NN? 
294+         CheckpointReceiverEntry  receiverEntry  = checkpointReceivers .get (url );
250295        if  (upload .get () == TransferFsImage .TransferResult .SUCCESS ) {
251-           success  = true ;
252-           //avoid getting the rest of the results - we don't care since we had a successful upload 
253-           break ;
296+           receiverEntry .setLastUploadTime (monotonicNow ());
297+           receiverEntry .setIsPrimary (true );
298+         } else  {
299+           receiverEntry .setIsPrimary (false );
254300        }
255- 
256301      } catch  (ExecutionException  e ) {
257-         ioe  = new  IOException ("Exception during image upload" , e );
258-         break ;
302+         // Even if exception happens, still proceeds to next NN url. 
303+         // so that fail to upload to previous NN does not cause the 
304+         // remaining NN not getting the fsImage. 
305+         ioes .add (new  IOException ("Exception during image upload" , e ));
259306      } catch  (InterruptedException  e ) {
260307        ie  = e ;
261308        break ;
262309      }
263310    }
264-     if  (ie  == null  && ioe  == null ) {
265-       //Update only when response from remote about success or 
266-       lastUploadTime  = monotonicNow ();
267-       // we are primary if we successfully updated the ANN 
268-       this .isPrimaryCheckPointer  = success ;
269-     }
270311    // cleaner than copying code for multiple catch statements and better than catching all 
271312    // exceptions, so we just handle the ones we expect. 
272-     if  (ie  != null  ||  ioe  !=  null ) {
313+     if  (ie  != null ) {
273314
274315      // cancel the rest of the tasks, and close the pool 
275-       for  (; i  < uploads .size (); i ++) {
276-         Future <TransferFsImage .TransferResult > upload  = uploads .get (i );
316+       for  (Map .Entry <String , Future <TransferFsImage .TransferResult >> entry  :
317+           uploads .entrySet ()) {
318+         Future <TransferFsImage .TransferResult > upload  = entry .getValue ();
277319        // The background thread may be blocked waiting in the throttler, so 
278320        // interrupt it. 
279321        upload .cancel (true );
@@ -286,11 +328,11 @@ public TransferFsImage.TransferResult call()
286328      executor .awaitTermination (500 , TimeUnit .MILLISECONDS );
287329
288330      // re-throw the exception we got, since one of these two must be non-null 
289-       if  ( ie  !=  null ) { 
290-          throw   ie ; 
291-       }  else   if  ( ioe  !=  null ) { 
292-          throw   ioe ; 
293-       } 
331+       throw   ie ; 
332+     } 
333+ 
334+     if  (! ioes . isEmpty ()) { 
335+       throw   MultipleIOException . createIOException ( ioes ); 
294336    }
295337  }
296338
@@ -373,7 +415,6 @@ private void doWork() {
373415      // Reset checkpoint time so that we don't always checkpoint 
374416      // on startup. 
375417      lastCheckpointTime  = monotonicNow ();
376-       lastUploadTime  = monotonicNow ();
377418      while  (shouldRun ) {
378419        boolean  needRollbackCheckpoint  = namesystem .isNeedRollbackFsImage ();
379420        if  (!needRollbackCheckpoint ) {
@@ -426,10 +467,7 @@ private void doWork() {
426467
427468            // on all nodes, we build the checkpoint. However, we only ship the checkpoint if have a 
428469            // rollback request, are the checkpointer, are outside the quiet period. 
429-             final  long  secsSinceLastUpload  = (now  - lastUploadTime ) / 1000 ;
430-             boolean  sendRequest  = isPrimaryCheckPointer 
431-                 || secsSinceLastUpload  >= checkpointConf .getQuietPeriod ();
432-             doCheckpoint (sendRequest );
470+             doCheckpoint ();
433471
434472            // reset needRollbackCheckpoint to false only when we finish a ckpt 
435473            // for rollback image 
0 commit comments