Skip to content

Commit 5892e38

Browse files
committed
HBASE-29647 Restore preWALRestore and postWALRestore coprocessor hooks
1 parent d0b9478 commit 5892e38

File tree

8 files changed

+193
-4
lines changed

8 files changed

+193
-4
lines changed

hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1412,6 +1412,22 @@ default void postReplayWALs(ObserverContext<? extends RegionCoprocessorEnvironme
14121412
RegionInfo info, Path edits) throws IOException {
14131413
}
14141414

1415+
/**
1416+
* Called before a {@link WALEdit} is replayed for this region.
1417+
* @param ctx the environment provided by the region server
1418+
*/
1419+
default void preWALRestore(ObserverContext<? extends RegionCoprocessorEnvironment> ctx,
1420+
RegionInfo info, WALKey logKey, WALEdit logEdit) throws IOException {
1421+
}
1422+
1423+
/**
1424+
* Called after a {@link WALEdit} is replayed for this region.
1425+
* @param ctx the environment provided by the region server
1426+
*/
1427+
default void postWALRestore(ObserverContext<? extends RegionCoprocessorEnvironment> ctx,
1428+
RegionInfo info, WALKey logKey, WALEdit logEdit) throws IOException {
1429+
}
1430+
14151431
/**
14161432
* Called before bulkLoadHFile. Users can create a StoreFile instance to access the contents of a
14171433
* HFile.

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5752,6 +5752,15 @@ private long replayRecoveredEdits(final Path edits, Map<byte[], Long> maxSeqIdIn
57525752
currentReplaySeqId =
57535753
(key.getOrigLogSeqNum() > 0) ? key.getOrigLogSeqNum() : currentEditSeqId;
57545754

5755+
// Start coprocessor replay here. The coprocessor is for each WALEdit
5756+
// instead of a KeyValue.
5757+
if (coprocessorHost != null) {
5758+
status.setStatus("Running pre-WAL-restore hook in coprocessors");
5759+
if (coprocessorHost.preWALRestore(this.getRegionInfo(), key, val)) {
5760+
// A coprocessor requested bypass for this WAL entry, so skip it.
5761+
continue;
5762+
}
5763+
}
57555764
boolean checkRowWithinBoundary = false;
57565765
// Check this edit is for this region.
57575766
if (
@@ -5822,6 +5831,10 @@ private long replayRecoveredEdits(final Path edits, Map<byte[], Long> maxSeqIdIn
58225831
internalFlushcache(null, currentEditSeqId, stores.values(), status, false,
58235832
FlushLifeCycleTracker.DUMMY);
58245833
}
5834+
5835+
if (coprocessorHost != null) {
5836+
coprocessorHost.postWALRestore(this.getRegionInfo(), key, val);
5837+
}
58255838
}
58265839

58275840
if (coprocessorHost != null) {

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2114,6 +2114,7 @@ public ReplicateWALEntryResponse replay(final RpcController controller,
21142114
ServerRegionReplicaUtil.isDefaultReplica(region.getRegionInfo())
21152115
? region.getCoprocessorHost()
21162116
: null; // do not invoke coprocessors if this is a secondary region replica
2117+
List<Pair<WALKey, WALEdit>> walEntries = new ArrayList<>();
21172118

21182119
// Skip adding the edits to WAL if this is a secondary region replica
21192120
boolean isPrimary = RegionReplicaUtil.isDefaultReplica(region.getRegionInfo());
@@ -2135,6 +2136,18 @@ public ReplicateWALEntryResponse replay(final RpcController controller,
21352136
Pair<WALKey, WALEdit> walEntry = (coprocessorHost == null) ? null : new Pair<>();
21362137
List<MutationReplay> edits =
21372138
WALSplitUtil.getMutationsFromWALEntry(entry, cells, walEntry, durability);
2139+
if (coprocessorHost != null) {
2140+
// Start coprocessor replay here. The coprocessor is for each WALEdit instead of a
2141+
// KeyValue.
2142+
if (
2143+
coprocessorHost.preWALRestore(region.getRegionInfo(), walEntry.getFirst(),
2144+
walEntry.getSecond())
2145+
) {
2146+
// A coprocessor requested bypass for this log entry, so skip it.
2147+
continue;
2148+
}
2149+
walEntries.add(walEntry);
2150+
}
21382151
if (edits != null && !edits.isEmpty()) {
21392152
// HBASE-17924
21402153
// sort to improve lock efficiency
@@ -2157,6 +2170,13 @@ public ReplicateWALEntryResponse replay(final RpcController controller,
21572170
if (wal != null) {
21582171
wal.sync();
21592172
}
2173+
2174+
if (coprocessorHost != null) {
2175+
for (Pair<WALKey, WALEdit> entry : walEntries) {
2176+
coprocessorHost.postWALRestore(region.getRegionInfo(), entry.getFirst(),
2177+
entry.getSecond());
2178+
}
2179+
}
21602180
return ReplicateWALEntryResponse.newBuilder().build();
21612181
} catch (IOException ie) {
21622182
throw new ServiceException(ie);

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1426,6 +1426,39 @@ public void call(RegionObserver observer) throws IOException {
14261426
});
14271427
}
14281428

1429+
/**
1430+
* Supports Coprocessor 'bypass'.
1431+
* @return true if default behavior should be bypassed, false otherwise
1432+
* @deprecated Since hbase-2.0.0. No replacement. To be removed in hbase-3.0.0 and replaced with
1433+
* something that doesn't expose InterfaceAudience.Private classes.
1434+
*/
1435+
@Deprecated
1436+
public boolean preWALRestore(final RegionInfo info, final WALKey logKey, final WALEdit logEdit)
1437+
throws IOException {
1438+
return execOperation(
1439+
coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult(true) {
1440+
@Override
1441+
public void call(RegionObserver observer) throws IOException {
1442+
observer.preWALRestore(this, info, logKey, logEdit);
1443+
}
1444+
});
1445+
}
1446+
1447+
/**
1448+
* @deprecated Since hbase-2.0.0. No replacement. To be removed in hbase-3.0.0 and replaced with
1449+
* something that doesn't expose InterfaceAudience.Private classes.
1450+
*/
1451+
@Deprecated
1452+
public void postWALRestore(final RegionInfo info, final WALKey logKey, final WALEdit logEdit)
1453+
throws IOException {
1454+
execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() {
1455+
@Override
1456+
public void call(RegionObserver observer) throws IOException {
1457+
observer.postWALRestore(this, info, logKey, logEdit);
1458+
}
1459+
});
1460+
}
1461+
14291462
/**
14301463
* @param familyPaths pairs of { CF, file path } submitted for bulk load
14311464
*/

hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SampleRegionWALCoprocessor.java

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,8 @@ public class SampleRegionWALCoprocessor
5353

5454
private boolean preWALWriteCalled = false;
5555
private boolean postWALWriteCalled = false;
56+
private boolean preWALRestoreCalled = false;
57+
private boolean postWALRestoreCalled = false;
5658
private boolean preWALRollCalled = false;
5759
private boolean postWALRollCalled = false;
5860
private boolean preReplayWALsCalled = false;
@@ -74,6 +76,8 @@ public void setTestValues(byte[] tableName, byte[] row, byte[] igf, byte[] igq,
7476
this.changedQualifier = chq;
7577
preWALWriteCalled = false;
7678
postWALWriteCalled = false;
79+
preWALRestoreCalled = false;
80+
postWALRestoreCalled = false;
7781
preWALRollCalled = false;
7882
postWALRollCalled = false;
7983
}
@@ -130,6 +134,15 @@ public void preWALWrite(ObserverContext<? extends WALCoprocessorEnvironment> env
130134
}
131135
}
132136

137+
/**
138+
* Triggered before {@link org.apache.hadoop.hbase.regionserver.HRegion} restores a WAL edit.
139+
*/
140+
@Override
141+
public void preWALRestore(ObserverContext<? extends RegionCoprocessorEnvironment> env,
142+
RegionInfo info, WALKey logKey, WALEdit logEdit) throws IOException {
143+
preWALRestoreCalled = true;
144+
}
145+
133146
@Override
134147
public void preWALRoll(ObserverContext<? extends WALCoprocessorEnvironment> ctx, Path oldPath,
135148
Path newPath) throws IOException {
@@ -142,6 +155,15 @@ public void postWALRoll(ObserverContext<? extends WALCoprocessorEnvironment> ctx
142155
postWALRollCalled = true;
143156
}
144157

158+
/**
159+
* Triggered after {@link org.apache.hadoop.hbase.regionserver.HRegion} restores a WAL edit.
160+
*/
161+
@Override
162+
public void postWALRestore(ObserverContext<? extends RegionCoprocessorEnvironment> env,
163+
RegionInfo info, WALKey logKey, WALEdit logEdit) throws IOException {
164+
postWALRestoreCalled = true;
165+
}
166+
145167
@Override
146168
public void preReplayWALs(ObserverContext<? extends RegionCoprocessorEnvironment> ctx,
147169
RegionInfo info, Path edits) throws IOException {
@@ -162,6 +184,16 @@ public boolean isPostWALWriteCalled() {
162184
return postWALWriteCalled;
163185
}
164186

187+
public boolean isPreWALRestoreCalled() {
188+
LOG.debug(SampleRegionWALCoprocessor.class.getName() + ".isPreWALRestoreCalled is called.");
189+
return preWALRestoreCalled;
190+
}
191+
192+
public boolean isPostWALRestoreCalled() {
193+
LOG.debug(SampleRegionWALCoprocessor.class.getName() + ".isPostWALRestoreCalled is called.");
194+
return postWALRestoreCalled;
195+
}
196+
165197
public boolean isPreWALRollCalled() {
166198
return preWALRollCalled;
167199
}

hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,8 @@ public class SimpleRegionObserver implements RegionCoprocessor, RegionObserver {
128128
final AtomicInteger ctPostBatchMutate = new AtomicInteger(0);
129129
final AtomicInteger ctPreReplayWALs = new AtomicInteger(0);
130130
final AtomicInteger ctPostReplayWALs = new AtomicInteger(0);
131+
final AtomicInteger ctPreWALRestore = new AtomicInteger(0);
132+
final AtomicInteger ctPostWALRestore = new AtomicInteger(0);
131133
final AtomicInteger ctPreStoreFileReaderOpen = new AtomicInteger(0);
132134
final AtomicInteger ctPostStoreFileReaderOpen = new AtomicInteger(0);
133135
final AtomicInteger ctPostBatchMutateIndispensably = new AtomicInteger(0);
@@ -696,6 +698,24 @@ public void postReplayWALs(ObserverContext<? extends RegionCoprocessorEnvironmen
696698
ctPostReplayWALs.incrementAndGet();
697699
}
698700

701+
@Override
702+
public void preWALRestore(ObserverContext<? extends RegionCoprocessorEnvironment> env,
703+
RegionInfo info, WALKey logKey, WALEdit logEdit) throws IOException {
704+
String tableName = logKey.getTableName().getNameAsString();
705+
if (tableName.equals(TABLE_SKIPPED)) {
706+
// skip recovery of TABLE_SKIPPED for testing purpose
707+
env.bypass();
708+
return;
709+
}
710+
ctPreWALRestore.incrementAndGet();
711+
}
712+
713+
@Override
714+
public void postWALRestore(ObserverContext<? extends RegionCoprocessorEnvironment> env,
715+
RegionInfo info, WALKey logKey, WALEdit logEdit) throws IOException {
716+
ctPostWALRestore.incrementAndGet();
717+
}
718+
699719
@Override
700720
public StoreFileReader preStoreFileReaderOpen(
701721
ObserverContext<? extends RegionCoprocessorEnvironment> ctx, FileSystem fs, Path p,
@@ -912,6 +932,14 @@ public boolean hadPostReplayWALs() {
912932
return ctPostReplayWALs.get() > 0;
913933
}
914934

935+
public boolean hadPreWALRestore() {
936+
return ctPreWALRestore.get() > 0;
937+
}
938+
939+
public boolean hadPostWALRestore() {
940+
return ctPostWALRestore.get() > 0;
941+
}
942+
915943
public boolean wasScannerNextCalled() {
916944
return ctPreScannerNext.get() > 0 && ctPostScannerNext.get() > 0;
917945
}
@@ -1024,6 +1052,14 @@ public int getCtPostReplayWALs() {
10241052
return ctPostReplayWALs.get();
10251053
}
10261054

1055+
public int getCtPreWALRestore() {
1056+
return ctPreWALRestore.get();
1057+
}
1058+
1059+
public int getCtPostWALRestore() {
1060+
return ctPostWALRestore.get();
1061+
}
1062+
10271063
public int getCtPreWALAppend() {
10281064
return ctPreWALAppend.get();
10291065
}

hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java

Lines changed: 41 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -768,23 +768,60 @@ public void testRecovery() throws Exception {
768768
tableName, new Boolean[] { false, false, true, true, true, true, false });
769769

770770
verifyMethodResult(SimpleRegionObserver.class,
771-
new String[] { "getCtPreReplayWALs", "getCtPostReplayWALs", "getCtPrePut", "getCtPostPut" },
772-
tableName, new Integer[] { 0, 0, 2, 2 });
771+
new String[] { "getCtPreReplayWALs", "getCtPostReplayWALs", "getCtPreWALRestore",
772+
"getCtPostWALRestore", "getCtPrePut", "getCtPostPut" },
773+
tableName, new Integer[] { 0, 0, 0, 0, 2, 2 });
773774

774775
cluster.killRegionServer(rs1.getRegionServer().getServerName());
775776
Threads.sleep(1000); // Let the kill soak in.
776777
util.waitUntilAllRegionsAssigned(tableName);
777778
LOG.info("All regions assigned");
778779

779780
verifyMethodResult(SimpleRegionObserver.class,
780-
new String[] { "getCtPreReplayWALs", "getCtPostReplayWALs", "getCtPrePut", "getCtPostPut" },
781-
tableName, new Integer[] { 1, 1, 0, 0 });
781+
new String[] { "getCtPreReplayWALs", "getCtPostReplayWALs", "getCtPreWALRestore",
782+
"getCtPostWALRestore", "getCtPrePut", "getCtPostPut" },
783+
tableName, new Integer[] { 1, 1, 2, 2, 0, 0 });
782784
} finally {
783785
util.deleteTable(tableName);
784786
table.close();
785787
}
786788
}
787789

790+
@Test
791+
public void testPreWALRestoreSkip() throws Exception {
792+
LOG.info(TestRegionObserverInterface.class.getName() + "." + name.getMethodName());
793+
TableName tableName = TableName.valueOf(SimpleRegionObserver.TABLE_SKIPPED);
794+
Table table = util.createTable(tableName, new byte[][] { A, B, C });
795+
796+
try (RegionLocator locator = util.getConnection().getRegionLocator(tableName)) {
797+
JVMClusterUtil.RegionServerThread rs1 = cluster.startRegionServer();
798+
ServerName sn2 = rs1.getRegionServer().getServerName();
799+
String regEN = locator.getAllRegionLocations().get(0).getRegion().getEncodedName();
800+
801+
util.getAdmin().move(Bytes.toBytes(regEN), sn2);
802+
while (!sn2.equals(locator.getAllRegionLocations().get(0).getServerName())) {
803+
Thread.sleep(100);
804+
}
805+
806+
Put put = new Put(ROW);
807+
put.addColumn(A, A, A);
808+
put.addColumn(B, B, B);
809+
put.addColumn(C, C, C);
810+
table.put(put);
811+
812+
cluster.killRegionServer(rs1.getRegionServer().getServerName());
813+
Threads.sleep(20000); // just to be sure that the kill has fully started.
814+
util.waitUntilAllRegionsAssigned(tableName);
815+
}
816+
817+
verifyMethodResult(SimpleRegionObserver.class,
818+
new String[] { "getCtPreWALRestore", "getCtPostWALRestore", }, tableName,
819+
new Integer[] { 0, 0 });
820+
821+
util.deleteTable(tableName);
822+
table.close();
823+
}
824+
788825
// called from testPreWALAppendIsWrittenToWAL
789826
private void testPreWALAppendHook(Table table, TableName tableName) throws IOException {
790827
int expectedCalls = 0;

hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestWALObserver.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -357,6 +357,8 @@ public Void run() throws Exception {
357357
SampleRegionWALCoprocessor cp2 =
358358
region.getCoprocessorHost().findCoprocessor(SampleRegionWALCoprocessor.class);
359359
assertNotNull(cp2);
360+
assertTrue(cp2.isPreWALRestoreCalled());
361+
assertTrue(cp2.isPostWALRestoreCalled());
360362
assertTrue(cp2.isPreReplayWALsCalled());
361363
assertTrue(cp2.isPostReplayWALsCalled());
362364
region.close();

0 commit comments

Comments
 (0)