Commit 3c74d11

2005hithlj authored and Apache9 committed
HBASE-27623 Start a new ReplicationSyncUp after the previous failed (#5150)
Signed-off-by: Duo Zhang <[email protected]>
1 parent 96960f2 commit 3c74d11

File tree

3 files changed: +84 -5 lines changed


hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java

Lines changed: 43 additions & 3 deletions
@@ -19,9 +19,11 @@
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Set;
 import org.apache.hadoop.conf.Configuration;
@@ -182,19 +184,56 @@ private void claimReplicationQueues(ReplicationSourceManager mgr, Set<ServerName
     }
   }
 
-  private void writeInfoFile(FileSystem fs) throws IOException {
+  private void writeInfoFile(FileSystem fs, boolean isForce) throws IOException {
     // Record the info of this run. Currently only record the time we run the job. We will use this
     // timestamp to clean up the data for last sequence ids and hfile refs in replication queue
     // storage. See ReplicationQueueStorage.removeLastSequenceIdsAndHFileRefsBefore.
     ReplicationSyncUpToolInfo info =
       new ReplicationSyncUpToolInfo(EnvironmentEdgeManager.currentTime());
     String json = JsonMapper.writeObjectAsString(info);
     Path infoDir = new Path(CommonFSUtils.getRootDir(getConf()), INFO_DIR);
-    try (FSDataOutputStream out = fs.create(new Path(infoDir, INFO_FILE), false)) {
+    try (FSDataOutputStream out = fs.create(new Path(infoDir, INFO_FILE), isForce)) {
       out.write(Bytes.toBytes(json));
     }
   }
 
+  private static boolean parseOpts(String args[]) {
+    LinkedList<String> argv = new LinkedList<>();
+    argv.addAll(Arrays.asList(args));
+    String cmd = null;
+    while ((cmd = argv.poll()) != null) {
+      if (cmd.equals("-h") || cmd.equals("--h") || cmd.equals("--help")) {
+        printUsageAndExit(null, 0);
+      }
+      if (cmd.equals("-f")) {
+        return true;
+      }
+      if (!argv.isEmpty()) {
+        printUsageAndExit("ERROR: Unrecognized option/command: " + cmd, -1);
+      }
+    }
+    return false;
+  }
+
+  private static void printUsageAndExit(final String message, final int exitCode) {
+    printUsage(message);
+    System.exit(exitCode);
+  }
+
+  private static void printUsage(final String message) {
+    if (message != null && message.length() > 0) {
+      System.err.println(message);
+    }
+    System.err.println("Usage: hbase " + ReplicationSyncUp.class.getName() + " \\");
+    System.err.println(" <OPTIONS> [-D<property=value>]*");
+    System.err.println();
+    System.err.println("General Options:");
+    System.err.println(" -h|--h|--help Show this help and exit.");
+    System.err
+      .println(" -f Start a new ReplicationSyncUp after the previous ReplicationSyncUp failed. "
+        + "See HBASE-27623 for details.");
+  }
+
   @Override
   public int run(String[] args) throws Exception {
     Abortable abortable = new Abortable() {
@@ -217,6 +256,7 @@ public boolean isAborted() {
         return abort;
       }
     };
+    boolean isForce = parseOpts(args);
     Configuration conf = getConf();
     try (ZKWatcher zkw = new ZKWatcher(conf,
       "syncupReplication" + EnvironmentEdgeManager.currentTime(), abortable, true)) {
@@ -226,7 +266,7 @@ public boolean isAborted() {
       Path logDir = new Path(walRootDir, HConstants.HREGION_LOGDIR_NAME);
 
       System.out.println("Start Replication Server");
-      writeInfoFile(fs);
+      writeInfoFile(fs, isForce);
       Replication replication = new Replication();
       // use offline table replication queue storage
       getConf().setClass(ReplicationStorageFactory.REPLICATION_QUEUE_IMPL,
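To see why the new flag matters: fs.create(path, overwrite) with overwrite == false refuses to replace an existing file, so the info file left behind by a crashed run blocks any plain re-run of the tool until -f is passed (e.g. hbase org.apache.hadoop.hbase.replication.regionserver.ReplicationSyncUp -f, per the usage text above). Below is a minimal standalone sketch of that behavior, not HBase code; the path is hypothetical and only the Hadoop FileSystem API is assumed.

import java.nio.charset.StandardCharsets;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class InfoFileOverwriteSketch {
  public static void main(String[] args) throws Exception {
    boolean isForce = args.length > 0 && "-f".equals(args[0]);
    FileSystem fs = FileSystem.get(new Configuration());
    // Hypothetical location; the real tool derives the info file from the HBase root dir.
    Path infoFile = new Path("/tmp/replication_sync_up_info");
    try (FSDataOutputStream out = fs.create(infoFile, isForce)) {
      // overwrite == false: create() fails if the file already exists, so a leftover
      // info file from a failed run stops a plain re-run right here (on HDFS this
      // surfaces as FileAlreadyExistsException, as the test below asserts).
      // overwrite == true (-f): the stale file is replaced and the run proceeds.
      out.write("{}".getBytes(StandardCharsets.UTF_8));
    } catch (FileAlreadyExistsException e) {
      System.err.println("Info file from a previous run still exists; re-run with -f");
    }
  }
}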

hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpTool.java

Lines changed: 36 additions & 0 deletions
@@ -27,6 +27,8 @@
 import java.util.ArrayList;
 import java.util.List;
 import java.util.stream.Collectors;
+import org.apache.hadoop.fs.FileAlreadyExistsException;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
@@ -300,4 +302,38 @@ private void mimicSyncUpAfterPut() throws Exception {
     assertEquals("@Peer1 t2_syncup should be sync up and have 200 rows", 200,
       rowCountHt2TargetAtPeer1);
   }
+
+  /**
+   * test "start a new ReplicationSyncUp after the previous failed". See HBASE-27623 for details.
+   */
+  @Test
+  public void testStartANewSyncUpToolAfterFailed() throws Exception {
+    // Start syncUpTool for the first time with non-force mode,
+    // let's assume that this will fail in sync data,
+    // this does not affect our test results
+    syncUp(UTIL1);
+    Path rootDir = CommonFSUtils.getRootDir(UTIL1.getConfiguration());
+    Path syncUpInfoDir = new Path(rootDir, ReplicationSyncUp.INFO_DIR);
+    Path replicationInfoPath = new Path(syncUpInfoDir, ReplicationSyncUp.INFO_FILE);
+    FileSystem fs = UTIL1.getTestFileSystem();
+    assertTrue(fs.exists(replicationInfoPath));
+    FileStatus fileStatus1 = fs.getFileStatus(replicationInfoPath);
+
+    // Start syncUpTool for the second time with non-force mode,
+    // startup will fail because replication info file already exists
+    try {
+      syncUp(UTIL1);
+    } catch (Exception e) {
+      assertTrue("e should be a FileAlreadyExistsException",
+        (e instanceof FileAlreadyExistsException));
+    }
+    FileStatus fileStatus2 = fs.getFileStatus(replicationInfoPath);
+    assertEquals(fileStatus1.getModificationTime(), fileStatus2.getModificationTime());
+
+    // Start syncUpTool for the third time with force mode,
+    // startup will success and create a new replication info file
+    syncUp(UTIL1, new String[] { "-f" });
+    FileStatus fileStatus3 = fs.getFileStatus(replicationInfoPath);
+    assertTrue(fileStatus3.getModificationTime() > fileStatus2.getModificationTime());
+  }
 }
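As an aside, the try/catch in the second step would also pass if no exception were thrown at all; an equivalent, slightly stricter check (illustrative only, not part of this commit, assuming a JUnit version with assertThrows, i.e. 4.13+, and the same syncUp/UTIL1 helpers from this test class) could look like:

import static org.junit.Assert.assertThrows;
import org.apache.hadoop.fs.FileAlreadyExistsException;

// Inside testStartANewSyncUpToolAfterFailed(), replacing the try/catch block:
// assertThrows fails the test if the second non-force run does not throw at all.
assertThrows(FileAlreadyExistsException.class, () -> syncUp(UTIL1));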

hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpToolBase.java

Lines changed: 5 additions & 2 deletions
@@ -136,8 +136,11 @@ final void setupReplication() throws Exception {
   }
 
   final void syncUp(HBaseTestingUtil util) throws Exception {
-    ToolRunner.run(new Configuration(util.getConfiguration()), new ReplicationSyncUp(),
-      new String[0]);
+    syncUp(util, new String[0]);
+  }
+
+  final void syncUp(HBaseTestingUtil util, String[] args) throws Exception {
+    ToolRunner.run(new Configuration(util.getConfiguration()), new ReplicationSyncUp(), args);
   }
 
   // Utilities that manager shutdown / restart of source / sink clusters. They take care of
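For completeness, the widened helper simply forwards whatever arguments it is given to ToolRunner, which applies generic Hadoop options such as -D<property=value> to the Configuration and passes the remainder (here -f) on to ReplicationSyncUp.run(). A minimal standalone sketch of that call path, using a plain default Configuration rather than a test cluster's:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSyncUp;
import org.apache.hadoop.util.ToolRunner;

public class SyncUpForceLauncherSketch {
  public static void main(String[] args) throws Exception {
    // ToolRunner strips -D<property=value> options into the Configuration and
    // hands the remaining arguments ("-f") to ReplicationSyncUp.run(String[]).
    int exit = ToolRunner.run(new Configuration(), new ReplicationSyncUp(), new String[] { "-f" });
    System.exit(exit);
  }
}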
