Skip to content

Commit f6c5dbe

Browse files
authored
VerifyReplication recompare async (#5051)
Signed-off-by: Bryan Beaudreault <[email protected]>
1 parent 0bbc8d1 commit f6c5dbe

File tree

4 files changed

+591
-36
lines changed

4 files changed

+591
-36
lines changed

hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java

Lines changed: 151 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,12 @@
1919

2020
import java.io.IOException;
2121
import java.util.Arrays;
22+
import java.util.List;
2223
import java.util.UUID;
24+
import java.util.concurrent.SynchronousQueue;
25+
import java.util.concurrent.ThreadPoolExecutor;
26+
import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy;
27+
import java.util.concurrent.TimeUnit;
2328
import org.apache.hadoop.conf.Configuration;
2429
import org.apache.hadoop.conf.Configured;
2530
import org.apache.hadoop.fs.FileSystem;
@@ -30,7 +35,6 @@
3035
import org.apache.hadoop.hbase.TableName;
3136
import org.apache.hadoop.hbase.client.Connection;
3237
import org.apache.hadoop.hbase.client.ConnectionFactory;
33-
import org.apache.hadoop.hbase.client.Get;
3438
import org.apache.hadoop.hbase.client.Put;
3539
import org.apache.hadoop.hbase.client.Result;
3640
import org.apache.hadoop.hbase.client.ResultScanner;
@@ -46,6 +50,7 @@
4650
import org.apache.hadoop.hbase.mapreduce.TableMapper;
4751
import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat;
4852
import org.apache.hadoop.hbase.mapreduce.TableSplit;
53+
import org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication.Verifier.Counters;
4954
import org.apache.hadoop.hbase.replication.ReplicationException;
5055
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
5156
import org.apache.hadoop.hbase.replication.ReplicationPeerStorage;
@@ -55,12 +60,12 @@
5560
import org.apache.hadoop.hbase.util.Bytes;
5661
import org.apache.hadoop.hbase.util.CommonFSUtils;
5762
import org.apache.hadoop.hbase.util.Pair;
58-
import org.apache.hadoop.hbase.util.Threads;
5963
import org.apache.hadoop.hbase.zookeeper.ZKConfig;
6064
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
6165
import org.apache.hadoop.mapreduce.InputSplit;
6266
import org.apache.hadoop.mapreduce.Job;
6367
import org.apache.hadoop.mapreduce.MRJobConfig;
68+
import org.apache.hadoop.mapreduce.Mapper;
6469
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
6570
import org.apache.hadoop.util.Tool;
6671
import org.apache.hadoop.util.ToolRunner;
@@ -84,6 +89,11 @@ public class VerifyReplication extends Configured implements Tool {
8489

8590
public final static String NAME = "verifyrep";
8691
private final static String PEER_CONFIG_PREFIX = NAME + ".peer.";
92+
private static ThreadPoolExecutor reCompareExecutor = null;
93+
int reCompareTries = 0;
94+
int reCompareBackoffExponent = 0;
95+
int reCompareThreads = 0;
96+
int sleepMsBeforeReCompare = 0;
8797
long startTime = 0;
8898
long endTime = Long.MAX_VALUE;
8999
int batch = -1;
@@ -94,7 +104,6 @@ public class VerifyReplication extends Configured implements Tool {
94104
String peerId = null;
95105
String peerQuorumAddress = null;
96106
String rowPrefixes = null;
97-
int sleepMsBeforeReCompare = 0;
98107
boolean verbose = false;
99108
boolean includeDeletedCells = false;
100109
// Source table snapshot name
@@ -124,7 +133,12 @@ public enum Counters {
124133
BADROWS,
125134
ONLY_IN_SOURCE_TABLE_ROWS,
126135
ONLY_IN_PEER_TABLE_ROWS,
127-
CONTENT_DIFFERENT_ROWS
136+
CONTENT_DIFFERENT_ROWS,
137+
RECOMPARES,
138+
MAIN_THREAD_RECOMPARES,
139+
SOURCE_ROW_CHANGED,
140+
PEER_ROW_CHANGED,
141+
FAILED_RECOMPARE
128142
}
129143

130144
private Connection sourceConnection;
@@ -133,6 +147,9 @@ public enum Counters {
133147
private Table replicatedTable;
134148
private ResultScanner replicatedScanner;
135149
private Result currentCompareRowInPeerTable;
150+
private Scan tableScan;
151+
private int reCompareTries;
152+
private int reCompareBackoffExponent;
136153
private int sleepMsBeforeReCompare;
137154
private String delimiter = "";
138155
private boolean verbose = false;
@@ -150,7 +167,12 @@ public void map(ImmutableBytesWritable row, final Result value, Context context)
150167
throws IOException {
151168
if (replicatedScanner == null) {
152169
Configuration conf = context.getConfiguration();
170+
reCompareTries = conf.getInt(NAME + ".recompareTries", 0);
171+
reCompareBackoffExponent = conf.getInt(NAME + ".recompareBackoffExponent", 1);
153172
sleepMsBeforeReCompare = conf.getInt(NAME + ".sleepMsBeforeReCompare", 0);
173+
if (sleepMsBeforeReCompare > 0) {
174+
reCompareTries = Math.max(reCompareTries, 1);
175+
}
154176
delimiter = conf.get(NAME + ".delimiter", "");
155177
verbose = conf.getBoolean(NAME + ".verbose", false);
156178
batch = conf.getInt(NAME + ".batch", -1);
@@ -179,9 +201,12 @@ public void map(ImmutableBytesWritable row, final Result value, Context context)
179201
if (versions >= 0) {
180202
scan.readVersions(versions);
181203
}
204+
int reCompareThreads = conf.getInt(NAME + ".recompareThreads", 0);
205+
reCompareExecutor = buildReCompareExecutor(reCompareThreads, context);
182206
TableName tableName = TableName.valueOf(conf.get(NAME + ".tableName"));
183207
sourceConnection = ConnectionFactory.createConnection(conf);
184208
sourceTable = sourceConnection.getTable(tableName);
209+
tableScan = scan;
185210

186211
final InputSplit tableSplit = context.getInputSplit();
187212

@@ -226,7 +251,7 @@ public void map(ImmutableBytesWritable row, final Result value, Context context)
226251
while (true) {
227252
if (currentCompareRowInPeerTable == null) {
228253
// reach the region end of peer table, row only in source table
229-
logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_SOURCE_TABLE_ROWS, value);
254+
logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_SOURCE_TABLE_ROWS, value, null);
230255
break;
231256
}
232257
int rowCmpRet = Bytes.compareTo(value.getRow(), currentCompareRowInPeerTable.getRow());
@@ -240,55 +265,77 @@ public void map(ImmutableBytesWritable row, final Result value, Context context)
240265
"Good row key: " + delimiter + Bytes.toStringBinary(value.getRow()) + delimiter);
241266
}
242267
} catch (Exception e) {
243-
logFailRowAndIncreaseCounter(context, Counters.CONTENT_DIFFERENT_ROWS, value);
268+
logFailRowAndIncreaseCounter(context, Counters.CONTENT_DIFFERENT_ROWS, value,
269+
currentCompareRowInPeerTable);
244270
}
245271
currentCompareRowInPeerTable = replicatedScanner.next();
246272
break;
247273
} else if (rowCmpRet < 0) {
248274
// row only exists in source table
249-
logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_SOURCE_TABLE_ROWS, value);
275+
logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_SOURCE_TABLE_ROWS, value, null);
250276
break;
251277
} else {
252278
// row only exists in peer table
253-
logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_PEER_TABLE_ROWS,
279+
logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_PEER_TABLE_ROWS, null,
254280
currentCompareRowInPeerTable);
255281
currentCompareRowInPeerTable = replicatedScanner.next();
256282
}
257283
}
258284
}
259285

260-
private void logFailRowAndIncreaseCounter(Context context, Counters counter, Result row) {
261-
if (sleepMsBeforeReCompare > 0) {
262-
Threads.sleep(sleepMsBeforeReCompare);
263-
try {
264-
Result sourceResult = sourceTable.get(new Get(row.getRow()));
265-
Result replicatedResult = replicatedTable.get(new Get(row.getRow()));
266-
Result.compareResults(sourceResult, replicatedResult, false);
267-
if (!sourceResult.isEmpty()) {
268-
context.getCounter(Counters.GOODROWS).increment(1);
269-
if (verbose) {
270-
LOG.info("Good row key (with recompare): " + delimiter
271-
+ Bytes.toStringBinary(row.getRow()) + delimiter);
272-
}
273-
}
274-
return;
275-
} catch (Exception e) {
276-
LOG.error("recompare fail after sleep, rowkey=" + delimiter
277-
+ Bytes.toStringBinary(row.getRow()) + delimiter);
278-
}
286+
@SuppressWarnings("FutureReturnValueIgnored")
287+
private void logFailRowAndIncreaseCounter(Context context, Counters counter, Result row,
288+
Result replicatedRow) {
289+
byte[] rowKey = getRow(row, replicatedRow);
290+
if (reCompareTries == 0) {
291+
context.getCounter(counter).increment(1);
292+
context.getCounter(Counters.BADROWS).increment(1);
293+
LOG.error("{}, rowkey={}{}{}", counter, delimiter, Bytes.toStringBinary(rowKey), delimiter);
294+
return;
295+
}
296+
297+
VerifyReplicationRecompareRunnable runnable = new VerifyReplicationRecompareRunnable(context,
298+
row, replicatedRow, counter, delimiter, tableScan, sourceTable, replicatedTable,
299+
reCompareTries, sleepMsBeforeReCompare, reCompareBackoffExponent, verbose);
300+
301+
if (reCompareExecutor == null) {
302+
runnable.run();
303+
return;
279304
}
280-
context.getCounter(counter).increment(1);
281-
context.getCounter(Counters.BADROWS).increment(1);
282-
LOG.error(counter.toString() + ", rowkey=" + delimiter + Bytes.toStringBinary(row.getRow())
283-
+ delimiter);
305+
306+
reCompareExecutor.submit(runnable);
284307
}
285308

286309
@Override
287310
protected void cleanup(Context context) {
311+
if (reCompareExecutor != null && !reCompareExecutor.isShutdown()) {
312+
reCompareExecutor.shutdown();
313+
try {
314+
boolean terminated = reCompareExecutor.awaitTermination(1, TimeUnit.MINUTES);
315+
if (!terminated) {
316+
List<Runnable> queue = reCompareExecutor.shutdownNow();
317+
for (Runnable runnable : queue) {
318+
((VerifyReplicationRecompareRunnable) runnable).fail();
319+
}
320+
321+
terminated = reCompareExecutor.awaitTermination(1, TimeUnit.MINUTES);
322+
323+
if (!terminated) {
324+
int activeCount = Math.max(1, reCompareExecutor.getActiveCount());
325+
LOG.warn("Found {} possible recompares still running in the executable"
326+
+ " incrementing BADROWS and FAILED_RECOMPARE", activeCount);
327+
context.getCounter(Counters.BADROWS).increment(activeCount);
328+
context.getCounter(Counters.FAILED_RECOMPARE).increment(activeCount);
329+
}
330+
}
331+
} catch (InterruptedException e) {
332+
throw new RuntimeException("Failed to await executor termination in cleanup", e);
333+
}
334+
}
288335
if (replicatedScanner != null) {
289336
try {
290337
while (currentCompareRowInPeerTable != null) {
291-
logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_PEER_TABLE_ROWS,
338+
logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_PEER_TABLE_ROWS, null,
292339
currentCompareRowInPeerTable);
293340
currentCompareRowInPeerTable = replicatedScanner.next();
294341
}
@@ -424,6 +471,10 @@ public Job createSubmittableJob(Configuration conf, String[] args) throws IOExce
424471
conf.setInt(NAME + ".versions", versions);
425472
LOG.info("Number of version: " + versions);
426473

474+
conf.setInt(NAME + ".recompareTries", reCompareTries);
475+
conf.setInt(NAME + ".recompareBackoffExponent", reCompareBackoffExponent);
476+
conf.setInt(NAME + ".recompareThreads", reCompareThreads);
477+
427478
// Set Snapshot specific parameters
428479
if (peerSnapshotName != null) {
429480
conf.set(NAME + ".peerSnapshotName", peerSnapshotName);
@@ -491,6 +542,15 @@ public Job createSubmittableJob(Configuration conf, String[] args) throws IOExce
491542
return job;
492543
}
493544

545+
protected static byte[] getRow(Result sourceResult, Result replicatedResult) {
546+
if (sourceResult != null) {
547+
return sourceResult.getRow();
548+
} else if (replicatedResult != null) {
549+
return replicatedResult.getRow();
550+
}
551+
throw new RuntimeException("Both sourceResult and replicatedResult are null!");
552+
}
553+
494554
private static void setRowPrefixFilter(Scan scan, String rowPrefixes) {
495555
if (rowPrefixes != null && !rowPrefixes.isEmpty()) {
496556
String[] rowPrefixArray = rowPrefixes.split(",");
@@ -575,11 +635,20 @@ public boolean doCommandLine(final String[] args) {
575635
continue;
576636
}
577637

578-
final String sleepToReCompareKey = "--recomparesleep=";
638+
final String deprecatedSleepToReCompareKey = "--recomparesleep=";
639+
final String sleepToReCompareKey = "--recompareSleep=";
640+
if (cmd.startsWith(deprecatedSleepToReCompareKey)) {
641+
LOG.warn("--recomparesleep is deprecated and will be removed in 4.0.0."
642+
+ " Use --recompareSleep instead.");
643+
sleepMsBeforeReCompare =
644+
Integer.parseInt(cmd.substring(deprecatedSleepToReCompareKey.length()));
645+
continue;
646+
}
579647
if (cmd.startsWith(sleepToReCompareKey)) {
580648
sleepMsBeforeReCompare = Integer.parseInt(cmd.substring(sleepToReCompareKey.length()));
581649
continue;
582650
}
651+
583652
final String verboseKey = "--verbose";
584653
if (cmd.startsWith(verboseKey)) {
585654
verbose = true;
@@ -628,6 +697,25 @@ public boolean doCommandLine(final String[] args) {
628697
continue;
629698
}
630699

700+
final String reCompareThreadArgs = "--recompareThreads=";
701+
if (cmd.startsWith(reCompareThreadArgs)) {
702+
reCompareThreads = Integer.parseInt(cmd.substring(reCompareThreadArgs.length()));
703+
continue;
704+
}
705+
706+
final String reCompareTriesKey = "--recompareTries=";
707+
if (cmd.startsWith(reCompareTriesKey)) {
708+
reCompareTries = Integer.parseInt(cmd.substring(reCompareTriesKey.length()));
709+
continue;
710+
}
711+
712+
final String reCompareBackoffExponentKey = "--recompareBackoffExponent=";
713+
if (cmd.startsWith(reCompareBackoffExponentKey)) {
714+
reCompareBackoffExponent =
715+
Integer.parseInt(cmd.substring(reCompareBackoffExponentKey.length()));
716+
continue;
717+
}
718+
631719
if (cmd.startsWith("--")) {
632720
printUsage("Invalid argument '" + cmd + "'");
633721
return false;
@@ -704,7 +792,8 @@ private static void printUsage(final String errorMsg) {
704792
System.err.println("ERROR: " + errorMsg);
705793
}
706794
System.err.println("Usage: verifyrep [--starttime=X]"
707-
+ " [--endtime=Y] [--families=A] [--row-prefixes=B] [--delimiter=] [--recomparesleep=] "
795+
+ " [--endtime=Y] [--families=A] [--row-prefixes=B] [--delimiter=] [--recompareSleep=] "
796+
+ "[--recompareThreads=] [--recompareTries=] [--recompareBackoffExponent=]"
708797
+ "[--batch=] [--verbose] [--peerTableName=] [--sourceSnapshotName=P] "
709798
+ "[--sourceSnapshotTmpDir=Q] [--peerSnapshotName=R] [--peerSnapshotTmpDir=S] "
710799
+ "[--peerFSAddress=T] [--peerHBaseRootAddress=U] <peerid|peerQuorumAddress> <tablename>");
@@ -720,8 +809,14 @@ private static void printUsage(final String errorMsg) {
720809
System.err.println(" families comma-separated list of families to copy");
721810
System.err.println(" row-prefixes comma-separated list of row key prefixes to filter on ");
722811
System.err.println(" delimiter the delimiter used in display around rowkey");
723-
System.err.println(" recomparesleep milliseconds to sleep before recompare row, "
812+
System.err.println(" recompareSleep milliseconds to sleep before recompare row, "
724813
+ "default value is 0 which disables the recompare.");
814+
System.err.println(" recompareThreads number of threads to run recompares in");
815+
System.err.println(" recompareTries number of recompare attempts before incrementing "
816+
+ "the BADROWS counter. Defaults to 1 recompare");
817+
System.out.println(" recompareBackoffExponent exponential multiplier to increase "
818+
+ "recompareSleep after each recompare attempt, "
819+
+ "default value is 0 which results in a constant sleep time");
725820
System.err.println(" verbose logs row keys of good rows");
726821
System.err.println(" peerTableName Peer Table Name");
727822
System.err.println(" sourceSnapshotName Source Snapshot Name");
@@ -788,6 +883,27 @@ private static void printUsage(final String errorMsg) {
788883
+ "2181:/cluster-b \\\n" + " TestTable");
789884
}
790885

886+
private static ThreadPoolExecutor buildReCompareExecutor(int maxThreads, Mapper.Context context) {
887+
if (maxThreads == 0) {
888+
return null;
889+
}
890+
891+
return new ThreadPoolExecutor(0, maxThreads, 1L, TimeUnit.SECONDS, new SynchronousQueue<>(),
892+
buildRejectedReComparePolicy(context));
893+
}
894+
895+
private static CallerRunsPolicy buildRejectedReComparePolicy(Mapper.Context context) {
896+
return new CallerRunsPolicy() {
897+
@Override
898+
public void rejectedExecution(Runnable runnable, ThreadPoolExecutor e) {
899+
LOG.debug("Re-comparison execution rejected. Running in main thread.");
900+
context.getCounter(Counters.MAIN_THREAD_RECOMPARES).increment(1);
901+
// will run in the current thread
902+
super.rejectedExecution(runnable, e);
903+
}
904+
};
905+
}
906+
791907
@Override
792908
public int run(String[] args) throws Exception {
793909
Configuration conf = this.getConf();

0 commit comments

Comments
 (0)