From 57115f7af51fd9a006531c5b84bc04652bfff207 Mon Sep 17 00:00:00 2001 From: Istvan Toth Date: Wed, 8 Oct 2025 13:12:41 +0200 Subject: [PATCH 1/2] HBASE-29647 Restore preWALRestore and postWALRestore coprocessor hooks (cherry picked from commit 5892e38a99b773da4d1355c7dde07f5c19dac063) --- .../hbase/coprocessor/RegionObserver.java | 16 +++++++ .../hadoop/hbase/regionserver/HRegion.java | 13 ++++++ .../hbase/regionserver/RSRpcServices.java | 20 +++++++++ .../regionserver/RegionCoprocessorHost.java | 33 ++++++++++++++ .../SampleRegionWALCoprocessor.java | 32 +++++++++++++ .../coprocessor/SimpleRegionObserver.java | 36 +++++++++++++++ .../TestRegionObserverInterface.java | 45 +++++++++++++++++-- .../hbase/coprocessor/TestWALObserver.java | 2 + 8 files changed, 193 insertions(+), 4 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java index 65fe524d0a49..7b7f7e208f51 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java @@ -1412,6 +1412,22 @@ default void postReplayWALs(ObserverContext ctx, + RegionInfo info, WALKey logKey, WALEdit logEdit) throws IOException { + } + + /** + * Called after a {@link WALEdit} replayed for this region. + * @param ctx the environment provided by the region server + */ + default void postWALRestore(ObserverContext ctx, + RegionInfo info, WALKey logKey, WALEdit logEdit) throws IOException { + } + /** * Called before bulkLoadHFile. Users can create a StoreFile instance to access the contents of a * HFile. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 7936197ff8d8..9b7daee0f668 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -5752,6 +5752,15 @@ private long replayRecoveredEdits(final Path edits, Map maxSeqIdIn currentReplaySeqId = (key.getOrigLogSeqNum() > 0) ? key.getOrigLogSeqNum() : currentEditSeqId; + // Start coprocessor replay here. The coprocessor is for each WALEdit + // instead of a KeyValue. + if (coprocessorHost != null) { + status.setStatus("Running pre-WAL-restore hook in coprocessors"); + if (coprocessorHost.preWALRestore(this.getRegionInfo(), key, val)) { + // if bypass this wal entry, ignore it ... + continue; + } + } boolean checkRowWithinBoundary = false; // Check this edit is for this region. if ( @@ -5822,6 +5831,10 @@ private long replayRecoveredEdits(final Path edits, Map maxSeqIdIn internalFlushcache(null, currentEditSeqId, stores.values(), status, false, FlushLifeCycleTracker.DUMMY); } + + if (coprocessorHost != null) { + coprocessorHost.postWALRestore(this.getRegionInfo(), key, val); + } } if (coprocessorHost != null) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index 11d5917dda65..fdfea375e096 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -2114,6 +2114,7 @@ public ReplicateWALEntryResponse replay(final RpcController controller, ServerRegionReplicaUtil.isDefaultReplica(region.getRegionInfo()) ? region.getCoprocessorHost() : null; // do not invoke coprocessors if this is a secondary region replica + List> walEntries = new ArrayList<>(); // Skip adding the edits to WAL if this is a secondary region replica boolean isPrimary = RegionReplicaUtil.isDefaultReplica(region.getRegionInfo()); @@ -2135,6 +2136,18 @@ public ReplicateWALEntryResponse replay(final RpcController controller, Pair walEntry = (coprocessorHost == null) ? null : new Pair<>(); List edits = WALSplitUtil.getMutationsFromWALEntry(entry, cells, walEntry, durability); + if (coprocessorHost != null) { + // Start coprocessor replay here. The coprocessor is for each WALEdit instead of a + // KeyValue. + if ( + coprocessorHost.preWALRestore(region.getRegionInfo(), walEntry.getFirst(), + walEntry.getSecond()) + ) { + // if bypass this log entry, ignore it ... + continue; + } + walEntries.add(walEntry); + } if (edits != null && !edits.isEmpty()) { // HBASE-17924 // sort to improve lock efficiency @@ -2157,6 +2170,13 @@ public ReplicateWALEntryResponse replay(final RpcController controller, if (wal != null) { wal.sync(); } + + if (coprocessorHost != null) { + for (Pair entry : walEntries) { + coprocessorHost.postWALRestore(region.getRegionInfo(), entry.getFirst(), + entry.getSecond()); + } + } return ReplicateWALEntryResponse.newBuilder().build(); } catch (IOException ie) { throw new ServiceException(ie); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java index 703f06141bf4..1d6960fdc554 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java @@ -1426,6 +1426,39 @@ public void call(RegionObserver observer) throws IOException { }); } + /** + * Supports Coprocessor 'bypass'. + * @return true if default behavior should be bypassed, false otherwise + * @deprecated Since hbase-2.0.0. No replacement. To be removed in hbase-3.0.0 and replaced with + * something that doesn't expose IntefaceAudience.Private classes. + */ + @Deprecated + public boolean preWALRestore(final RegionInfo info, final WALKey logKey, final WALEdit logEdit) + throws IOException { + return execOperation( + coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult(true) { + @Override + public void call(RegionObserver observer) throws IOException { + observer.preWALRestore(this, info, logKey, logEdit); + } + }); + } + + /** + * @deprecated Since hbase-2.0.0. No replacement. To be removed in hbase-3.0.0 and replaced with + * something that doesn't expose IntefaceAudience.Private classes. + */ + @Deprecated + public void postWALRestore(final RegionInfo info, final WALKey logKey, final WALEdit logEdit) + throws IOException { + execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() { + @Override + public void call(RegionObserver observer) throws IOException { + observer.postWALRestore(this, info, logKey, logEdit); + } + }); + } + /** * @param familyPaths pairs of { CF, file path } submitted for bulk load */ diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SampleRegionWALCoprocessor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SampleRegionWALCoprocessor.java index 17ab26c6a58d..8d6d363daa64 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SampleRegionWALCoprocessor.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SampleRegionWALCoprocessor.java @@ -53,6 +53,8 @@ public class SampleRegionWALCoprocessor private boolean preWALWriteCalled = false; private boolean postWALWriteCalled = false; + private boolean preWALRestoreCalled = false; + private boolean postWALRestoreCalled = false; private boolean preWALRollCalled = false; private boolean postWALRollCalled = false; private boolean preReplayWALsCalled = false; @@ -74,6 +76,8 @@ public void setTestValues(byte[] tableName, byte[] row, byte[] igf, byte[] igq, this.changedQualifier = chq; preWALWriteCalled = false; postWALWriteCalled = false; + preWALRestoreCalled = false; + postWALRestoreCalled = false; preWALRollCalled = false; postWALRollCalled = false; } @@ -130,6 +134,15 @@ public void preWALWrite(ObserverContext env } } + /** + * Triggered before {@link org.apache.hadoop.hbase.regionserver.HRegion} when WAL is Restoreed. + */ + @Override + public void preWALRestore(ObserverContext env, + RegionInfo info, WALKey logKey, WALEdit logEdit) throws IOException { + preWALRestoreCalled = true; + } + @Override public void preWALRoll(ObserverContext ctx, Path oldPath, Path newPath) throws IOException { @@ -142,6 +155,15 @@ public void postWALRoll(ObserverContext ctx postWALRollCalled = true; } + /** + * Triggered after {@link org.apache.hadoop.hbase.regionserver.HRegion} when WAL is Restoreed. + */ + @Override + public void postWALRestore(ObserverContext env, + RegionInfo info, WALKey logKey, WALEdit logEdit) throws IOException { + postWALRestoreCalled = true; + } + @Override public void preReplayWALs(ObserverContext ctx, RegionInfo info, Path edits) throws IOException { @@ -162,6 +184,16 @@ public boolean isPostWALWriteCalled() { return postWALWriteCalled; } + public boolean isPreWALRestoreCalled() { + LOG.debug(SampleRegionWALCoprocessor.class.getName() + ".isPreWALRestoreCalled is called."); + return preWALRestoreCalled; + } + + public boolean isPostWALRestoreCalled() { + LOG.debug(SampleRegionWALCoprocessor.class.getName() + ".isPostWALRestoreCalled is called."); + return postWALRestoreCalled; + } + public boolean isPreWALRollCalled() { return preWALRollCalled; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java index 42ddf84b8774..ec32a1d2c4dd 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java @@ -128,6 +128,8 @@ public class SimpleRegionObserver implements RegionCoprocessor, RegionObserver { final AtomicInteger ctPostBatchMutate = new AtomicInteger(0); final AtomicInteger ctPreReplayWALs = new AtomicInteger(0); final AtomicInteger ctPostReplayWALs = new AtomicInteger(0); + final AtomicInteger ctPreWALRestore = new AtomicInteger(0); + final AtomicInteger ctPostWALRestore = new AtomicInteger(0); final AtomicInteger ctPreStoreFileReaderOpen = new AtomicInteger(0); final AtomicInteger ctPostStoreFileReaderOpen = new AtomicInteger(0); final AtomicInteger ctPostBatchMutateIndispensably = new AtomicInteger(0); @@ -696,6 +698,24 @@ public void postReplayWALs(ObserverContext env, + RegionInfo info, WALKey logKey, WALEdit logEdit) throws IOException { + String tableName = logKey.getTableName().getNameAsString(); + if (tableName.equals(TABLE_SKIPPED)) { + // skip recovery of TABLE_SKIPPED for testing purpose + env.bypass(); + return; + } + ctPreWALRestore.incrementAndGet(); + } + + @Override + public void postWALRestore(ObserverContext env, + RegionInfo info, WALKey logKey, WALEdit logEdit) throws IOException { + ctPostWALRestore.incrementAndGet(); + } + @Override public StoreFileReader preStoreFileReaderOpen( ObserverContext ctx, FileSystem fs, Path p, @@ -912,6 +932,14 @@ public boolean hadPostReplayWALs() { return ctPostReplayWALs.get() > 0; } + public boolean hadPreWALRestore() { + return ctPreWALRestore.get() > 0; + } + + public boolean hadPostWALRestore() { + return ctPostWALRestore.get() > 0; + } + public boolean wasScannerNextCalled() { return ctPreScannerNext.get() > 0 && ctPostScannerNext.get() > 0; } @@ -1024,6 +1052,14 @@ public int getCtPostReplayWALs() { return ctPostReplayWALs.get(); } + public int getCtPreWALRestore() { + return ctPreWALRestore.get(); + } + + public int getCtPostWALRestore() { + return ctPostWALRestore.get(); + } + public int getCtPreWALAppend() { return ctPreWALAppend.get(); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java index 81b516595717..df57add1708e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java @@ -768,8 +768,9 @@ public void testRecovery() throws Exception { tableName, new Boolean[] { false, false, true, true, true, true, false }); verifyMethodResult(SimpleRegionObserver.class, - new String[] { "getCtPreReplayWALs", "getCtPostReplayWALs", "getCtPrePut", "getCtPostPut" }, - tableName, new Integer[] { 0, 0, 2, 2 }); + new String[] { "getCtPreReplayWALs", "getCtPostReplayWALs", "getCtPreWALRestore", + "getCtPostWALRestore", "getCtPrePut", "getCtPostPut" }, + tableName, new Integer[] { 0, 0, 0, 0, 2, 2 }); cluster.killRegionServer(rs1.getRegionServer().getServerName()); Threads.sleep(1000); // Let the kill soak in. @@ -777,14 +778,50 @@ public void testRecovery() throws Exception { LOG.info("All regions assigned"); verifyMethodResult(SimpleRegionObserver.class, - new String[] { "getCtPreReplayWALs", "getCtPostReplayWALs", "getCtPrePut", "getCtPostPut" }, - tableName, new Integer[] { 1, 1, 0, 0 }); + new String[] { "getCtPreReplayWALs", "getCtPostReplayWALs", "getCtPreWALRestore", + "getCtPostWALRestore", "getCtPrePut", "getCtPostPut" }, + tableName, new Integer[] { 1, 1, 2, 2, 0, 0 }); } finally { util.deleteTable(tableName); table.close(); } } + @Test + public void testPreWALRestoreSkip() throws Exception { + LOG.info(TestRegionObserverInterface.class.getName() + "." + name.getMethodName()); + TableName tableName = TableName.valueOf(SimpleRegionObserver.TABLE_SKIPPED); + Table table = util.createTable(tableName, new byte[][] { A, B, C }); + + try (RegionLocator locator = util.getConnection().getRegionLocator(tableName)) { + JVMClusterUtil.RegionServerThread rs1 = cluster.startRegionServer(); + ServerName sn2 = rs1.getRegionServer().getServerName(); + String regEN = locator.getAllRegionLocations().get(0).getRegion().getEncodedName(); + + util.getAdmin().move(Bytes.toBytes(regEN), sn2); + while (!sn2.equals(locator.getAllRegionLocations().get(0).getServerName())) { + Thread.sleep(100); + } + + Put put = new Put(ROW); + put.addColumn(A, A, A); + put.addColumn(B, B, B); + put.addColumn(C, C, C); + table.put(put); + + cluster.killRegionServer(rs1.getRegionServer().getServerName()); + Threads.sleep(20000); // just to be sure that the kill has fully started. + util.waitUntilAllRegionsAssigned(tableName); + } + + verifyMethodResult(SimpleRegionObserver.class, + new String[] { "getCtPreWALRestore", "getCtPostWALRestore", }, tableName, + new Integer[] { 0, 0 }); + + util.deleteTable(tableName); + table.close(); + } + // called from testPreWALAppendIsWrittenToWAL private void testPreWALAppendHook(Table table, TableName tableName) throws IOException { int expectedCalls = 0; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestWALObserver.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestWALObserver.java index 989110e41d97..6e7c6ff400a2 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestWALObserver.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestWALObserver.java @@ -357,6 +357,8 @@ public Void run() throws Exception { SampleRegionWALCoprocessor cp2 = region.getCoprocessorHost().findCoprocessor(SampleRegionWALCoprocessor.class); assertNotNull(cp2); + assertTrue(cp2.isPreWALRestoreCalled()); + assertTrue(cp2.isPostWALRestoreCalled()); assertTrue(cp2.isPreReplayWALsCalled()); assertTrue(cp2.isPostReplayWALsCalled()); region.close(); From 4cacd25fca61fced58bcc594817ea0f42e13d37c Mon Sep 17 00:00:00 2001 From: Istvan Toth Date: Wed, 8 Oct 2025 14:59:52 +0200 Subject: [PATCH 2/2] un-deprecate the RegionCoprocessorHost methods --- .../hadoop/hbase/regionserver/RegionCoprocessorHost.java | 8 -------- 1 file changed, 8 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java index 1d6960fdc554..b300496e1d7c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java @@ -1429,10 +1429,7 @@ public void call(RegionObserver observer) throws IOException { /** * Supports Coprocessor 'bypass'. * @return true if default behavior should be bypassed, false otherwise - * @deprecated Since hbase-2.0.0. No replacement. To be removed in hbase-3.0.0 and replaced with - * something that doesn't expose IntefaceAudience.Private classes. */ - @Deprecated public boolean preWALRestore(final RegionInfo info, final WALKey logKey, final WALEdit logEdit) throws IOException { return execOperation( @@ -1444,11 +1441,6 @@ public void call(RegionObserver observer) throws IOException { }); } - /** - * @deprecated Since hbase-2.0.0. No replacement. To be removed in hbase-3.0.0 and replaced with - * something that doesn't expose IntefaceAudience.Private classes. - */ - @Deprecated public void postWALRestore(final RegionInfo info, final WALKey logKey, final WALEdit logEdit) throws IOException { execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() {