Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.hadoop.hbase.replication;

import java.util.List;
import java.util.Map;

import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
Expand Down Expand Up @@ -67,6 +68,13 @@ public interface ReplicationQueuesClient {
*/
int getQueuesZNodeCversion() throws KeeperException;

/**
 * Get a map of cversion of all replicator nodes, keyed by replicator. Comparing the map read
 * before a scan of the replication queues with the map read after it can be used as optimistic
 * locking to get a consistent snapshot of the replication queues.
 * @return a map of replicator to cversion
 * @throws KeeperException presumably when the underlying ZooKeeper read fails; see the
 *         implementation for the exact conditions
 */
Map<String, Integer> getReplicatorsZNodeCversion() throws KeeperException;

/**
* Get the change version number of replication hfile references node. This can be used as
* optimistic locking to get a consistent snapshot of the replication queues of hfile references.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
*/
package org.apache.hadoop.hbase.replication;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
Expand Down Expand Up @@ -92,6 +94,18 @@ public List<String> getAllQueues(String serverName) throws KeeperException {
}
}

/**
 * Reads the cversion of every replicator znode found under the queues znode.
 * @return a mutable map of replicator name to the cversion read from its znode's {@link Stat};
 *         empty when no replicator list is available
 * @throws KeeperException propagated from listing the replicators or reading a znode
 */
@Override
public Map<String, Integer> getReplicatorsZNodeCversion() throws KeeperException {
  List<String> replicators = super.getListOfReplicatorsZK();
  Map<String, Integer> rsToCversion = new HashMap<>();
  if (replicators == null) {
    // NOTE(review): the listing helper presumably returns null when the queues znode does not
    // exist (confirm against its contract). Without this guard the loop below would throw a
    // NullPointerException; report "no replicators" instead.
    return rsToCversion;
  }
  for (String replicator : replicators) {
    // Use a fresh Stat per replicator so values never carry over from the previous znode.
    Stat stat = new Stat();
    String replicatorZNode = ZKUtil.joinZNode(this.queuesZNode, replicator);
    // Only the Stat is of interest here; the returned data is deliberately ignored.
    ZKUtil.getDataNoWatch(this.zookeeper, replicatorZNode, stat);
    rsToCversion.put(replicator, stat.getCversion());
  }
  return rsToCversion;
}

@Override
public int getHFileRefsNodeChangeVersion() throws KeeperException {
Stat stat = new Stat();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,9 @@ private Set<String> loadWALsFromQueues() throws KeeperException {
LOG.debug("Didn't find any region server that replicates, won't prevent any deletions.");
return ImmutableSet.of();
}
// Also check the cversions of all rs nodes so that WALs being claimed by another region server
// are not missed. For details, please see HBASE-26482
Map<String, Integer> rsToCversionBefore = replicationQueues.getReplicatorsZNodeCversion();
Set<String> wals = Sets.newHashSet();
for (String rs : rss) {
List<String> listOfPeers = replicationQueues.getAllQueues(rs);
Expand All @@ -121,7 +124,8 @@ private Set<String> loadWALsFromQueues() throws KeeperException {
}
}
int v1 = replicationQueues.getQueuesZNodeCversion();
if (v0 == v1) {
Map<String, Integer> rsToCversionAfter = replicationQueues.getReplicatorsZNodeCversion();
if (v0 == v1 && rsToCversionBefore.equals(rsToCversionAfter)) {
return wals;
}
LOG.info(String.format("Replication queue node cversion changed from %d to %d, retry = %d",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
Expand All @@ -41,6 +42,7 @@
import java.util.Random;
import java.util.concurrent.atomic.AtomicBoolean;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
Expand Down Expand Up @@ -188,6 +190,9 @@ public void testZnodeCversionChange() throws Exception {

ReplicationQueuesClient rqcMock = mock(ReplicationQueuesClient.class);
Mockito.when(rqcMock.getQueuesZNodeCversion()).thenReturn(1, 2, 3, 4);
// Return a non-empty replicator list so the cleaner does not return early for lack of replicators.
Mockito.when(rqcMock.getListOfReplicators())
.thenReturn(Lists.newArrayList("s1", "s2"));

Field rqc = ReplicationLogCleaner.class.getDeclaredField("replicationQueues");
rqc.setAccessible(true);
Expand All @@ -196,6 +201,35 @@ public void testZnodeCversionChange() throws Exception {

// This should return eventually when cversion stabilizes
cleaner.getDeletableFiles(new LinkedList<FileStatus>());
// Test did get an optimistic lock
Mockito.verify(rqcMock, atLeast(5)).getQueuesZNodeCversion();
}

/**
 * Feeds the cleaner a per-replicator cversion map that changes on the first few reads and
 * then stays the same, and checks that the cleaner kept re-reading it before returning.
 */
@Test
public void testReplicatorZnodeCversionChange()
    throws KeeperException, NoSuchFieldException, IllegalAccessException {
  ReplicationQueuesClient queuesClient = mock(ReplicationQueuesClient.class);
  // Report two replicators so the cleaner does not return early for lack of replicators.
  Mockito.when(queuesClient.getListOfReplicators())
      .thenReturn(Lists.newArrayList("s1", "s2"));
  // Four distinct maps in sequence; the last stubbed value is repeated on later reads.
  Mockito.when(queuesClient.getReplicatorsZNodeCversion())
      .thenReturn(ImmutableMap.of("s1", 0, "s2", 0))
      .thenReturn(ImmutableMap.of("s1", 1, "s2", 1))
      .thenReturn(ImmutableMap.of("s1", 2, "s2", 2))
      .thenReturn(ImmutableMap.of("s1", 3, "s2", 3));

  ReplicationLogCleaner cleaner = new ReplicationLogCleaner();
  cleaner.setConf(TEST_UTIL.getConfiguration());

  // Swap the mocked client into the cleaner's private field via reflection.
  Field queuesField = ReplicationLogCleaner.class.getDeclaredField("replicationQueues");
  queuesField.setAccessible(true);
  queuesField.set(cleaner, queuesClient);

  // This should return eventually when cversion stabilizes
  cleaner.getDeletableFiles(new LinkedList<FileStatus>());

  // At least five reads means the cleaner went past the four distinct stubbed maps,
  // i.e. it did rely on the optimistic lock.
  Mockito.verify(queuesClient, atLeast(5)).getReplicatorsZNodeCversion();
}

@Test(timeout=10000)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,13 @@ public void testIsPeerPath_ActualPeerPath() {
assertTrue(rqZK.isPeerPath(peerPath));
}

/**
 * Checks that the replicator cversion map reported by the queues client contains an entry
 * for {@code server1} after {@code rq1} has been initialized with it.
 */
@Test
public void testZNodeCversion() throws ReplicationException, KeeperException {
  rq1.init(server1);

  boolean hasServer1Entry = rqc.getReplicatorsZNodeCversion().containsKey(server1);
  assertTrue(hasServer1Entry);
}

static class DummyServer implements Server {
private String serverName;
private boolean isAborted = false;
Expand Down