Commit 4e73e8a

Merge branch 'master' into HBASE-29368-key-management-feature

2 parents bad5c8f + e575525

113 files changed: +7345 additions, −5410 deletions


hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupHFileCleaner.java

Lines changed: 10 additions & 7 deletions
@@ -52,10 +52,13 @@ public class BackupHFileCleaner extends BaseHFileCleanerDelegate implements Abor
   private boolean stopped = false;
   private boolean aborted = false;
   private Connection connection;
-  // timestamp of most recent read from backup system table
-  private long prevReadFromBackupTbl = 0;
-  // timestamp of 2nd most recent read from backup system table
-  private long secondPrevReadFromBackupTbl = 0;
+  // timestamp of most recent completed cleaning run
+  private volatile long previousCleaningCompletionTimestamp = 0;
+
+  @Override
+  public void postClean() {
+    previousCleaningCompletionTimestamp = EnvironmentEdgeManager.currentTime();
+  }
 
   @Override
   public Iterable<FileStatus> getDeletableFiles(Iterable<FileStatus> files) {
@@ -79,12 +82,12 @@ public Iterable<FileStatus> getDeletableFiles(Iterable<FileStatus> files) {
       return Collections.emptyList();
     }
 
-    secondPrevReadFromBackupTbl = prevReadFromBackupTbl;
-    prevReadFromBackupTbl = EnvironmentEdgeManager.currentTime();
+    // Pin the threshold, we don't want the result to change depending on evaluation time.
+    final long recentFileThreshold = previousCleaningCompletionTimestamp;
 
     return Iterables.filter(files, file -> {
       // If the file is recent, be conservative and wait for one more scan of the bulk loads
-      if (file.getModificationTime() > secondPrevReadFromBackupTbl) {
+      if (file.getModificationTime() > recentFileThreshold) {
        LOG.debug("Preventing deletion due to timestamp: {}", file.getPath().toString());
        return false;
      }
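
The cleaner now keys its caution off the previous run rather than previous reads: postClean() records when the last cleaning pass finished, and the next pass refuses to delete any file newer than that, since such a file may not yet be registered in the backup system table. A minimal, self-contained sketch of that lifecycle, using plain Java stand-ins rather than the HBase FileStatus and cleaner-delegate classes:

import java.util.ArrayList;
import java.util.List;

// Stand-in for the preClean/getDeletableFiles/postClean cycle; not the HBase interfaces.
public class TimestampGuardedCleanerSketch {

  static final class FakeFile {
    final String path;
    final long modificationTime;

    FakeFile(String path, long modificationTime) {
      this.path = path;
      this.modificationTime = modificationTime;
    }
  }

  // Timestamp of the most recent completed cleaning run, as in the patched cleaner.
  private volatile long previousCleaningCompletionTimestamp = 0;

  void postClean(long now) {
    previousCleaningCompletionTimestamp = now;
  }

  List<FakeFile> getDeletableFiles(List<FakeFile> files) {
    // Pin the threshold once so the answer does not drift while filtering.
    final long recentFileThreshold = previousCleaningCompletionTimestamp;
    List<FakeFile> deletable = new ArrayList<>();
    for (FakeFile file : files) {
      // Anything newer than the last completed run may not be registered yet; keep it.
      if (file.modificationTime > recentFileThreshold) {
        continue;
      }
      deletable.add(file);
    }
    return deletable;
  }

  public static void main(String[] args) {
    TimestampGuardedCleanerSketch cleaner = new TimestampGuardedCleanerSketch();
    FakeFile file = new FakeFile("hfile1", 100);

    // First run: the threshold is still 0, so the file (mod time 100) is held back.
    System.out.println(cleaner.getDeletableFiles(List.of(file)).size()); // prints 0
    cleaner.postClean(200);

    // Next run: the file predates the last completed pass and may be deleted.
    System.out.println(cleaner.getDeletableFiles(List.of(file)).size()); // prints 1
  }
}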

hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java

Lines changed: 33 additions & 5 deletions
@@ -48,6 +48,7 @@
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
 import org.apache.hadoop.hbase.mapreduce.WALPlayer;
 import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
 import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
@@ -199,6 +200,9 @@ private void mergeSplitAndCopyBulkloadedHFiles(List<String> activeFiles,
       int numActiveFiles = activeFiles.size();
       updateFileLists(activeFiles, archiveFiles);
       if (activeFiles.size() < numActiveFiles) {
+        // We've archived some files, delete bulkloads directory
+        // and re-try
+        deleteBulkLoadDirectory();
         continue;
       }
 
@@ -241,7 +245,7 @@ private void mergeSplitAndCopyBulkloadedHFiles(List<String> files, TableName tn,
     incrementalCopyBulkloadHFiles(tgtFs, tn);
   }
 
-  private void updateFileLists(List<String> activeFiles, List<String> archiveFiles)
+  public void updateFileLists(List<String> activeFiles, List<String> archiveFiles)
     throws IOException {
     List<String> newlyArchived = new ArrayList<>();
 
@@ -251,9 +255,23 @@ private void updateFileLists(List<String> activeFiles, List<String> archiveFiles
       }
     }
 
-    if (newlyArchived.size() > 0) {
+    if (!newlyArchived.isEmpty()) {
+      String rootDir = CommonFSUtils.getRootDir(conf).toString();
+
       activeFiles.removeAll(newlyArchived);
-      archiveFiles.addAll(newlyArchived);
+      for (String file : newlyArchived) {
+        String archivedFile = file.substring(rootDir.length() + 1);
+        Path archivedFilePath = new Path(HFileArchiveUtil.getArchivePath(conf), archivedFile);
+        archivedFile = archivedFilePath.toString();
+
+        if (!fs.exists(archivedFilePath)) {
+          throw new IOException(String.format(
+            "File %s no longer exists, and no archived file %s exists for it", file, archivedFile));
+        }
+
+        LOG.debug("Archived file {} has been updated", archivedFile);
+        archiveFiles.add(archivedFile);
+      }
     }
 
     LOG.debug(newlyArchived.size() + " files have been archived.");
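
Rather than copying vanished paths into the archive list verbatim, updateFileLists now rewrites each one from its location under the HBase root directory to the matching location under the archive directory, and fails fast when that archived copy is missing. A stripped-down sketch of the path translation; the directory names are illustrative only, and the real code resolves the archive directory through HFileArchiveUtil.getArchivePath:

import org.apache.hadoop.fs.Path;

public class ArchivePathSketch {

  // Map a file under the root dir to the equivalent path under the archive dir,
  // mirroring the substring + new Path(...) logic in updateFileLists.
  static Path toArchivePath(Path rootDir, Path archiveDir, Path file) {
    String relative = file.toString().substring(rootDir.toString().length() + 1);
    return new Path(archiveDir, relative);
  }

  public static void main(String[] args) {
    Path rootDir = new Path("/hbase");
    Path archiveDir = new Path("/hbase/archive");
    Path storeFile = new Path("/hbase/data/default/t1/region1/cf/hfile1");

    // Prints /hbase/archive/data/default/t1/region1/cf/hfile1
    System.out.println(toArchivePath(rootDir, archiveDir, storeFile));
  }
}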
@@ -334,6 +352,7 @@ public void execute() throws IOException, ColumnFamilyMismatchException {
   }
 
   protected void incrementalCopyHFiles(String[] files, String backupDest) throws IOException {
+    boolean diskBasedSortingOriginalValue = HFileOutputFormat2.diskBasedSortingEnabled(conf);
     try {
       LOG.debug("Incremental copy HFiles is starting. dest=" + backupDest);
       // set overall backup phase: incremental_copy
@@ -348,6 +367,7 @@ protected void incrementalCopyHFiles(String[] files, String backupDest) throws I
         LOG.debug("Setting incremental copy HFiles job name to : " + jobname);
       }
       conf.set(JOB_NAME_CONF_KEY, jobname);
+      conf.setBoolean(HFileOutputFormat2.DISK_BASED_SORTING_ENABLED_KEY, true);
 
       BackupCopyJob copyService = BackupRestoreFactory.getBackupCopyJob(conf);
       int res = copyService.copy(backupInfo, backupManager, conf, BackupType.INCREMENTAL, strArr);
@@ -360,6 +380,8 @@ protected void incrementalCopyHFiles(String[] files, String backupDest) throws I
         + " finished.");
     } finally {
       deleteBulkLoadDirectory();
+      conf.setBoolean(HFileOutputFormat2.DISK_BASED_SORTING_ENABLED_KEY,
+        diskBasedSortingOriginalValue);
     }
   }
 
@@ -413,6 +435,9 @@ protected void walToHFiles(List<String> dirPaths, List<String> tableList) throws
     conf.set(WALPlayer.INPUT_FILES_SEPARATOR_KEY, ";");
     conf.setBoolean(WALPlayer.MULTI_TABLES_SUPPORT, true);
     conf.set(JOB_NAME_CONF_KEY, jobname);
+
+    boolean diskBasedSortingEnabledOriginalValue = HFileOutputFormat2.diskBasedSortingEnabled(conf);
+    conf.setBoolean(HFileOutputFormat2.DISK_BASED_SORTING_ENABLED_KEY, true);
     String[] playerArgs = { dirs, StringUtils.join(tableList, ",") };
 
     try {
@@ -421,13 +446,16 @@ protected void walToHFiles(List<String> dirPaths, List<String> tableList) throws
       if (result != 0) {
         throw new IOException("WAL Player failed");
       }
-      conf.unset(WALPlayer.INPUT_FILES_SEPARATOR_KEY);
-      conf.unset(JOB_NAME_CONF_KEY);
     } catch (IOException e) {
       throw e;
     } catch (Exception ee) {
       throw new IOException("Can not convert from directory " + dirs
        + " (check Hadoop, HBase and WALPlayer M/R job logs) ", ee);
+    } finally {
+      conf.setBoolean(HFileOutputFormat2.DISK_BASED_SORTING_ENABLED_KEY,
+        diskBasedSortingEnabledOriginalValue);
+      conf.unset(WALPlayer.INPUT_FILES_SEPARATOR_KEY);
+      conf.unset(JOB_NAME_CONF_KEY);
     }
   }
 
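
Both incrementalCopyHFiles and walToHFiles follow the same pattern around the disk-based sorting flag: capture the caller's value, force the flag on for the duration of the job, and restore the captured value in a finally block so the shared Configuration is not left mutated when the job fails. A condensed sketch of that pattern against a plain Hadoop Configuration; the key name below is a placeholder standing in for HFileOutputFormat2.DISK_BASED_SORTING_ENABLED_KEY:

import org.apache.hadoop.conf.Configuration;

public class RestoreConfigFlagSketch {

  // Placeholder key for illustration only.
  private static final String FLAG_KEY = "example.disk.based.sorting.enabled";

  static void runWithForcedFlag(Configuration conf, Runnable job) {
    // Remember the caller's value so it can be put back afterwards.
    boolean originalValue = conf.getBoolean(FLAG_KEY, false);
    conf.setBoolean(FLAG_KEY, true);
    try {
      job.run();
    } finally {
      // Restore even if the job throws, mirroring the finally blocks in the patch.
      conf.setBoolean(FLAG_KEY, originalValue);
    }
  }

  public static void main(String[] args) {
    Configuration conf = new Configuration(false);
    runWithForcedFlag(conf,
      () -> System.out.println("during job: " + conf.getBoolean(FLAG_KEY, false)));
    System.out.println("after job: " + conf.getBoolean(FLAG_KEY, false));
  }
}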

hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceHFileSplitterJob.java

Lines changed: 30 additions & 6 deletions
@@ -23,6 +23,7 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.ExtendedCell;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.PrivateCellUtil;
 import org.apache.hadoop.hbase.TableName;
@@ -34,11 +35,14 @@
 import org.apache.hadoop.hbase.mapreduce.CellSortReducer;
 import org.apache.hadoop.hbase.mapreduce.HFileInputFormat;
 import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
+import org.apache.hadoop.hbase.mapreduce.KeyOnlyCellComparable;
+import org.apache.hadoop.hbase.mapreduce.PreSortedCellsReducer;
 import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
 import org.apache.hadoop.hbase.snapshot.SnapshotRegionLocator;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.MapReduceExtendedCell;
 import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
@@ -72,18 +76,28 @@ protected MapReduceHFileSplitterJob(final Configuration c) {
   /**
    * A mapper that just writes out cells. This one can be used together with {@link CellSortReducer}
    */
-  static class HFileCellMapper extends Mapper<NullWritable, Cell, ImmutableBytesWritable, Cell> {
+  static class HFileCellMapper extends Mapper<NullWritable, Cell, WritableComparable<?>, Cell> {
+
+    private boolean diskBasedSortingEnabled = false;
 
     @Override
     public void map(NullWritable key, Cell value, Context context)
      throws IOException, InterruptedException {
-      context.write(new ImmutableBytesWritable(CellUtil.cloneRow(value)),
-        new MapReduceExtendedCell(PrivateCellUtil.ensureExtendedCell(value)));
+      ExtendedCell extendedCell = PrivateCellUtil.ensureExtendedCell(value);
+      context.write(wrap(extendedCell), new MapReduceExtendedCell(extendedCell));
     }
 
     @Override
     public void setup(Context context) throws IOException {
-      // do nothing
+      diskBasedSortingEnabled =
+        HFileOutputFormat2.diskBasedSortingEnabled(context.getConfiguration());
+    }
+
+    private WritableComparable<?> wrap(ExtendedCell cell) {
+      if (diskBasedSortingEnabled) {
+        return new KeyOnlyCellComparable(cell);
+      }
+      return new ImmutableBytesWritable(CellUtil.cloneRow(cell));
     }
   }
 
@@ -107,13 +121,23 @@ public Job createSubmittableJob(String[] args) throws IOException {
       true);
     job.setJarByClass(MapReduceHFileSplitterJob.class);
     job.setInputFormatClass(HFileInputFormat.class);
-    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
     String hfileOutPath = conf.get(BULK_OUTPUT_CONF_KEY);
+    boolean diskBasedSortingEnabled = HFileOutputFormat2.diskBasedSortingEnabled(conf);
+    if (diskBasedSortingEnabled) {
+      job.setMapOutputKeyClass(KeyOnlyCellComparable.class);
+      job.setSortComparatorClass(KeyOnlyCellComparable.KeyOnlyCellComparator.class);
+    } else {
+      job.setMapOutputKeyClass(ImmutableBytesWritable.class);
+    }
     if (hfileOutPath != null) {
       LOG.debug("add incremental job :" + hfileOutPath + " from " + inputDirs);
       TableName tableName = TableName.valueOf(tabName);
       job.setMapperClass(HFileCellMapper.class);
-      job.setReducerClass(CellSortReducer.class);
+      if (diskBasedSortingEnabled) {
+        job.setReducerClass(PreSortedCellsReducer.class);
+      } else {
+        job.setReducerClass(CellSortReducer.class);
+      }
       Path outputDir = new Path(hfileOutPath);
       FileOutputFormat.setOutputPath(job, outputDir);
       job.setMapOutputValueClass(MapReduceExtendedCell.class);
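
The job wiring now has two modes. With disk-based sorting off, the map output key stays an ImmutableBytesWritable holding only the row, and CellSortReducer orders the cells for each row inside the reducer. With it on, the map output key carries the full cell key, a matching sort comparator is installed, and PreSortedCellsReducer consumes input the shuffle has already ordered. The sketch below shows the general shape of such a key wrapper, with plain lexicographic byte comparison standing in for HBase's cell-key ordering; it is not the actual KeyOnlyCellComparable class:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

// Simplified stand-in for a "key only" comparable map output key: the shuffle sorts
// records by this key, so the reducer receives cells already in order.
public class SortableKeySketch implements WritableComparable<SortableKeySketch> {

  private byte[] key = new byte[0];

  public SortableKeySketch() {
  }

  public SortableKeySketch(byte[] key) {
    this.key = key;
  }

  @Override
  public void write(DataOutput out) throws IOException {
    out.writeInt(key.length);
    out.write(key);
  }

  @Override
  public void readFields(DataInput in) throws IOException {
    key = new byte[in.readInt()];
    in.readFully(key);
  }

  @Override
  public int compareTo(SortableKeySketch other) {
    // Lexicographic byte order; HBase would use its cell-key comparator here.
    return WritableComparator.compareBytes(key, 0, key.length, other.key, 0, other.key.length);
  }
}

A job using such a key would pair job.setMapOutputKeyClass with a comparator over it via job.setSortComparatorClass, which is the pairing the patch makes with KeyOnlyCellComparable and KeyOnlyCellComparable.KeyOnlyCellComparator.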

hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupHFileCleaner.java

Lines changed: 10 additions & 3 deletions
@@ -108,11 +108,11 @@ protected Set<TableName> fetchFullyBackedUpTables(BackupSystemTable tbl) {
     Iterable<FileStatus> deletable;
 
     // The first call will not allow any deletions because of the timestamp mechanism.
-    deletable = cleaner.getDeletableFiles(List.of(file1, file1Archived, file2, file3));
+    deletable = callCleaner(cleaner, List.of(file1, file1Archived, file2, file3));
     assertEquals(Set.of(), Sets.newHashSet(deletable));
 
     // No bulk loads registered, so all files can be deleted.
-    deletable = cleaner.getDeletableFiles(List.of(file1, file1Archived, file2, file3));
+    deletable = callCleaner(cleaner, List.of(file1, file1Archived, file2, file3));
     assertEquals(Set.of(file1, file1Archived, file2, file3), Sets.newHashSet(deletable));
 
     // Register some bulk loads.
@@ -125,10 +125,17 @@ protected Set<TableName> fetchFullyBackedUpTables(BackupSystemTable tbl) {
     }
 
     // File 1 can no longer be deleted, because it is registered as a bulk load.
-    deletable = cleaner.getDeletableFiles(List.of(file1, file1Archived, file2, file3));
+    deletable = callCleaner(cleaner, List.of(file1, file1Archived, file2, file3));
     assertEquals(Set.of(file2, file3), Sets.newHashSet(deletable));
   }
 
+  private Iterable<FileStatus> callCleaner(BackupHFileCleaner cleaner, Iterable<FileStatus> files) {
+    cleaner.preClean();
+    Iterable<FileStatus> deletable = cleaner.getDeletableFiles(files);
+    cleaner.postClean();
+    return deletable;
+  }
+
   private FileStatus createFile(String fileName) throws IOException {
     Path file = new Path(root, fileName);
     fs.createNewFile(file);

hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupWithBulkLoad.java

Lines changed: 96 additions & 0 deletions
@@ -20,9 +20,11 @@
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import org.apache.hadoop.fs.FileSystem;
@@ -38,6 +40,8 @@
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.tool.BulkLoadHFiles;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.CommonFSUtils;
+import org.apache.hadoop.hbase.util.HFileArchiveUtil;
 import org.apache.hadoop.hbase.util.HFileTestUtil;
 import org.junit.ClassRule;
 import org.junit.Test;
@@ -147,6 +151,98 @@ private boolean containsRowWithKey(Table table, String rowKey) throws IOExceptio
     return result.containsColumn(famName, qualName);
   }
 
+  @Test
+  public void testUpdateFileListsRaceCondition() throws Exception {
+    try (BackupSystemTable systemTable = new BackupSystemTable(TEST_UTIL.getConnection())) {
+      // Test the race condition where files are archived during incremental backup
+      FileSystem fs = TEST_UTIL.getTestFileSystem();
+
+      String regionName = "region1";
+      String columnFamily = "cf";
+      String filename1 = "hfile1";
+      String filename2 = "hfile2";
+
+      Path rootDir = CommonFSUtils.getRootDir(TEST_UTIL.getConfiguration());
+      Path tableDir = CommonFSUtils.getTableDir(rootDir, table1);
+      Path activeFile1 =
+        new Path(tableDir, regionName + Path.SEPARATOR + columnFamily + Path.SEPARATOR + filename1);
+      Path activeFile2 =
+        new Path(tableDir, regionName + Path.SEPARATOR + columnFamily + Path.SEPARATOR + filename2);
+
+      fs.mkdirs(activeFile1.getParent());
+      fs.create(activeFile1).close();
+      fs.create(activeFile2).close();
+
+      List<String> activeFiles = new ArrayList<>();
+      activeFiles.add(activeFile1.toString());
+      activeFiles.add(activeFile2.toString());
+      List<String> archiveFiles = new ArrayList<>();
+
+      Path archiveDir = HFileArchiveUtil.getStoreArchivePath(TEST_UTIL.getConfiguration(), table1,
+        regionName, columnFamily);
+      Path archivedFile1 = new Path(archiveDir, filename1);
+      fs.mkdirs(archiveDir);
+      assertTrue("File should be moved to archive", fs.rename(activeFile1, archivedFile1));
+
+      TestBackupBase.IncrementalTableBackupClientForTest client =
+        new TestBackupBase.IncrementalTableBackupClientForTest(TEST_UTIL.getConnection(),
+          "test_backup_id",
+          createBackupRequest(BackupType.INCREMENTAL, List.of(table1), BACKUP_ROOT_DIR));
+
+      client.updateFileLists(activeFiles, archiveFiles);
+
+      assertEquals("Only one file should remain in active files", 1, activeFiles.size());
+      assertEquals("File2 should still be in active files", activeFile2.toString(),
+        activeFiles.get(0));
+      assertEquals("One file should be added to archive files", 1, archiveFiles.size());
+      assertEquals("Archived file should have correct path", archivedFile1.toString(),
+        archiveFiles.get(0));
+      systemTable.finishBackupExclusiveOperation();
+    }
+
+  }
+
+  @Test
+  public void testUpdateFileListsMissingArchivedFile() throws Exception {
+    try (BackupSystemTable systemTable = new BackupSystemTable(TEST_UTIL.getConnection())) {
+      // Test that IOException is thrown when file doesn't exist in archive location
+      FileSystem fs = TEST_UTIL.getTestFileSystem();
+
+      String regionName = "region2";
+      String columnFamily = "cf";
+      String filename = "missing_file";
+
+      Path rootDir = CommonFSUtils.getRootDir(TEST_UTIL.getConfiguration());
+      Path tableDir = CommonFSUtils.getTableDir(rootDir, table1);
+      Path activeFile =
+        new Path(tableDir, regionName + Path.SEPARATOR + columnFamily + Path.SEPARATOR + filename);
+
+      fs.mkdirs(activeFile.getParent());
+      fs.create(activeFile).close();
+
+      List<String> activeFiles = new ArrayList<>();
+      activeFiles.add(activeFile.toString());
+      List<String> archiveFiles = new ArrayList<>();
+
+      // Delete the file but don't create it in archive location
+      fs.delete(activeFile, false);
+
+      TestBackupBase.IncrementalTableBackupClientForTest client =
+        new TestBackupBase.IncrementalTableBackupClientForTest(TEST_UTIL.getConnection(),
+          "test_backup_id",
+          createBackupRequest(BackupType.INCREMENTAL, List.of(table1), BACKUP_ROOT_DIR));
+
+      // This should throw IOException since file doesn't exist in archive
+      try {
+        client.updateFileLists(activeFiles, archiveFiles);
+        fail("Expected IOException to be thrown");
+      } catch (IOException e) {
+        // Expected
+      }
+      systemTable.finishBackupExclusiveOperation();
+    }
+  }
+
   private void performBulkLoad(String keyPrefix) throws IOException {
     FileSystem fs = TEST_UTIL.getTestFileSystem();
     Path baseDirectory = TEST_UTIL.getDataTestDirOnTestFS(TEST_NAME);
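
The expected-failure check in testUpdateFileListsMissingArchivedFile uses the classic try/fail/catch idiom, which works on any JUnit 4. Where the test classpath has JUnit 4.13 or later, Assert.assertThrows expresses the same intent more compactly; a self-contained illustration, not tied to the backup classes:

import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;

import java.io.IOException;

import org.junit.Test;

public class AssertThrowsSketchTest {

  @Test
  public void ioExceptionIsReported() {
    // assertThrows returns the thrown exception, so its message can be inspected too.
    IOException thrown = assertThrows(IOException.class, () -> {
      throw new IOException("no archived file exists for it");
    });
    assertTrue(thrown.getMessage().contains("archived"));
  }
}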
