|
32 | 32 | import org.apache.hadoop.fs.Path; |
33 | 33 | import org.apache.hadoop.hbase.TableName; |
34 | 34 | import org.apache.hadoop.hbase.backup.BackupCopyJob; |
| 35 | +import org.apache.hadoop.hbase.backup.BackupInfo; |
35 | 36 | import org.apache.hadoop.hbase.backup.BackupInfo.BackupPhase; |
36 | 37 | import org.apache.hadoop.hbase.backup.BackupRequest; |
37 | 38 | import org.apache.hadoop.hbase.backup.BackupRestoreFactory; |
38 | 39 | import org.apache.hadoop.hbase.backup.BackupType; |
| 40 | +import org.apache.hadoop.hbase.backup.HBackupFileSystem; |
39 | 41 | import org.apache.hadoop.hbase.backup.mapreduce.MapReduceBackupCopyJob; |
40 | 42 | import org.apache.hadoop.hbase.backup.util.BackupUtils; |
41 | 43 | import org.apache.hadoop.hbase.client.Admin; |
| 44 | +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; |
42 | 45 | import org.apache.hadoop.hbase.client.Connection; |
43 | 46 | import org.apache.hadoop.hbase.mapreduce.WALPlayer; |
| 47 | +import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils; |
| 48 | +import org.apache.hadoop.hbase.snapshot.SnapshotManifest; |
44 | 49 | import org.apache.hadoop.hbase.util.Bytes; |
45 | 50 | import org.apache.hadoop.hbase.util.CommonFSUtils; |
46 | 51 | import org.apache.hadoop.hbase.util.HFileArchiveUtil; |
|
51 | 56 | import org.slf4j.Logger; |
52 | 57 | import org.slf4j.LoggerFactory; |
53 | 58 |
|
| 59 | +import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos; |
| 60 | + |
54 | 61 | /** |
55 | 62 | * Incremental backup implementation. See the {@link #execute() execute} method. |
56 | 63 | */ |
@@ -262,6 +269,20 @@ private void updateFileLists(List<String> activeFiles, List<String> archiveFiles |
262 | 269 | @Override |
263 | 270 | public void execute() throws IOException { |
264 | 271 | try { |
| 272 | + String fullBackupId = |
| 273 | + getAncestors(backupInfo).stream().filter(bi -> bi.getType() == BackupType.FULL).findFirst() |
| 274 | + .orElseThrow(() -> new RuntimeException( |
| 275 | + "Could not find full backup for incremental backup: " + backupInfo.getBackupId())) |
| 276 | + .getBackupId(); |
| 277 | + |
| 278 | + try (BackupAdminImpl backupAdmin = new BackupAdminImpl(conn)) { |
| 279 | + BackupInfo fullBackupInfo = backupAdmin.getBackupInfo(fullBackupId); |
| 280 | + |
| 281 | + for (TableName tn : backupInfo.getTables()) { |
| 282 | + verifyHtd(tn, fullBackupInfo); |
| 283 | + } |
| 284 | + } |
| 285 | + |
265 | 286 | // case PREPARE_INCREMENTAL: |
266 | 287 | beginBackup(backupManager, backupInfo); |
267 | 288 | backupInfo.setPhase(BackupPhase.PREPARE_INCREMENTAL); |
@@ -434,4 +455,38 @@ protected Path getBulkOutputDir() { |
434 | 455 | path = new Path(path, backupId); |
435 | 456 | return path; |
436 | 457 | } |
| 458 | + |
| 459 | + private void verifyHtd(TableName tn, BackupInfo fullBackupInfo) throws IOException { |
| 460 | + try (Admin admin = conn.getAdmin()) { |
| 461 | + ColumnFamilyDescriptor[] currentCfs = admin.getDescriptor(tn).getColumnFamilies(); |
| 462 | + String snapshotName = fullBackupInfo.getSnapshotName(tn); |
| 463 | + Path root = HBackupFileSystem.getTableBackupPath(tn, |
| 464 | + new Path(fullBackupInfo.getBackupRootDir()), fullBackupInfo.getBackupId()); |
| 465 | + Path manifestDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, root); |
| 466 | + |
| 467 | + FileSystem fs = FileSystem.get(conf); |
| 468 | + SnapshotProtos.SnapshotDescription snapshotDescription = |
| 469 | + SnapshotDescriptionUtils.readSnapshotInfo(fs, manifestDir); |
| 470 | + SnapshotManifest manifest = SnapshotManifest.open(conf, fs, manifestDir, snapshotDescription); |
| 471 | + |
| 472 | + ColumnFamilyDescriptor[] backupCfs = manifest.getTableDescriptor().getColumnFamilies(); |
| 473 | + verifyCfs(currentCfs, backupCfs); |
| 474 | + } |
| 475 | + } |
| 476 | + |
| 477 | + private static void verifyCfs(ColumnFamilyDescriptor[] currentCfs, |
| 478 | + ColumnFamilyDescriptor[] backupCfs) throws IOException { |
| 479 | + if (currentCfs.length != backupCfs.length) { |
| 480 | + throw ColumnFamilyMismatchException.create(currentCfs, backupCfs); |
| 481 | + } |
| 482 | + |
| 483 | + for (int i = 0; i < backupCfs.length; i++) { |
| 484 | + String currentCf = currentCfs[i].getNameAsString(); |
| 485 | + String backupCf = backupCfs[i].getNameAsString(); |
| 486 | + |
| 487 | + if (!currentCf.equals(backupCf)) { |
| 488 | + throw ColumnFamilyMismatchException.create(currentCfs, backupCfs); |
| 489 | + } |
| 490 | + } |
| 491 | + } |
437 | 492 | } |
0 commit comments