diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/SystemTableWALEntryFilter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/SystemTableWALEntryFilter.java index 1984436019a5..3cda94a1c028 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/SystemTableWALEntryFilter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/SystemTableWALEntryFilter.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -17,20 +17,16 @@ */ package org.apache.hadoop.hbase.replication; - -import org.apache.yetus.audience.InterfaceAudience; import org.apache.hadoop.hbase.wal.WAL.Entry; +import org.apache.yetus.audience.InterfaceAudience; /** - * Skips WAL edits for all System tables including META + * Skips WAL edits for all System tables including hbase:meta. */ @InterfaceAudience.Private public class SystemTableWALEntryFilter implements WALEntryFilter { @Override public Entry filter(Entry entry) { - if (entry.getKey().getTableName().isSystemTable()) { - return null; - } - return entry; + return entry.getKey().getTableName().isSystemTable()? null: entry; } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/WALEntryFilter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/WALEntryFilter.java index cd3f1bdfb062..23c1c60f2db1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/WALEntryFilter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/WALEntryFilter.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -16,10 +16,9 @@ * limitations under the License. 
*/ package org.apache.hadoop.hbase.replication; - -import org.apache.yetus.audience.InterfaceAudience; import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.wal.WAL.Entry; +import org.apache.yetus.audience.InterfaceAudience; /** * A Filter for WAL entries before being sent over to replication. Multiple @@ -34,7 +33,6 @@ */ @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION) public interface WALEntryFilter { - /** *
* Applies the filter, possibly returning a different Entry instance. If null is returned, the
@@ -49,5 +47,5 @@ public interface WALEntryFilter {
* @return a (possibly modified) Entry to use. Returning null or an entry with no cells will cause
* the entry to be skipped for replication.
*/
- public Entry filter(Entry entry);
+ Entry filter(Entry entry);
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index c283b5c706ba..23b70296c225 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -18,12 +18,12 @@
package org.apache.hadoop.hbase.replication.regionserver;
import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.getArchivedLogPath;
-
import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
@@ -36,6 +36,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Predicate;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
@@ -63,15 +64,15 @@
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL.Entry;
+import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
/**
* Class that handles the source of a replication stream.
- * Currently does not handle more than 1 slave
+ * Currently does not handle more than 1 slave cluster.
* For each slave cluster it selects a random number of peers
* using a replication ratio. For example, if replication ration = 0.1
* and slave cluster has 100 region servers, 10 will be selected.
@@ -120,8 +121,12 @@ public class ReplicationSource implements ReplicationSourceInterface {
private int logQueueWarnThreshold;
// ReplicationEndpoint which will handle the actual replication
private volatile ReplicationEndpoint replicationEndpoint;
- // A filter (or a chain of filters) for the WAL entries.
+
+ /**
+ * A filter (or a chain of filters) for WAL entries; filters out edits.
+ */
protected volatile WALEntryFilter walEntryFilter;
+
// throttler
private ReplicationThrottler throttler;
private long defaultBandwidth;
@@ -139,6 +144,39 @@ public class ReplicationSource implements ReplicationSourceInterface {
private Thread initThread;
+ /**
+ * WALs to replicate.
+ * Predicate that returns 'true' for WALs to replicate and false for WALs to skip.
+ */
+ private final Predicatehbase/oldWALs. If you config hbase.separate.oldlogdir.by.regionserver to
* true, it looks like hbase//oldWALs/kalashnikov.att.net,61634,1486865297088.
- * @param conf
* @param serverName Server name formatted as described in {@link ServerName}
* @return the relative WAL directory name
*/
@@ -414,11 +412,11 @@ public static boolean isMetaFile(Path p) {
return isMetaFile(p.getName());
}
+ /**
+ * @return True if String ends in {@link #META_WAL_PROVIDER_ID}
+ */
public static boolean isMetaFile(String p) {
- if (p != null && p.endsWith(META_WAL_PROVIDER_ID)) {
- return true;
- }
- return false;
+ return p != null && p.endsWith(META_WAL_PROVIDER_ID);
}
public static boolean isArchivedLogFile(Path p) {
@@ -461,12 +459,9 @@ public static Path getArchivedLogPath(Path path, Configuration conf) throws IOEx
* @param path path to WAL file
* @param conf configuration
* @return WAL Reader instance
- * @throws IOException
*/
public static org.apache.hadoop.hbase.wal.WAL.Reader openReader(Path path, Configuration conf)
- throws IOException
-
- {
+ throws IOException {
long retryInterval = 2000; // 2 sec
int maxAttempts = 30;
int attempt = 0;
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java
index 274ccabfbea3..6323da3e3e92 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -16,13 +16,14 @@
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication.regionserver;
-
+import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.META_WAL_PROVIDER_ID;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
-
+import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.OptionalLong;
+import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
@@ -41,19 +42,14 @@
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
-import org.apache.hadoop.hbase.Waiter.Predicate;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationPeer;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
-import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint;
-import org.apache.hadoop.hbase.replication.regionserver.RecoveredReplicationSource;
-import org.apache.hadoop.hbase.replication.regionserver.RecoveredReplicationSourceShipper;
-import org.apache.hadoop.hbase.replication.regionserver.Replication;
-import org.apache.hadoop.hbase.replication.regionserver.ReplicationSource;
-import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager;
+import org.apache.hadoop.hbase.replication.WALEntryFilter;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
@@ -63,7 +59,6 @@
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.apache.hadoop.hbase.wal.WALProvider;
import org.junit.AfterClass;
-import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
@@ -96,9 +91,13 @@ public static void setUpBeforeClass() throws Exception {
FS = TEST_UTIL.getDFSCluster().getFileSystem();
Path rootDir = TEST_UTIL.createRootDir();
oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
- if (FS.exists(oldLogDir)) FS.delete(oldLogDir, true);
+ if (FS.exists(oldLogDir)) {
+ FS.delete(oldLogDir, true);
+ }
logDir = new Path(rootDir, HConstants.HREGION_LOGDIR_NAME);
- if (FS.exists(logDir)) FS.delete(logDir, true);
+ if (FS.exists(logDir)) {
+ FS.delete(logDir, true);
+ }
}
@AfterClass
@@ -108,16 +107,100 @@ public static void tearDownAfterClass() throws Exception {
TEST_UTIL.shutdownMiniDFSCluster();
}
+ /**
+ * Test that the default ReplicationSource skips queuing hbase:meta WAL files.
+ */
+ @Test
+ public void testDefaultSkipsMetaWAL() throws IOException {
+ ReplicationSource rs = new ReplicationSource();
+ Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
+ conf.setInt("replication.source.maxretriesmultiplier", 1);
+ ReplicationPeer mockPeer = Mockito.mock(ReplicationPeer.class);
+ Mockito.when(mockPeer.getConfiguration()).thenReturn(conf);
+ Mockito.when(mockPeer.getPeerBandwidth()).thenReturn(0L);
+ ReplicationPeerConfig peerConfig = Mockito.mock(ReplicationPeerConfig.class);
+ Mockito.when(peerConfig.getReplicationEndpointImpl()).
+ thenReturn(DoNothingReplicationEndpoint.class.getName());
+ Mockito.when(mockPeer.getPeerConfig()).thenReturn(peerConfig);
+ ReplicationSourceManager manager = Mockito.mock(ReplicationSourceManager.class);
+ Mockito.when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong());
+ String queueId = "qid";
+ RegionServerServices rss =
+ TEST_UTIL.createMockRegionServerService(ServerName.parseServerName("a.b.c,1,1"));
+ rs.init(conf, null, manager, null, mockPeer, rss, queueId, null,
+ p -> OptionalLong.empty(), new MetricsSource(queueId));
+ try {
+ rs.startup();
+ assertTrue(rs.isSourceActive());
+ assertEquals(0, rs.getSourceMetrics().getSizeOfLogQueue());
+ rs.enqueueLog(new Path("a.1" + META_WAL_PROVIDER_ID));
+ assertEquals(0, rs.getSourceMetrics().getSizeOfLogQueue());
+ rs.enqueueLog(new Path("a.1"));
+ assertEquals(1, rs.getSourceMetrics().getSizeOfLogQueue());
+ } finally {
+ rs.terminate("Done");
+ rss.stop("Done");
+ }
+ }
+
+ /**
+ * Test that we filter out meta edits, etc.
+ */
+ @Test
+ public void testWALEntryFilter() throws IOException {
+ // To get the fully constructed default WALEntryFilter, need to create a ReplicationSource
+ // instance and init it.
+ ReplicationSource rs = new ReplicationSource();
+ UUID uuid = UUID.randomUUID();
+ Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
+ ReplicationPeer mockPeer = Mockito.mock(ReplicationPeer.class);
+ Mockito.when(mockPeer.getConfiguration()).thenReturn(conf);
+ Mockito.when(mockPeer.getPeerBandwidth()).thenReturn(0L);
+ ReplicationPeerConfig peerConfig = Mockito.mock(ReplicationPeerConfig.class);
+ Mockito.when(peerConfig.getReplicationEndpointImpl()).
+ thenReturn(DoNothingReplicationEndpoint.class.getName());
+ Mockito.when(mockPeer.getPeerConfig()).thenReturn(peerConfig);
+ ReplicationSourceManager manager = Mockito.mock(ReplicationSourceManager.class);
+ Mockito.when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong());
+ String queueId = "qid";
+ RegionServerServices rss =
+ TEST_UTIL.createMockRegionServerService(ServerName.parseServerName("a.b.c,1,1"));
+ rs.init(conf, null, manager, null, mockPeer, rss, queueId,
+ uuid, p -> OptionalLong.empty(), new MetricsSource(queueId));
+ try {
+ rs.startup();
+ TEST_UTIL.waitFor(30000, () -> rs.getWalEntryFilter() != null);
+ WALEntryFilter wef = rs.getWalEntryFilter();
+ // Test non-system WAL edit.
+ WAL.Entry e = new WAL.Entry(new WALKeyImpl(HConstants.EMPTY_BYTE_ARRAY,
+ TableName.valueOf("test"), -1), new WALEdit());
+ assertTrue(wef.filter(e) == e);
+ // Test system WAL edit.
+ e = new WAL.Entry(
+ new WALKeyImpl(HConstants.EMPTY_BYTE_ARRAY, TableName.META_TABLE_NAME, -1),
+ new WALEdit());
+ assertNull(wef.filter(e));
+ } finally {
+ rs.terminate("Done");
+ rss.stop("Done");
+ }
+ }
+
/**
* Sanity check that we can move logs around while we are reading
* from them. Should this test fail, ReplicationSource would have a hard
* time reading logs that are being archived.
*/
+ // This test doesn't belong in here... it is not about ReplicationSource.
@Test
public void testLogMoving() throws Exception{
Path logPath = new Path(logDir, "log");
- if (!FS.exists(logDir)) FS.mkdirs(logDir);
- if (!FS.exists(oldLogDir)) FS.mkdirs(oldLogDir);
+ if (!FS.exists(logDir)) {
+ FS.mkdirs(logDir);
+ }
+ if (!FS.exists(oldLogDir)) {
+ FS.mkdirs(oldLogDir);
+ }
WALProvider.Writer writer = WALFactory.createWALWriter(FS, logPath,
TEST_UTIL.getConfiguration());
for(int i = 0; i < 3; i++) {
@@ -142,7 +225,7 @@ public void testLogMoving() throws Exception{
entry = reader.next();
assertNotNull(entry);
- entry = reader.next();
+ reader.next();
entry = reader.next();
assertNull(entry);
@@ -151,47 +234,31 @@ public void testLogMoving() throws Exception{
/**
* Tests that {@link ReplicationSource#terminate(String)} will timeout properly
+ * Moved here from TestReplicationSource because it doesn't need a cluster.
*/
@Test
public void testTerminateTimeout() throws Exception {
ReplicationSource source = new ReplicationSource();
- ReplicationEndpoint replicationEndpoint = new HBaseInterClusterReplicationEndpoint() {
- @Override
- protected void doStart() {
- notifyStarted();
- }
-
- @Override
- protected void doStop() {
- // not calling notifyStopped() here causes the caller of stop() to get a Future that never
- // completes
- }
- };
- replicationEndpoint.start();
- ReplicationPeer mockPeer = Mockito.mock(ReplicationPeer.class);
- Mockito.when(mockPeer.getPeerBandwidth()).thenReturn(0L);
- Configuration testConf = HBaseConfiguration.create();
- testConf.setInt("replication.source.maxretriesmultiplier", 1);
- ReplicationSourceManager manager = Mockito.mock(ReplicationSourceManager.class);
- Mockito.when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong());
- source.init(testConf, null, manager, null, mockPeer, null, "testPeer", null,
- p -> OptionalLong.empty(), null);
- ExecutorService executor = Executors.newSingleThreadExecutor();
- Future> future = executor.submit(new Runnable() {
-
- @Override
- public void run() {
- source.terminate("testing source termination");
- }
- });
- long sleepForRetries = testConf.getLong("replication.source.sleepforretries", 1000);
- Waiter.waitFor(testConf, sleepForRetries * 2, new Predicate