Skip to content

Commit cc712c2

Browse files
committed
add rsgroup support for backup
1 parent f65b769 commit cc712c2

29 files changed

+564
-83
lines changed

hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupAdmin.java

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -44,7 +44,7 @@ public interface BackupAdmin extends Closeable {
4444
* @return the backup Id
4545
*/
4646

47-
String backupTables(final BackupRequest userRequest) throws IOException;
47+
BackupInfo backupTables(final BackupRequest userRequest) throws IOException;
4848

4949
/**
5050
* Restore backup

hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupAdminImpl.java

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -521,7 +521,7 @@ public void restore(RestoreRequest request) throws IOException {
521521
}
522522

523523
@Override
524-
public String backupTables(BackupRequest request) throws IOException {
524+
public BackupInfo backupTables(BackupRequest request) throws IOException {
525525
BackupType type = request.getBackupType();
526526
String targetRootDir = request.getTargetRootDir();
527527
List<TableName> tableList = request.getTableList();
@@ -604,7 +604,7 @@ public String backupTables(BackupRequest request) throws IOException {
604604

605605
client.execute();
606606

607-
return backupId;
607+
return client.backupInfo;
608608
}
609609

610610
private List<TableName> excludeNonExistingTables(List<TableName> tableList,

hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupCommands.java

Lines changed: 3 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -40,11 +40,9 @@
4040
import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_WORKERS_DESC;
4141
import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_YARN_QUEUE_NAME;
4242
import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_YARN_QUEUE_NAME_DESC;
43-
4443
import java.io.IOException;
4544
import java.net.URI;
4645
import java.util.List;
47-
4846
import org.apache.commons.lang3.StringUtils;
4947
import org.apache.hadoop.conf.Configuration;
5048
import org.apache.hadoop.conf.Configured;
@@ -66,7 +64,6 @@
6664
import org.apache.hadoop.hbase.client.ConnectionFactory;
6765
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
6866
import org.apache.yetus.audience.InterfaceAudience;
69-
7067
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
7168
import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
7269
import org.apache.hbase.thirdparty.org.apache.commons.cli.HelpFormatter;
@@ -345,8 +342,9 @@ public void execute() throws IOException {
345342
tables != null ? Lists.newArrayList(BackupUtils.parseTableNames(tables)) : null)
346343
.withTargetRootDir(targetBackupDir).withTotalTasks(workers)
347344
.withBandwidthPerTasks(bandwidth).withBackupSetName(setName).build();
348-
String backupId = admin.backupTables(request);
349-
System.out.println("Backup session " + backupId + " finished. Status: SUCCESS");
345+
BackupInfo backupInfo = admin.backupTables(request);
346+
System.out
347+
.println("Backup session " + backupInfo.getBackupId() + " finished. Status: SUCCESS");
350348
} catch (IOException e) {
351349
System.out.println("Backup session finished. Status: FAILURE");
352350
throw e;

hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalBackupManager.java

Lines changed: 33 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -21,8 +21,11 @@
2121
import java.io.IOException;
2222
import java.util.ArrayList;
2323
import java.util.HashMap;
24+
import java.util.HashSet;
2425
import java.util.List;
2526
import java.util.Map;
27+
import java.util.Set;
28+
import java.util.stream.Collectors;
2629
import org.apache.hadoop.conf.Configuration;
2730
import org.apache.hadoop.fs.FileStatus;
2831
import org.apache.hadoop.fs.FileSystem;
@@ -34,7 +37,9 @@
3437
import org.apache.hadoop.hbase.backup.util.BackupUtils;
3538
import org.apache.hadoop.hbase.client.Admin;
3639
import org.apache.hadoop.hbase.client.Connection;
40+
import org.apache.hadoop.hbase.net.Address;
3741
import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
42+
import org.apache.hadoop.hbase.rsgroup.RSGroupInfo;
3843
import org.apache.hadoop.hbase.util.CommonFSUtils;
3944
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
4045
import org.apache.yetus.audience.InterfaceAudience;
@@ -94,13 +99,36 @@ public Map<String, Long> getIncrBackupLogFileMap() throws IOException {
9499
}
95100
newTimestamps = readRegionServerLastLogRollResult();
96101

97-
logList = getLogFilesForNewBackup(previousTimestampMins, newTimestamps, conf, savedStartCode);
102+
logList = getLogFilesForNewBackup(previousTimestampMins, newTimestamps, conf, savedStartCode,
103+
getParticipatingServerNames(backupInfo.getTables()));
98104
logList = excludeProcV2WALs(logList);
99105
backupInfo.setIncrBackupFileList(logList);
100106

101107
return newTimestamps;
102108
}
103109

110+
private Set<String> getParticipatingServerNames(Set<TableName> tables) throws IOException {
111+
Set<Address> participatingServers = new HashSet<>();
112+
boolean flag = false;
113+
for (TableName table : tables) {
114+
RSGroupInfo rsGroupInfo = conn.getAdmin().getRSGroup(table);
115+
if (rsGroupInfo != null && !rsGroupInfo.getServers().isEmpty()) {
116+
LOG.info("Participating servers for table {}, rsgroup Name: {} are: {}", table,
117+
rsGroupInfo.getName(), rsGroupInfo.getServers());
118+
participatingServers.addAll(rsGroupInfo.getServers());
119+
} else {
120+
LOG.warn(
121+
"Rsgroup isn't available for table {}, all servers in the cluster will be participating ",
122+
table);
123+
flag = true;
124+
}
125+
}
126+
127+
return flag ?
128+
new HashSet<>() :
129+
participatingServers.stream().map(a -> a.toString()).collect(Collectors.toSet());
130+
}
131+
104132
private List<String> excludeProcV2WALs(List<String> logList) {
105133
List<String> list = new ArrayList<>();
106134
for (int i=0; i < logList.size(); i++) {
@@ -127,8 +155,8 @@ private List<String> excludeProcV2WALs(List<String> logList) {
127155
* @throws IOException exception
128156
*/
129157
private List<String> getLogFilesForNewBackup(Map<String, Long> olderTimestamps,
130-
Map<String, Long> newestTimestamps, Configuration conf, String savedStartCode)
131-
throws IOException {
158+
Map<String, Long> newestTimestamps, Configuration conf, String savedStartCode,
159+
Set<String> servers) throws IOException {
132160
LOG.debug("In getLogFilesForNewBackup()\n" + "olderTimestamps: " + olderTimestamps
133161
+ "\n newestTimestamps: " + newestTimestamps);
134162

@@ -161,7 +189,7 @@ private List<String> getLogFilesForNewBackup(Map<String, Long> olderTimestamps,
161189
for (FileStatus rs : rss) {
162190
p = rs.getPath();
163191
host = BackupUtils.parseHostNameFromLogFile(p);
164-
if (host == null) {
192+
if (host == null || (!servers.isEmpty() && !servers.contains(host))) {
165193
continue;
166194
}
167195
FileStatus[] logs;
@@ -216,7 +244,7 @@ private List<String> getLogFilesForNewBackup(Map<String, Long> olderTimestamps,
216244
continue;
217245
}
218246
host = BackupUtils.parseHostFromOldLog(p);
219-
if (host == null) {
247+
if (host == null || (!servers.isEmpty() && !servers.contains(host))) {
220248
continue;
221249
}
222250
currentLogTS = BackupUtils.getCreationTime(p);

hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/master/BackupLogCleaner.java

Lines changed: 22 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -22,8 +22,11 @@
2222
import java.util.ArrayList;
2323
import java.util.Collections;
2424
import java.util.HashMap;
25+
import java.util.HashSet;
2526
import java.util.List;
2627
import java.util.Map;
28+
import java.util.Set;
29+
import java.util.stream.Collectors;
2730
import org.apache.hadoop.conf.Configuration;
2831
import org.apache.hadoop.fs.FileStatus;
2932
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
@@ -39,6 +42,7 @@
3942
import org.apache.hadoop.hbase.master.cleaner.BaseLogCleanerDelegate;
4043
import org.apache.hadoop.hbase.net.Address;
4144
import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
45+
import org.apache.hadoop.hbase.rsgroup.RSGroupInfo;
4246
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
4347
import org.apache.yetus.audience.InterfaceAudience;
4448
import org.slf4j.Logger;
@@ -85,14 +89,31 @@ private Map<Address, Long> getServersToOldestBackupMapping(List<BackupInfo> back
8589
Map<Address, Long> serverAddressToLastBackupMap = new HashMap<>();
8690

8791
Map<TableName, Long> tableNameBackupInfoMap = new HashMap<>();
92+
Set<Address> servers = new HashSet<>();
93+
for (BackupInfo backupInfo : backups) {
94+
for (TableName table : backupInfo.getTables()) {
95+
RSGroupInfo rsGroupInfo = conn.getAdmin().getRSGroup(table);
96+
if (rsGroupInfo != null && rsGroupInfo.getServers() != null && !rsGroupInfo.getServers()
97+
.isEmpty()) {
98+
servers.addAll(rsGroupInfo.getServers());
99+
} else {
100+
servers.addAll(conn.getAdmin().getRegionServers().stream().map(s -> s.getAddress())
101+
.collect(Collectors.toList()));
102+
}
103+
}
104+
}
105+
88106
for (BackupInfo backupInfo : backups) {
89107
for (TableName table : backupInfo.getTables()) {
90108
tableNameBackupInfoMap.putIfAbsent(table, backupInfo.getStartTs());
91109
if (tableNameBackupInfoMap.get(table) <= backupInfo.getStartTs()) {
92110
tableNameBackupInfoMap.put(table, backupInfo.getStartTs());
93111
for (Map.Entry<String, Long> entry : backupInfo.getTableSetTimestampMap().get(table)
94112
.entrySet()) {
95-
serverAddressToLastBackupMap.put(Address.fromString(entry.getKey()), entry.getValue());
113+
if (servers.contains(Address.fromString(entry.getKey()))) {
114+
serverAddressToLastBackupMap
115+
.put(Address.fromString(entry.getKey()), entry.getValue());
116+
}
96117
}
97118
}
98119
}

0 commit comments

Comments
 (0)