Skip to content

Commit 8f82cbf

Browse files
Apache9 authored and apurtell committed
HBASE-26120 New replication gets stuck or data loss when multiwal groups more than 10 (#3528)
Signed-off-by: Andrew Purtell <[email protected]>
Signed-off-by: Michael Stack <[email protected]>

Conflicts:
    hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
    hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
1 parent dbeabba commit 8f82cbf

File tree

2 files changed

+34
-23
lines changed

2 files changed

+34
-23
lines changed

hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java

Lines changed: 19 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,13 @@
1919

2020
package org.apache.hadoop.hbase.replication.regionserver;
2121

22+
import com.google.common.collect.Sets;
2223
import com.google.common.util.concurrent.ThreadFactoryBuilder;
2324

2425
import java.io.IOException;
2526
import java.util.ArrayList;
2627
import java.util.Collections;
2728
import java.util.HashMap;
28-
import java.util.HashSet;
29-
import java.util.Iterator;
3029
import java.util.List;
3130
import java.util.Map;
3231
import java.util.Random;
@@ -108,7 +107,7 @@ public class ReplicationSourceManager implements ReplicationListener {
108107
private final Configuration conf;
109108
private final FileSystem fs;
110109
// The paths to the latest log of each wal group, for new coming peers
111-
private Set<Path> latestPaths;
110+
private final Map<String, Path> latestPaths;
112111
// Path to the wals directories
113112
private final Path logDir;
114113
// Path to the wal archive
@@ -171,7 +170,7 @@ public ReplicationSourceManager(final ReplicationQueues replicationQueues,
171170
tfb.setDaemon(true);
172171
this.executor.setThreadFactory(tfb.build());
173172
this.rand = new Random();
174-
this.latestPaths = Collections.synchronizedSet(new HashSet<Path>());
173+
this.latestPaths = new HashMap<>();
175174
replicationForBulkLoadDataEnabled =
176175
conf.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY,
177176
HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT);
@@ -305,23 +304,22 @@ protected ReplicationSourceInterface addSource(String id) throws IOException,
305304
this.walsById.put(id, walsByGroup);
306305
// Add the latest wal to that source's queue
307306
synchronized (latestPaths) {
308-
if (this.latestPaths.size() > 0) {
309-
for (Path logPath : latestPaths) {
310-
String name = logPath.getName();
311-
String walPrefix = DefaultWALProvider.getWALPrefixFromWALName(name);
312-
SortedSet<String> logs = new TreeSet<String>();
313-
logs.add(name);
314-
walsByGroup.put(walPrefix, logs);
307+
if (!latestPaths.isEmpty()) {
308+
for (Map.Entry<String, Path> walPrefixAndPath : latestPaths.entrySet()) {
309+
Path walPath = walPrefixAndPath.getValue();
310+
SortedSet<String> wals = new TreeSet<>();
311+
wals.add(walPath.getName());
312+
walsByGroup.put(walPrefixAndPath.getKey(), wals);
315313
try {
316-
this.replicationQueues.addLog(id, name);
314+
this.replicationQueues.addLog(id, walPath.getName());
317315
} catch (ReplicationException e) {
318316
String message =
319317
"Cannot add log to queue when creating a new source, queueId=" + id
320-
+ ", filename=" + name;
318+
+ ", filename=" + walPath.getName();
321319
server.stop(message);
322320
throw e;
323321
}
324-
src.enqueueLog(logPath);
322+
src.enqueueLog(walPath);
325323
}
326324
}
327325
}
@@ -409,15 +407,7 @@ void preLogRoll(Path newLog) throws IOException {
409407
String logName = newLog.getName();
410408
String logPrefix = DefaultWALProvider.getWALPrefixFromWALName(logName);
411409
synchronized (latestPaths) {
412-
Iterator<Path> iterator = latestPaths.iterator();
413-
while (iterator.hasNext()) {
414-
Path path = iterator.next();
415-
if (path.getName().contains(logPrefix)) {
416-
iterator.remove();
417-
break;
418-
}
419-
}
420-
this.latestPaths.add(newLog);
410+
latestPaths.put(logPrefix, newLog);
421411
}
422412
}
423413

@@ -693,6 +683,12 @@ public void peerListChanged(List<String> peerIds) {
693683
}
694684
}
695685

686+
Set<Path> getLastestPath() {
687+
synchronized (latestPaths) {
688+
return Sets.newHashSet(latestPaths.values());
689+
}
690+
}
691+
696692
/**
697693
* Class responsible to setup new ReplicationSources to take care of the
698694
* queues from dead region servers.

hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import java.util.List;
3535
import java.util.Map;
3636
import java.util.NavigableMap;
37+
import java.util.Set;
3738
import java.util.SortedMap;
3839
import java.util.SortedSet;
3940
import java.util.TreeMap;
@@ -509,6 +510,20 @@ private void removePeerAndWait(final String peerId) throws Exception {
509510
});
510511
}
511512

513+
@Test
514+
public void testSameWALPrefix() throws IOException {
515+
Set<Path> latestWalsBefore = manager.getLastestPath();
516+
Path walName1 = new Path("localhost,8080,12345-45678-Peer.34567");
517+
Path walName2 = new Path("localhost,8080,12345.56789");
518+
manager.preLogRoll(walName1);
519+
manager.preLogRoll(walName2);
520+
Set<Path> latestWals = manager.getLastestPath();
521+
latestWals.removeAll(latestWalsBefore);
522+
assertEquals(2, latestWals.size());
523+
assertTrue(latestWals.contains(walName1));
524+
assertTrue(latestWals.contains(walName2));
525+
}
526+
512527
private WALEdit getBulkLoadWALEdit() {
513528
// 1. Create store files for the families
514529
Map<byte[], List<Path>> storeFiles = new HashMap<>(1);

0 commit comments

Comments
 (0)