diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
index bd5b7736f3b9..26360cbe3ea1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
@@ -23,6 +23,7 @@
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -75,6 +76,8 @@ class ReplicationSourceWALReader extends Thread {
   private boolean isReaderRunning = true;
   private final String walGroupId;
 
+  AtomicBoolean waitingPeerEnabled = new AtomicBoolean(false);
+
   /**
    * Creates a reader worker for a given WAL queue. Reads WAL entries off a given queue, batches the
    * entries, and puts them on a batch queue.
@@ -137,8 +140,11 @@ public void run() {
       source.getWALFileLengthProvider(), source.getSourceMetrics(), walGroupId)) {
       while (isReaderRunning()) { // loop here to keep reusing stream while we can
         if (!source.isPeerEnabled()) {
+          waitingPeerEnabled.set(true);
           Threads.sleep(sleepForRetries);
           continue;
+        } else {
+          waitingPeerEnabled.set(false);
         }
         if (!checkBufferQuota()) {
           continue;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
index c6268674c5b8..22bf05b37419 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
@@ -20,6 +20,7 @@
 import java.io.Closeable;
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.io.UncheckedIOException;
 import java.util.OptionalLong;
 import java.util.concurrent.PriorityBlockingQueue;
 import org.apache.hadoop.conf.Configuration;
@@ -219,6 +220,11 @@ private HasNext prepareReader() {
             // we will read from the beginning so we should always clear the compression context
             reader.resetTo(-1, true);
           }
+        } catch (FileNotFoundException e) {
+          // For now, this could happen only when reading meta wal for meta replicas.
+          // In this case, raising UncheckedIOException will let the endpoint deal with resetting
+          // the replication source. See HBASE-27871.
+          throw new UncheckedIOException(e);
         } catch (IOException e) {
           LOG.warn("Failed to reset reader {} to pos {}, reset compression={}", currentPath,
             currentPositionOfEntry, state.resetCompression(), e);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestMetaRegionReplicaReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestMetaRegionReplicaReplicationEndpoint.java
index 7ca651dbe770..d3e644ff4330 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestMetaRegionReplicaReplicationEndpoint.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestMetaRegionReplicaReplicationEndpoint.java
@@ -29,6 +29,8 @@
 import java.util.Arrays;
 import java.util.List;
 import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.mutable.MutableObject;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellScanner;
@@ -53,6 +55,7 @@
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.replication.ReplicationPeerImpl;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
@@ -225,6 +228,51 @@ public void testCatalogReplicaReplicationWithFlushAndCompaction() throws Excepti
     }
   }
 
+  @Test
+  public void testCatalogReplicaReplicationWALRolledAndDeleted() throws Exception {
+    TableName tableName = TableName.valueOf("hbase:meta");
+    try (Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
+      Table table = connection.getTable(tableName)) {
+      MiniHBaseCluster cluster = HTU.getHBaseCluster();
+      cluster.getMaster().balanceSwitch(false);
+      HRegionServer hrs = cluster.getRegionServer(cluster.getServerHoldingMeta());
+      ReplicationSource source = (ReplicationSource) hrs.getReplicationSourceService()
+        .getReplicationManager().catalogReplicationSource.get();
+      ((ReplicationPeerImpl) source.replicationPeer).setPeerState(false);
+      // there's a small chance the source reader has passed the peer state check but has not yet
+      // read the wal, which could allow it to read some added entries before the wal gets deleted,
+      // so we are making sure here that we only proceed once the reader loop has managed to
+      // detect that the peer is disabled.
+      HTU.waitFor(2000, 100, true, () -> {
+        MutableObject<Boolean> readerWaiting = new MutableObject<>(true);
+        source.logQueue.getQueues().keySet()
+          .forEach(w -> readerWaiting.setValue(readerWaiting.getValue()
+            && source.workerThreads.get(w).entryReader.waitingPeerEnabled.get()));
+        return readerWaiting.getValue();
+      });
+      // load the data into the table
+      for (int i = 0; i < 5; i++) {
+        LOG.info("Writing data from " + i * 1000 + " to " + (i * 1000 + 1000));
+        HTU.loadNumericRows(table, HConstants.CATALOG_FAMILY, i * 1000, i * 1000 + 1000);
+        LOG.info("flushing table");
+        HTU.flush(tableName);
+        LOG.info("compacting table");
+        if (i < 4) {
+          HTU.compact(tableName, false);
+        }
+      }
+      HTU.getHBaseCluster().getMaster().getLogCleaner().triggerCleanerNow().get(1,
+        TimeUnit.SECONDS);
+      ((ReplicationPeerImpl) source.replicationPeer).setPeerState(true);
+      // now load more data without flushing or compacting
+      for (int i = 5; i < 10; i++) {
+        LOG.info("Writing data from " + i * 1000 + " to " + (i * 1000 + 1000));
+        HTU.loadNumericRows(table, HConstants.CATALOG_FAMILY, i * 1000, i * 1000 + 1000);
+      }
+      verifyReplication(tableName, numOfMetaReplica, 0, 10000, HConstants.CATALOG_FAMILY);
+    }
+  }
+
   @Test
   public void testCatalogReplicaReplicationWithReplicaMoved() throws Exception {
     MiniHBaseCluster cluster = HTU.getMiniHBaseCluster();