 import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.hbase.util.Pair;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -179,10 +178,6 @@ public String toString() {
   final static byte[] TBL_COL = Bytes.toBytes("tbl");
   final static byte[] FAM_COL = Bytes.toBytes("fam");
   final static byte[] PATH_COL = Bytes.toBytes("path");
-  final static byte[] STATE_COL = Bytes.toBytes("state");
-  // the two states a bulk loaded file can be
-  final static byte[] BL_PREPARE = Bytes.toBytes("R");
-  final static byte[] BL_COMMIT = Bytes.toBytes("D");
 
   private final static String SET_KEY_PREFIX = "backupset:";
@@ -378,7 +373,7 @@ public Map<byte[], List<Path>>[] readBulkLoadedFiles(String backupId, List<Table
         }
         files.add(new Path(path));
         if (LOG.isDebugEnabled()) {
-          LOG.debug("found bulk loaded file : " + tbl + " " + Bytes.toString(fam) + " " + path);
+          LOG.debug("found bulk loaded file : {} {} {}", tbl, Bytes.toString(fam), path);
         }
       }
 
@@ -401,43 +396,22 @@ public void deleteBackupInfo(String backupId) throws IOException {
     }
   }
 
-  /*
-   * For postBulkLoadHFile() hook.
-   * @param tabName table name
-   * @param region the region receiving hfile
-   * @param finalPaths family and associated hfiles
+  /**
+   * Registers a bulk load.
+   * @param tableName table name
+   * @param region the region receiving hfile
+   * @param cfToHfilePath column family and associated hfiles
    */
-  public void writePathsPostBulkLoad(TableName tabName, byte[] region,
-    Map<byte[], List<Path>> finalPaths) throws IOException {
+  public void registerBulkLoad(TableName tableName, byte[] region,
+    Map<byte[], List<Path>> cfToHfilePath) throws IOException {
     if (LOG.isDebugEnabled()) {
-      LOG.debug("write bulk load descriptor to backup " + tabName + " with " + finalPaths.size()
-        + " entries");
+      LOG.debug("Writing bulk load descriptor to backup {} with {} entries", tableName,
+        cfToHfilePath.size());
     }
     try (BufferedMutator bufferedMutator = connection.getBufferedMutator(bulkLoadTableName)) {
-      List<Put> puts = BackupSystemTable.createPutForCommittedBulkload(tabName, region, finalPaths);
+      List<Put> puts = BackupSystemTable.createPutForBulkLoad(tableName, region, cfToHfilePath);
       bufferedMutator.mutate(puts);
-      LOG.debug("written " + puts.size() + " rows for bulk load of " + tabName);
-    }
-  }
-
-  /*
-   * For preCommitStoreFile() hook
-   * @param tabName table name
-   * @param region the region receiving hfile
-   * @param family column family
-   * @param pairs list of paths for hfiles
-   */
-  public void writeFilesForBulkLoadPreCommit(TableName tabName, byte[] region, final byte[] family,
-    final List<Pair<Path, Path>> pairs) throws IOException {
-    if (LOG.isDebugEnabled()) {
-      LOG.debug(
-        "write bulk load descriptor to backup " + tabName + " with " + pairs.size() + " entries");
-    }
-    try (Table table = connection.getTable(bulkLoadTableName)) {
-      List<Put> puts =
-        BackupSystemTable.createPutForPreparedBulkload(tabName, region, family, pairs);
-      table.put(puts);
-      LOG.debug("written " + puts.size() + " rows for bulk load of " + tabName);
+      LOG.debug("Written {} rows for bulk load of {}", puts.size(), tableName);
     }
   }
 
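Note: the renamed `registerBulkLoad` keeps the old `writePathsPostBulkLoad` write path; only the pre-commit variant is dropped. A minimal caller sketch, assuming the usual try-with-resources pattern around `BackupSystemTable` (the example class, inputs, and file paths here are illustrative, not part of this diff):

```java
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.util.Bytes;

public class RegisterBulkLoadExample {
  // Records one bulk-loaded HFile in the backup system table via the renamed API.
  // Table name, encoded region name, and HFile path are illustrative values.
  static void record(Connection connection, TableName tableName, byte[] encodedRegionName)
    throws IOException {
    Map<byte[], List<Path>> cfToHfilePath = new HashMap<>();
    cfToHfilePath.put(Bytes.toBytes("cf"),
      Collections.singletonList(new Path("hdfs:///staging/cf/hfile1")));
    try (BackupSystemTable systemTable = new BackupSystemTable(connection)) {
      systemTable.registerBulkLoad(tableName, encodedRegionName, cfToHfilePath);
    }
  }
}
```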
@@ -459,33 +433,25 @@ public void deleteBulkLoadedRows(List<byte[]> rows) throws IOException {
     }
   }
 
-  /*
+  /**
    * Reads the rows from backup table recording bulk loaded hfiles
    * @param tableList list of table names
-   * @return The keys of the Map are table, region and column family. Value of the map reflects
-   *         whether the hfile was recorded by preCommitStoreFile hook (true)
-   */
-  public Pair<Map<TableName, Map<String, Map<String, List<Pair<String, Boolean>>>>>, List<byte[]>>
-    readBulkloadRows(List<TableName> tableList) throws IOException {
-
-    Map<TableName, Map<String, Map<String, List<Pair<String, Boolean>>>>> map = new HashMap<>();
-    List<byte[]> rows = new ArrayList<>();
-    for (TableName tTable : tableList) {
-      Scan scan = BackupSystemTable.createScanForOrigBulkLoadedFiles(tTable);
-      Map<String, Map<String, List<Pair<String, Boolean>>>> tblMap = map.get(tTable);
-      try (Table table = connection.getTable(bulkLoadTableName);
-        ResultScanner scanner = table.getScanner(scan)) {
-        Result res = null;
+   */
+  public List<BulkLoad> readBulkloadRows(List<TableName> tableList) throws IOException {
+    List<BulkLoad> result = new ArrayList<>();
+    for (TableName table : tableList) {
+      Scan scan = BackupSystemTable.createScanForOrigBulkLoadedFiles(table);
+      try (Table bulkLoadTable = connection.getTable(bulkLoadTableName);
+        ResultScanner scanner = bulkLoadTable.getScanner(scan)) {
+        Result res;
         while ((res = scanner.next()) != null) {
           res.advance();
           String fam = null;
           String path = null;
-          boolean raw = false;
-          byte[] row;
           String region = null;
+          byte[] row = null;
           for (Cell cell : res.listCells()) {
             row = CellUtil.cloneRow(cell);
-            rows.add(row);
             String rowStr = Bytes.toString(row);
             region = BackupSystemTable.getRegionNameFromOrigBulkLoadRow(rowStr);
             if (
@@ -498,35 +464,14 @@ public void deleteBulkLoadedRows(List<byte[]> rows) throws IOException {
               BackupSystemTable.PATH_COL.length) == 0
             ) {
               path = Bytes.toString(CellUtil.cloneValue(cell));
-            } else if (
-              CellUtil.compareQualifiers(cell, BackupSystemTable.STATE_COL, 0,
-                BackupSystemTable.STATE_COL.length) == 0
-            ) {
-              byte[] state = CellUtil.cloneValue(cell);
-              if (Bytes.equals(BackupSystemTable.BL_PREPARE, state)) {
-                raw = true;
-              } else {
-                raw = false;
-              }
             }
           }
-          if (map.get(tTable) == null) {
-            map.put(tTable, new HashMap<>());
-            tblMap = map.get(tTable);
-          }
-          if (tblMap.get(region) == null) {
-            tblMap.put(region, new HashMap<>());
-          }
-          Map<String, List<Pair<String, Boolean>>> famMap = tblMap.get(region);
-          if (famMap.get(fam) == null) {
-            famMap.put(fam, new ArrayList<>());
-          }
-          famMap.get(fam).add(new Pair<>(path, raw));
+          result.add(new BulkLoad(table, region, fam, path, row));
           LOG.debug("found orig " + path + " for " + fam + " of table " + region);
         }
       }
     }
-    return new Pair<>(map, rows);
+    return result;
   }
 
   /*
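For context: the nested map plus row-key list is replaced by a flat list of `BulkLoad` value objects. The class itself is introduced elsewhere in this change; a minimal sketch of the shape implied by the `new BulkLoad(table, region, fam, path, row)` call above (field and accessor names are assumptions):

```java
import org.apache.hadoop.hbase.TableName;
import org.apache.yetus.audience.InterfaceAudience;

// Sketch of the BulkLoad value object implied by the constructor call in readBulkloadRows.
// The real class ships elsewhere in this PR; names here are assumptions.
@InterfaceAudience.Private
public class BulkLoad {
  private final TableName tableName;
  private final String region;        // encoded region name parsed from the row key
  private final String columnFamily;  // family the HFile was loaded into
  private final String hfilePath;     // full path of the bulk-loaded HFile
  private final byte[] rowKey;        // backup system table row recording this load

  public BulkLoad(TableName tableName, String region, String columnFamily, String hfilePath,
    byte[] rowKey) {
    this.tableName = tableName;
    this.region = region;
    this.columnFamily = columnFamily;
    this.hfilePath = hfilePath;
    this.rowKey = rowKey;
  }

  // Keeping the row key lets callers hand it straight to deleteBulkLoadedRows(List<byte[]>),
  // which previously required threading a separate List<byte[]> through the returned Pair.
  public byte[] getRowKey() {
    return rowKey;
  }
}
```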
@@ -793,20 +738,19 @@ public List<BackupInfo> getBackupHistory(int n, BackupInfo.Filter... filters) th
     return result;
   }
 
-  /*
-   * Retrieve TableName's for completed backup of given type
-   * @param type backup type
-   * @return List of table names
+  /**
+   * Retrieve all table names that are part of any known backup
    */
-  public List<TableName> getTablesForBackupType(BackupType type) throws IOException {
+  public Set<TableName> getTablesIncludedInBackups() throws IOException {
     Set<TableName> names = new HashSet<>();
     List<BackupInfo> infos = getBackupHistory(true);
     for (BackupInfo info : infos) {
-      if (info.getType() == type) {
+      // Incremental backups have the same tables as the preceding full backups
+      if (info.getType() == BackupType.FULL) {
         names.addAll(info.getTableNames());
       }
     }
-    return new ArrayList<>(names);
+    return names;
   }
 
   /**
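The per-type lookup is folded into a single set-valued query: since an incremental backup cannot cover tables beyond its preceding full backup, scanning only FULL entries is sufficient. A short caller sketch combining this with the new `readBulkloadRows` (the surrounding code is illustrative, not part of this diff):

```java
// Sketch: enumerate bulk load records for every table covered by any backup.
// `systemTable` is an open BackupSystemTable; method shapes match this diff.
Set<TableName> backedUpTables = systemTable.getTablesIncludedInBackups();
List<BulkLoad> bulkLoads = systemTable.readBulkloadRows(new ArrayList<>(backedUpTables));
for (BulkLoad bulkLoad : bulkLoads) {
  LOG.debug("Tracked bulk load: {}", bulkLoad);
}
```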
@@ -1500,13 +1444,13 @@ private String getServerNameForReadRegionServerLastLogRollResult(byte[] row) {
     return s.substring(index + 1);
   }
 
-  /*
-   * Creates Put's for bulk load resulting from running LoadIncrementalHFiles
+  /**
+   * Creates Put's for bulk loads.
    */
-  static List<Put> createPutForCommittedBulkload(TableName table, byte[] region,
-    Map<byte[], List<Path>> finalPaths) {
+  private static List<Put> createPutForBulkLoad(TableName table, byte[] region,
+    Map<byte[], List<Path>> columnFamilyToHFilePaths) {
     List<Put> puts = new ArrayList<>();
-    for (Map.Entry<byte[], List<Path>> entry : finalPaths.entrySet()) {
+    for (Map.Entry<byte[], List<Path>> entry : columnFamilyToHFilePaths.entrySet()) {
       for (Path path : entry.getValue()) {
         String file = path.toString();
         int lastSlash = file.lastIndexOf("/");
@@ -1516,10 +1460,8 @@ static List<Put> createPutForCommittedBulkload(TableName table, byte[] region,
         put.addColumn(BackupSystemTable.META_FAMILY, TBL_COL, table.getName());
         put.addColumn(BackupSystemTable.META_FAMILY, FAM_COL, entry.getKey());
         put.addColumn(BackupSystemTable.META_FAMILY, PATH_COL, Bytes.toBytes(file));
-        put.addColumn(BackupSystemTable.META_FAMILY, STATE_COL, BL_COMMIT);
         puts.add(put);
-        LOG
-          .debug("writing done bulk path " + file + " for " + table + " " + Bytes.toString(region));
+        LOG.debug("Done writing bulk path {} for {} {}", file, table, Bytes.toString(region));
       }
     }
     return puts;
@@ -1580,29 +1522,6 @@ public static void deleteSnapshot(Connection conn) throws IOException {
     }
   }
 
-  /*
-   * Creates Put's for bulk load resulting from running LoadIncrementalHFiles
-   */
-  static List<Put> createPutForPreparedBulkload(TableName table, byte[] region, final byte[] family,
-    final List<Pair<Path, Path>> pairs) {
-    List<Put> puts = new ArrayList<>(pairs.size());
-    for (Pair<Path, Path> pair : pairs) {
-      Path path = pair.getSecond();
-      String file = path.toString();
-      int lastSlash = file.lastIndexOf("/");
-      String filename = file.substring(lastSlash + 1);
-      Put put = new Put(rowkey(BULK_LOAD_PREFIX, table.toString(), BLK_LD_DELIM,
-        Bytes.toString(region), BLK_LD_DELIM, filename));
-      put.addColumn(BackupSystemTable.META_FAMILY, TBL_COL, table.getName());
-      put.addColumn(BackupSystemTable.META_FAMILY, FAM_COL, family);
-      put.addColumn(BackupSystemTable.META_FAMILY, PATH_COL, Bytes.toBytes(file));
-      put.addColumn(BackupSystemTable.META_FAMILY, STATE_COL, BL_PREPARE);
-      puts.add(put);
-      LOG.debug("writing raw bulk path " + file + " for " + table + " " + Bytes.toString(region));
-    }
-    return puts;
-  }
-
   public static List<Delete> createDeleteForOrigBulkLoad(List<TableName> lst) {
     List<Delete> lstDels = new ArrayList<>(lst.size());
     for (TableName table : lst) {