2323import  java .util .HashMap ;
2424import  java .util .List ;
2525import  java .util .Map ;
26+ import  java .util .stream .Collectors ;
2627import  org .apache .hadoop .conf .Configuration ;
2728import  org .apache .hadoop .fs .FileStatus ;
2829import  org .apache .hadoop .fs .Path ;
@@ -80,9 +81,16 @@ public void init(Map<String, Object> params) {
8081    }
8182  }
8283
83-   private  Map <Address , Long > getServersToOldestBackupMapping (List <BackupInfo > backups )
84+   private  Map <Address , Long > getServerToNewestBackupTs (List <BackupInfo > backups )
8485    throws  IOException  {
85-     Map <Address , Long > serverAddressToLastBackupMap  = new  HashMap <>();
86+     if  (LOG .isDebugEnabled ()) {
87+       LOG .debug (
88+         "Cleaning WALs if they are older than the newest backups. " 
89+           + "Checking WALs against {} backups: {}" ,
90+         backups .size (),
91+         backups .stream ().map (BackupInfo ::getBackupId ).sorted ().collect (Collectors .joining (", " )));
92+     }
93+     Map <Address , Long > serverAddressToNewestBackupMap  = new  HashMap <>();
8694
8795    Map <TableName , Long > tableNameBackupInfoMap  = new  HashMap <>();
8896    for  (BackupInfo  backupInfo  : backups ) {
@@ -92,13 +100,20 @@ private Map<Address, Long> getServersToOldestBackupMapping(List<BackupInfo> back
92100          tableNameBackupInfoMap .put (table , backupInfo .getStartTs ());
93101          for  (Map .Entry <String , Long > entry  : backupInfo .getTableSetTimestampMap ().get (table )
94102            .entrySet ()) {
95-             serverAddressToLastBackupMap .put (Address .fromString (entry .getKey ()), entry .getValue ());
103+             serverAddressToNewestBackupMap .put (Address .fromString (entry .getKey ()),
104+               entry .getValue ());
96105          }
97106        }
98107      }
99108    }
100109
101-     return  serverAddressToLastBackupMap ;
110+     if  (LOG .isDebugEnabled ()) {
111+       for  (Map .Entry <Address , Long > entry  : serverAddressToNewestBackupMap .entrySet ()) {
112+         LOG .debug ("Server: {}, Newest Backup: {}" , entry .getKey ().getHostName (), entry .getValue ());
113+       }
114+     }
115+ 
116+     return  serverAddressToNewestBackupMap ;
102117  }
103118
104119  @ Override 
@@ -113,19 +128,18 @@ public Iterable<FileStatus> getDeletableFiles(Iterable<FileStatus> files) {
113128      return  files ;
114129    }
115130
116-     Map <Address , Long > addressToLastBackupMap ;
131+     Map <Address , Long > addressToNewestBackupMap ;
117132    try  {
118133      try  (BackupManager  backupManager  = new  BackupManager (conn , getConf ())) {
119-         addressToLastBackupMap  =
120-           getServersToOldestBackupMapping (backupManager .getBackupHistory (true ));
134+         addressToNewestBackupMap  = getServerToNewestBackupTs (backupManager .getBackupHistory (true ));
121135      }
122136    } catch  (IOException  ex ) {
123137      LOG .error ("Failed to analyse backup history with exception: {}. Retaining all logs" ,
124138        ex .getMessage (), ex );
125139      return  Collections .emptyList ();
126140    }
127141    for  (FileStatus  file  : files ) {
128-       if  (canDeleteFile (addressToLastBackupMap , file .getPath ())) {
142+       if  (canDeleteFile (addressToNewestBackupMap , file .getPath ())) {
129143        filteredFiles .add (file );
130144      }
131145    }
@@ -160,7 +174,7 @@ public boolean isStopped() {
160174    return  this .stopped ;
161175  }
162176
163-   protected  static  boolean  canDeleteFile (Map <Address , Long > addressToLastBackupMap , Path  path ) {
177+   protected  static  boolean  canDeleteFile (Map <Address , Long > addressToNewestBackupMap , Path  path ) {
164178    if  (isHMasterWAL (path )) {
165179      return  true ;
166180    }
@@ -176,12 +190,29 @@ protected static boolean canDeleteFile(Map<Address, Long> addressToLastBackupMap
176190      Address  walServerAddress  = Address .fromString (hostname );
177191      long  walTimestamp  = AbstractFSWALProvider .getTimestamp (path .getName ());
178192
179-       if  (
180-         !addressToLastBackupMap .containsKey (walServerAddress )
181-           || addressToLastBackupMap .get (walServerAddress ) >= walTimestamp 
182-       ) {
193+       if  (!addressToNewestBackupMap .containsKey (walServerAddress )) {
194+         if  (LOG .isDebugEnabled ()) {
195+           LOG .debug ("No backup found for server: {}. Deleting file: {}" ,
196+             walServerAddress .getHostName (), path );
197+         }
183198        return  true ;
184199      }
200+ 
201+       Long  lastBackupTs  = addressToNewestBackupMap .get (walServerAddress );
202+       if  (lastBackupTs  >= walTimestamp ) {
203+         if  (LOG .isDebugEnabled ()) {
204+           LOG .debug (
205+             "Backup found for server: {}. Backup from {} is newer than file, so deleting: {}" ,
206+             walServerAddress .getHostName (), lastBackupTs , path );
207+         }
208+         return  true ;
209+       }
210+ 
211+       if  (LOG .isDebugEnabled ()) {
212+         LOG .debug (
213+           "Backup found for server: {}. Backup from {} is older than the file, so keeping: {}" ,
214+           walServerAddress .getHostName (), lastBackupTs , path );
215+       }
185216    } catch  (Exception  ex ) {
186217      LOG .warn ("Error occurred while filtering file: {}. Ignoring cleanup of this log" , path , ex );
187218      return  false ;
0 commit comments