 import java.util.TreeMap;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.backup.BackupCopyJob;
 import org.apache.hadoop.hbase.backup.BackupInfo;
 import org.apache.hadoop.hbase.backup.BackupType;
 import org.apache.hadoop.hbase.backup.HBackupFileSystem;
 import org.apache.hadoop.hbase.backup.mapreduce.MapReduceBackupCopyJob;
+import org.apache.hadoop.hbase.backup.mapreduce.MapReduceHFileSplitterJob;
 import org.apache.hadoop.hbase.backup.util.BackupUtils;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
 import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.mapreduce.WALPlayer;
 import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
 import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
+import org.apache.hadoop.hbase.snapshot.SnapshotRegionLocator;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.CommonFSUtils;
 import org.apache.hadoop.hbase.util.HFileArchiveUtil;
@@ -120,8 +125,6 @@ protected static int getIndex(TableName tbl, List<TableName> sTableList) {
   @SuppressWarnings("unchecked")
   protected List<byte[]> handleBulkLoad(List<TableName> sTableList) throws IOException {
     Map<byte[], List<Path>>[] mapForSrc = new Map[sTableList.size()];
-    List<String> activeFiles = new ArrayList<>();
-    List<String> archiveFiles = new ArrayList<>();
     Pair<Map<TableName, Map<String, Map<String, List<Pair<String, Boolean>>>>>, List<byte[]>> pair =
       backupManager.readBulkloadRows(sTableList);
     Map<TableName, Map<String, Map<String, List<Pair<String, Boolean>>>>> map = pair.getFirst();
@@ -136,6 +139,8 @@ protected List<byte[]> handleBulkLoad(List<TableName> sTableList) throws IOException {

     for (Map.Entry<TableName, Map<String, Map<String, List<Pair<String, Boolean>>>>> tblEntry : map
       .entrySet()) {
+      List<String> activeFiles = new ArrayList<>();
+      List<String> archiveFiles = new ArrayList<>();
       TableName srcTable = tblEntry.getKey();

       int srcIdx = getIndex(srcTable, sTableList);
@@ -197,55 +202,56 @@ protected List<byte[]> handleBulkLoad(List<TableName> sTableList) throws IOException {
           }
         }
       }
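+      // Merge/split the bulk loaded files collected for this table, then copy the
+      // resulting HFiles into the backup destination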
+      mergeSplitBulkloads(activeFiles, archiveFiles, srcTable);
+      incrementalCopyBulkloadHFiles(tgtFs, srcTable);
     }
-
-    copyBulkLoadedFiles(activeFiles, archiveFiles);
-
     return pair.getSecond();
   }

-  private void copyBulkLoadedFiles(List<String> activeFiles, List<String> archiveFiles)
-    throws IOException {
-    try {
-      // Enable special mode of BackupDistCp
-      conf.setInt(MapReduceBackupCopyJob.NUMBER_OF_LEVELS_TO_PRESERVE_KEY, 5);
-      // Copy active files
-      String tgtDest = backupInfo.getBackupRootDir() + Path.SEPARATOR + backupInfo.getBackupId();
-      int attempt = 1;
-      while (activeFiles.size() > 0) {
-        LOG.info("Copy " + activeFiles.size() + " active bulk loaded files. Attempt =" + attempt++);
-        String[] toCopy = new String[activeFiles.size()];
-        activeFiles.toArray(toCopy);
-        // Active file can be archived during copy operation,
-        // we need to handle this properly
-        try {
-          incrementalCopyHFiles(toCopy, tgtDest);
-          break;
-        } catch (IOException e) {
-          // Check if some files got archived
-          // Update active and archived lists
-          // When file is being moved from active to archive
-          // directory, the number of active files decreases
-          int numOfActive = activeFiles.size();
-          updateFileLists(activeFiles, archiveFiles);
-          if (activeFiles.size() < numOfActive) {
-            continue;
-          }
-          // if not - throw exception
-          throw e;
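+  // Runs the merge/split job over the active bulk loaded files, retrying while files
+  // are being moved from the active directory to the archive, then processes whatever
+  // ended up in the archive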
+  private void mergeSplitBulkloads(List<String> activeFiles, List<String> archiveFiles,
+    TableName tn) throws IOException {
+    int attempt = 1;
+
+    while (!activeFiles.isEmpty()) {
+      LOG.info("MergeSplit {} active bulk loaded files. Attempt={}", activeFiles.size(), attempt++);
+      // Active file can be archived during copy operation,
+      // we need to handle this properly
+      try {
+        mergeSplitBulkloads(activeFiles, tn);
+        break;
+      } catch (IOException e) {
+        int numActiveFiles = activeFiles.size();
+        updateFileLists(activeFiles, archiveFiles);
+        if (activeFiles.size() < numActiveFiles) {
+          continue;
         }
+
+        throw e;
       }
-      // If incremental copy will fail for archived files
-      // we will have partially loaded files in backup destination (only files from active data
-      // directory). It is OK, because the backup will marked as FAILED and data will be cleaned up
-      if (archiveFiles.size() > 0) {
-        String[] toCopy = new String[archiveFiles.size()];
-        archiveFiles.toArray(toCopy);
-        incrementalCopyHFiles(toCopy, tgtDest);
+    }
+
+    if (!archiveFiles.isEmpty()) {
+      mergeSplitBulkloads(archiveFiles, tn);
+    }
+  }
+
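+  // Runs MapReduceHFileSplitterJob over the given bulk loaded files, writing the
+  // split HFiles to the table's bulk output directory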
+  private void mergeSplitBulkloads(List<String> files, TableName tn) throws IOException {
+    MapReduceHFileSplitterJob player = new MapReduceHFileSplitterJob();
+    conf.set(MapReduceHFileSplitterJob.BULK_OUTPUT_CONF_KEY,
+      getBulkOutputDirForTable(tn).toString());
+    player.setConf(conf);
+
+    String inputDirs = StringUtils.join(files, ",");
+    String[] args = { inputDirs, tn.getNameAsString() };
+
+    try {
+      int result = player.run(args);
+      if (result != 0) {
+        throw new RuntimeException("Failed to run MapReduceHFileSplitterJob");
       }
-    } finally {
-      // Disable special mode of BackupDistCp
-      conf.unset(MapReduceBackupCopyJob.NUMBER_OF_LEVELS_TO_PRESERVE_KEY);
+    } catch (Exception e) {
+      LOG.error(e.toString(), e);
+      throw new IOException(e);
     }
   }

@@ -290,6 +296,7 @@ public void execute() throws IOException, ColumnFamilyMismatchException {
     try {
       // copy out the table and region info files for each table
       BackupUtils.copyTableRegionInfo(conn, backupInfo, conf);
+      setupRegionLocator();
       // convert WAL to HFiles and copy them to .tmp under BACKUP_ROOT
       convertWALsToHFiles();
       incrementalCopyHFiles(new String[] { getBulkOutputDir().toString() },
@@ -337,6 +344,7 @@ protected void incrementalCopyHFiles(String[] files, String backupDest) throws IOException {
     try {
       LOG.debug("Incremental copy HFiles is starting. dest=" + backupDest);
       // set overall backup phase: incremental_copy
+      // TODO - This should now happen elsewhere maybe
       backupInfo.setPhase(BackupPhase.INCREMENTAL_COPY);
       // get incremental backup file list and prepare parms for DistCp
       String[] strArr = new String[files.length + 1];
@@ -431,6 +439,29 @@ protected void walToHFiles(List<String> dirPaths, List<String> tableList) throws IOException {
     }
   }

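+  // Copies the split bulk load HFiles, if any were produced for this table, from the
+  // bulk output directory to the table's directory under the backup destination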
+  private void incrementalCopyBulkloadHFiles(FileSystem tgtFs, TableName tn) throws IOException {
+    Path bulkOutDir = getBulkOutputDirForTable(tn);
+    FileSystem fs = FileSystem.get(conf);
+
+    if (fs.exists(bulkOutDir)) {
+      conf.setInt(MapReduceBackupCopyJob.NUMBER_OF_LEVELS_TO_PRESERVE_KEY, 2);
+      Path tgtPath = getTargetDirForTable(tn);
+      try {
+        RemoteIterator<LocatedFileStatus> locatedFiles = tgtFs.listFiles(bulkOutDir, true);
+        List<String> files = new ArrayList<>();
+        while (locatedFiles.hasNext()) {
+          LocatedFileStatus file = locatedFiles.next();
+          if (file.isFile() && HFile.isHFileFormat(tgtFs, file.getPath())) {
+            files.add(file.getPath().toString());
+          }
+        }
+        incrementalCopyHFiles(files.toArray(new String[0]), tgtPath.toString());
+      } finally {
+        conf.unset(MapReduceBackupCopyJob.NUMBER_OF_LEVELS_TO_PRESERVE_KEY);
+      }
+    }
+  }
+
   protected Path getBulkOutputDirForTable(TableName table) {
     Path tablePath = getBulkOutputDir();
     tablePath = new Path(tablePath, table.getNamespaceAsString());
@@ -446,6 +477,30 @@ protected Path getBulkOutputDir() {
     return path;
   }

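+  // Destination directory for a table's bulk load HFiles under this backup:
+  // <backupRootDir>/<backupId>/<namespace>/<table>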
+  private Path getTargetDirForTable(TableName table) {
+    Path path = new Path(backupInfo.getBackupRootDir() + Path.SEPARATOR + backupInfo.getBackupId());
+    path = new Path(path, table.getNamespaceAsString());
+    path = new Path(path, table.getNameAsString());
+    return path;
+  }
+
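+  // Registers, for each table in this backup, the snapshot manifest directory of the
+  // table's most recent full backup with SnapshotRegionLocator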
+  private void setupRegionLocator() throws IOException {
+    Map<TableName, String> fullBackupIds = getFullBackupIds();
+    try (BackupAdminImpl backupAdmin = new BackupAdminImpl(conn)) {
+
+      for (TableName tableName : backupInfo.getTables()) {
+        String fullBackupId = fullBackupIds.get(tableName);
+        BackupInfo fullBackupInfo = backupAdmin.getBackupInfo(fullBackupId);
+        String snapshotName = fullBackupInfo.getSnapshotName(tableName);
+        Path root = HBackupFileSystem.getTableBackupPath(tableName,
+          new Path(fullBackupInfo.getBackupRootDir()), fullBackupId);
+        String manifestDir =
+          SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, root).toString();
+        SnapshotRegionLocator.setSnapshotManifestDir(conf, manifestDir, tableName);
+      }
+    }
+  }
+
   private Map<TableName, String> getFullBackupIds() throws IOException {
     // Ancestors are stored from newest to oldest, so we can iterate backwards
     // in order to populate our backupId map with the most recent full backup