Skip to content

Commit 547fb98

Browse files
committed
add rsgroup support for backup
1 parent 8c97c51 commit 547fb98

33 files changed

+627
-100
lines changed

hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupAdmin.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ public interface BackupAdmin extends Closeable {
4242
* @return the backup Id
4343
*/
4444

45-
String backupTables(final BackupRequest userRequest) throws IOException;
45+
BackupInfo backupTables(final BackupRequest userRequest) throws IOException;
4646

4747
/**
4848
* Restore backup

hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupAdminImpl.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -510,7 +510,7 @@ public void restore(RestoreRequest request) throws IOException {
510510
}
511511

512512
@Override
513-
public String backupTables(BackupRequest request) throws IOException {
513+
public BackupInfo backupTables(BackupRequest request) throws IOException {
514514
BackupType type = request.getBackupType();
515515
String targetRootDir = request.getTargetRootDir();
516516
List<TableName> tableList = request.getTableList();
@@ -593,7 +593,7 @@ public String backupTables(BackupRequest request) throws IOException {
593593

594594
client.execute();
595595

596-
return backupId;
596+
return client.backupInfo;
597597
}
598598

599599
private List<TableName> excludeNonExistingTables(List<TableName> tableList,

hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupCommands.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -347,7 +347,7 @@ public void execute() throws IOException {
347347
.withTargetRootDir(targetBackupDir).withTotalTasks(workers)
348348
.withBandwidthPerTasks(bandwidth).withNoChecksumVerify(ignoreChecksum)
349349
.withBackupSetName(setName).build();
350-
String backupId = admin.backupTables(request);
350+
String backupId = admin.backupTables(request).getBackupId();
351351
System.out.println("Backup session " + backupId + " finished. Status: SUCCESS");
352352
} catch (IOException e) {
353353
System.out.println("Backup session finished. Status: FAILURE");

hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalBackupManager.java

Lines changed: 33 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,11 @@
2020
import java.io.IOException;
2121
import java.util.ArrayList;
2222
import java.util.HashMap;
23+
import java.util.HashSet;
2324
import java.util.List;
2425
import java.util.Map;
26+
import java.util.Set;
27+
import java.util.stream.Collectors;
2528
import org.apache.hadoop.conf.Configuration;
2629
import org.apache.hadoop.fs.FileStatus;
2730
import org.apache.hadoop.fs.FileSystem;
@@ -33,7 +36,9 @@
3336
import org.apache.hadoop.hbase.backup.util.BackupUtils;
3437
import org.apache.hadoop.hbase.client.Admin;
3538
import org.apache.hadoop.hbase.client.Connection;
39+
import org.apache.hadoop.hbase.net.Address;
3640
import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
41+
import org.apache.hadoop.hbase.rsgroup.RSGroupInfo;
3742
import org.apache.hadoop.hbase.util.CommonFSUtils;
3843
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
3944
import org.apache.yetus.audience.InterfaceAudience;
@@ -93,13 +98,36 @@ public Map<String, Long> getIncrBackupLogFileMap() throws IOException {
9398
}
9499
newTimestamps = readRegionServerLastLogRollResult();
95100

96-
logList = getLogFilesForNewBackup(previousTimestampMins, newTimestamps, conf, savedStartCode);
101+
logList = getLogFilesForNewBackup(previousTimestampMins, newTimestamps, conf, savedStartCode,
102+
getParticipatingServerNames(backupInfo.getTables()));
97103
logList = excludeProcV2WALs(logList);
98104
backupInfo.setIncrBackupFileList(logList);
99105

100106
return newTimestamps;
101107
}
102108

109+
private Set<String> getParticipatingServerNames(Set<TableName> tables) throws IOException {
110+
Set<Address> participatingServers = new HashSet<>();
111+
boolean flag = false;
112+
for (TableName table : tables) {
113+
RSGroupInfo rsGroupInfo = conn.getAdmin().getRSGroup(table);
114+
if (rsGroupInfo != null && !rsGroupInfo.getServers().isEmpty()) {
115+
LOG.info("Participating servers for table {}, rsgroup Name: {} are: {}", table,
116+
rsGroupInfo.getName(), rsGroupInfo.getServers());
117+
participatingServers.addAll(rsGroupInfo.getServers());
118+
} else {
119+
LOG.warn(
120+
"Rsgroup isn't available for table {}, all servers in the cluster will be participating ",
121+
table);
122+
flag = true;
123+
}
124+
}
125+
126+
return flag
127+
? new HashSet<>()
128+
: participatingServers.stream().map(a -> a.toString()).collect(Collectors.toSet());
129+
}
130+
103131
private List<String> excludeProcV2WALs(List<String> logList) {
104132
List<String> list = new ArrayList<>();
105133
for (int i = 0; i < logList.size(); i++) {
@@ -126,8 +154,8 @@ private List<String> excludeProcV2WALs(List<String> logList) {
126154
* @throws IOException exception
127155
*/
128156
private List<String> getLogFilesForNewBackup(Map<String, Long> olderTimestamps,
129-
Map<String, Long> newestTimestamps, Configuration conf, String savedStartCode)
130-
throws IOException {
157+
Map<String, Long> newestTimestamps, Configuration conf, String savedStartCode,
158+
Set<String> servers) throws IOException {
131159
LOG.debug("In getLogFilesForNewBackup()\n" + "olderTimestamps: " + olderTimestamps
132160
+ "\n newestTimestamps: " + newestTimestamps);
133161

@@ -160,7 +188,7 @@ private List<String> getLogFilesForNewBackup(Map<String, Long> olderTimestamps,
160188
for (FileStatus rs : rss) {
161189
p = rs.getPath();
162190
host = BackupUtils.parseHostNameFromLogFile(p);
163-
if (host == null) {
191+
if (host == null || (!servers.isEmpty() && !servers.contains(host))) {
164192
continue;
165193
}
166194
FileStatus[] logs;
@@ -215,7 +243,7 @@ private List<String> getLogFilesForNewBackup(Map<String, Long> olderTimestamps,
215243
continue;
216244
}
217245
host = BackupUtils.parseHostFromOldLog(p);
218-
if (host == null) {
246+
if (host == null || (!servers.isEmpty() && !servers.contains(host))) {
219247
continue;
220248
}
221249
currentLogTS = BackupUtils.getCreationTime(p);

hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/master/BackupLogCleaner.java

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,10 @@
2121
import java.util.ArrayList;
2222
import java.util.Collections;
2323
import java.util.HashMap;
24+
import java.util.HashSet;
2425
import java.util.List;
2526
import java.util.Map;
27+
import java.util.Set;
2628
import java.util.stream.Collectors;
2729
import org.apache.hadoop.conf.Configuration;
2830
import org.apache.hadoop.fs.FileStatus;
@@ -41,6 +43,7 @@
4143
import org.apache.hadoop.hbase.master.region.MasterRegionFactory;
4244
import org.apache.hadoop.hbase.net.Address;
4345
import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
46+
import org.apache.hadoop.hbase.rsgroup.RSGroupInfo;
4447
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
4548
import org.apache.yetus.audience.InterfaceAudience;
4649
import org.slf4j.Logger;
@@ -98,13 +101,29 @@ private Map<Address, Long> serverToPreservationBoundaryTs(List<BackupInfo> backu
98101

99102
// This map tracks, for every backup root, the most recent created backup (= highest timestamp)
100103
Map<String, BackupInfo> newestBackupPerRootDir = new HashMap<>();
104+
Set<Address> servers = new HashSet<>();
101105
for (BackupInfo backup : backups) {
102106
BackupInfo existingEntry = newestBackupPerRootDir.get(backup.getBackupRootDir());
103107
if (existingEntry == null || existingEntry.getStartTs() < backup.getStartTs()) {
104108
newestBackupPerRootDir.put(backup.getBackupRootDir(), backup);
105109
}
106110
}
107111

112+
for (BackupInfo backup : backups) {
113+
for (TableName table : backup.getTables()) {
114+
RSGroupInfo rsGroupInfo = conn.getAdmin().getRSGroup(table);
115+
if (
116+
rsGroupInfo != null && rsGroupInfo.getServers() != null
117+
&& !rsGroupInfo.getServers().isEmpty()
118+
) {
119+
servers.addAll(rsGroupInfo.getServers());
120+
} else {
121+
servers.addAll(conn.getAdmin().getRegionServers().stream().map(s -> s.getAddress())
122+
.collect(Collectors.toList()));
123+
}
124+
}
125+
}
126+
108127
if (LOG.isDebugEnabled()) {
109128
LOG.debug("WAL cleanup time-boundary using info from: {}. ",
110129
newestBackupPerRootDir.entrySet().stream()
@@ -124,7 +143,7 @@ private Map<Address, Long> serverToPreservationBoundaryTs(List<BackupInfo> backu
124143
.entrySet()) {
125144
Address address = Address.fromString(entry.getKey());
126145
Long storedTs = boundaries.get(address);
127-
if (storedTs == null || entry.getValue() < storedTs) {
146+
if ((storedTs == null || entry.getValue() < storedTs) && servers.contains(address)) {
128147
boundaries.put(address, entry.getValue());
129148
}
130149
}

hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java

Lines changed: 86 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,15 +17,19 @@
1717
*/
1818
package org.apache.hadoop.hbase.backup;
1919

20+
import static org.junit.Assert.assertTrue;
21+
2022
import java.io.IOException;
2123
import java.util.ArrayList;
2224
import java.util.Arrays;
2325
import java.util.HashMap;
26+
import java.util.HashSet;
2427
import java.util.Iterator;
2528
import java.util.List;
2629
import java.util.Map;
2730
import java.util.Map.Entry;
2831
import java.util.Objects;
32+
import java.util.Set;
2933
import org.apache.hadoop.conf.Configuration;
3034
import org.apache.hadoop.fs.FileStatus;
3135
import org.apache.hadoop.fs.FileSystem;
@@ -58,7 +62,10 @@
5862
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
5963
import org.apache.hadoop.hbase.master.cleaner.LogCleaner;
6064
import org.apache.hadoop.hbase.master.cleaner.TimeToLiveLogCleaner;
65+
import org.apache.hadoop.hbase.net.Address;
6166
import org.apache.hadoop.hbase.regionserver.LogRoller;
67+
import org.apache.hadoop.hbase.rsgroup.RSGroupInfo;
68+
import org.apache.hadoop.hbase.rsgroup.RSGroupUtil;
6269
import org.apache.hadoop.hbase.security.HadoopSecurityEnabledUserProviderForTesting;
6370
import org.apache.hadoop.hbase.security.UserProvider;
6471
import org.apache.hadoop.hbase.security.access.SecureTestUtil;
@@ -87,6 +94,15 @@ public class TestBackupBase {
8794
protected static Configuration conf1;
8895
protected static Configuration conf2;
8996

97+
protected static final int RSGROUP_RS_NUM = 5;
98+
protected static final int NUM_REGIONSERVERS = 3;
99+
protected static final String RSGROUP_NAME = "rsgroup1";
100+
protected static final String RSGROUP_NAMESPACE = "rsgroup_ns";
101+
protected static final TableName RSGROUP_TABLE_1 =
102+
TableName.valueOf(RSGROUP_NAMESPACE + ":rsgroup_table1");
103+
protected static final TableName RSGROUP_TABLE_2 =
104+
TableName.valueOf(RSGROUP_NAMESPACE + ":rsgroup_table2");
105+
90106
protected static TableName table1 = TableName.valueOf("table1");
91107
protected static TableDescriptor table1Desc;
92108
protected static TableName table2 = TableName.valueOf("table2");
@@ -108,6 +124,7 @@ public class TestBackupBase {
108124

109125
protected static boolean autoRestoreOnFailure;
110126
protected static boolean useSecondCluster;
127+
protected static boolean enableRSgroup;
111128

112129
static class IncrementalTableBackupClientForTest extends IncrementalTableBackupClient {
113130
public IncrementalTableBackupClientForTest() {
@@ -292,6 +309,22 @@ public void execute() throws IOException {
292309
}
293310
}
294311

312+
private static RSGroupInfo addGroup(String groupName, int serverCount) throws IOException {
313+
Admin admin = TEST_UTIL.getAdmin();
314+
RSGroupInfo defaultInfo = admin.getRSGroup(RSGroupInfo.DEFAULT_GROUP);
315+
admin.addRSGroup(groupName);
316+
Set<Address> set = new HashSet<>();
317+
for (Address server : defaultInfo.getServers()) {
318+
if (set.size() == serverCount) {
319+
break;
320+
}
321+
set.add(server);
322+
}
323+
admin.moveServersToRSGroup(set, groupName);
324+
RSGroupInfo result = admin.getRSGroup(groupName);
325+
return result;
326+
}
327+
295328
public static void setUpHelper() throws Exception {
296329
BACKUP_ROOT_DIR = Path.SEPARATOR + "backupUT";
297330
BACKUP_REMOTE_ROOT_DIR = Path.SEPARATOR + "backupUT";
@@ -314,7 +347,13 @@ public static void setUpHelper() throws Exception {
314347

315348
// Set MultiWAL (with 2 default WAL files per RS)
316349
conf1.set(WALFactory.WAL_PROVIDER, provider);
317-
TEST_UTIL.startMiniCluster();
350+
if (enableRSgroup) {
351+
conf1.setBoolean(RSGroupUtil.RS_GROUP_ENABLED, true);
352+
TEST_UTIL.startMiniCluster(RSGROUP_RS_NUM + NUM_REGIONSERVERS);
353+
addGroup(RSGROUP_NAME, RSGROUP_RS_NUM);
354+
} else {
355+
TEST_UTIL.startMiniCluster();
356+
}
318357

319358
if (useSecondCluster) {
320359
conf2 = HBaseConfiguration.create(conf1);
@@ -352,6 +391,7 @@ public static void setUpHelper() throws Exception {
352391
public static void setUp() throws Exception {
353392
TEST_UTIL = new HBaseTestingUtil();
354393
conf1 = TEST_UTIL.getConfiguration();
394+
enableRSgroup = false;
355395
autoRestoreOnFailure = true;
356396
useSecondCluster = false;
357397
setUpHelper();
@@ -377,6 +417,7 @@ public static void tearDown() throws Exception {
377417
}
378418
TEST_UTIL.shutdownMiniCluster();
379419
TEST_UTIL.shutdownMiniMapReduceCluster();
420+
enableRSgroup = false;
380421
autoRestoreOnFailure = true;
381422
useSecondCluster = false;
382423
}
@@ -406,16 +447,16 @@ protected BackupRequest createBackupRequest(BackupType type, List<TableName> tab
406447
return request;
407448
}
408449

409-
protected String backupTables(BackupType type, List<TableName> tables, String path)
450+
protected BackupInfo backupTables(BackupType type, List<TableName> tables, String path)
410451
throws IOException {
411452
Connection conn = null;
412453
BackupAdmin badmin = null;
413-
String backupId;
454+
BackupInfo backupInfo;
414455
try {
415456
conn = ConnectionFactory.createConnection(conf1);
416457
badmin = new BackupAdminImpl(conn);
417458
BackupRequest request = createBackupRequest(type, new ArrayList<>(tables), path);
418-
backupId = badmin.backupTables(request);
459+
backupInfo = badmin.backupTables(request);
419460
} finally {
420461
if (badmin != null) {
421462
badmin.close();
@@ -424,14 +465,14 @@ protected String backupTables(BackupType type, List<TableName> tables, String pa
424465
conn.close();
425466
}
426467
}
427-
return backupId;
468+
return backupInfo;
428469
}
429470

430-
protected String fullTableBackup(List<TableName> tables) throws IOException {
471+
protected BackupInfo fullTableBackup(List<TableName> tables) throws IOException {
431472
return backupTables(BackupType.FULL, tables, BACKUP_ROOT_DIR);
432473
}
433474

434-
protected String incrementalTableBackup(List<TableName> tables) throws IOException {
475+
protected BackupInfo incrementalTableBackup(List<TableName> tables) throws IOException {
435476
return backupTables(BackupType.INCREMENTAL, tables, BACKUP_ROOT_DIR);
436477
}
437478

@@ -479,6 +520,23 @@ protected static void createTables() throws Exception {
479520
table.close();
480521
ha.close();
481522
conn.close();
523+
524+
if (enableRSgroup) {
525+
ha.createNamespace(NamespaceDescriptor.create(RSGROUP_NAMESPACE)
526+
.addConfiguration(RSGroupInfo.NAMESPACE_DESC_PROP_GROUP, RSGROUP_NAME).build());
527+
528+
ha.createTable(TableDescriptorBuilder.newBuilder(RSGROUP_TABLE_1)
529+
.setColumnFamily(ColumnFamilyDescriptorBuilder.of(famName)).build());
530+
table = ConnectionFactory.createConnection(conf1).getTable(RSGROUP_TABLE_1);
531+
loadTable(table);
532+
table.close();
533+
534+
ha.createTable(TableDescriptorBuilder.newBuilder(RSGROUP_TABLE_2)
535+
.setColumnFamily(ColumnFamilyDescriptorBuilder.of(famName)).build());
536+
table = ConnectionFactory.createConnection(conf1).getTable(RSGROUP_TABLE_2);
537+
loadTable(table);
538+
table.close();
539+
}
482540
}
483541

484542
protected boolean checkSucceeded(String backupId) throws IOException {
@@ -501,7 +559,7 @@ protected boolean checkFailed(String backupId) throws IOException {
501559
return status.getState() == BackupState.FAILED;
502560
}
503561

504-
private BackupInfo getBackupInfo(String backupId) throws IOException {
562+
protected BackupInfo getBackupInfo(String backupId) throws IOException {
505563
try (BackupSystemTable table = new BackupSystemTable(TEST_UTIL.getConnection())) {
506564
BackupInfo status = table.readBackupInfo(backupId);
507565
return status;
@@ -538,6 +596,26 @@ protected List<FileStatus> getListOfWALFiles(Configuration c) throws IOException
538596
return logFiles;
539597
}
540598

599+
protected Set<Address> getRsgroupServers(String rsgroupName) throws IOException {
600+
RSGroupInfo rsGroupInfo = TEST_UTIL.getAdmin().getRSGroup(rsgroupName);
601+
if (
602+
rsGroupInfo != null && rsGroupInfo.getServers() != null && !rsGroupInfo.getServers().isEmpty()
603+
) {
604+
return new HashSet<>(rsGroupInfo.getServers());
605+
}
606+
return new HashSet<>();
607+
}
608+
609+
protected void checkIfWALFilesBelongToRsgroup(List<String> walFiles, String rsgroupName)
610+
throws IOException {
611+
for (String file : walFiles) {
612+
Address walServerAddress =
613+
Address.fromString(BackupUtils.parseHostNameFromLogFile(new Path(file)));
614+
assertTrue("Backed WAL files should be from RSGroup " + rsgroupName,
615+
getRsgroupServers(rsgroupName).contains(walServerAddress));
616+
}
617+
}
618+
541619
protected void dumpBackupDir() throws IOException {
542620
// Dump Backup Dir
543621
FileSystem fs = FileSystem.get(conf1);

0 commit comments

Comments
 (0)