Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1412,6 +1412,22 @@ default void postReplayWALs(ObserverContext<? extends RegionCoprocessorEnvironme
RegionInfo info, Path edits) throws IOException {
}

/**
* Called before a {@link WALEdit} replayed for this region.
* @param ctx the environment provided by the region server
*/
default void preWALRestore(ObserverContext<? extends RegionCoprocessorEnvironment> ctx,
RegionInfo info, WALKey logKey, WALEdit logEdit) throws IOException {
}

/**
* Called after a {@link WALEdit} replayed for this region.
* @param ctx the environment provided by the region server
*/
default void postWALRestore(ObserverContext<? extends RegionCoprocessorEnvironment> ctx,
RegionInfo info, WALKey logKey, WALEdit logEdit) throws IOException {
}

/**
* Called before bulkLoadHFile. Users can create a StoreFile instance to access the contents of a
* HFile.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5752,6 +5752,15 @@ private long replayRecoveredEdits(final Path edits, Map<byte[], Long> maxSeqIdIn
currentReplaySeqId =
(key.getOrigLogSeqNum() > 0) ? key.getOrigLogSeqNum() : currentEditSeqId;

// Start coprocessor replay here. The coprocessor is for each WALEdit
// instead of a KeyValue.
if (coprocessorHost != null) {
status.setStatus("Running pre-WAL-restore hook in coprocessors");
if (coprocessorHost.preWALRestore(this.getRegionInfo(), key, val)) {
// if bypass this wal entry, ignore it ...
continue;
}
}
boolean checkRowWithinBoundary = false;
// Check this edit is for this region.
if (
Expand Down Expand Up @@ -5822,6 +5831,10 @@ private long replayRecoveredEdits(final Path edits, Map<byte[], Long> maxSeqIdIn
internalFlushcache(null, currentEditSeqId, stores.values(), status, false,
FlushLifeCycleTracker.DUMMY);
}

if (coprocessorHost != null) {
coprocessorHost.postWALRestore(this.getRegionInfo(), key, val);
}
}

if (coprocessorHost != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2114,6 +2114,7 @@ public ReplicateWALEntryResponse replay(final RpcController controller,
ServerRegionReplicaUtil.isDefaultReplica(region.getRegionInfo())
? region.getCoprocessorHost()
: null; // do not invoke coprocessors if this is a secondary region replica
List<Pair<WALKey, WALEdit>> walEntries = new ArrayList<>();

// Skip adding the edits to WAL if this is a secondary region replica
boolean isPrimary = RegionReplicaUtil.isDefaultReplica(region.getRegionInfo());
Expand All @@ -2135,6 +2136,18 @@ public ReplicateWALEntryResponse replay(final RpcController controller,
Pair<WALKey, WALEdit> walEntry = (coprocessorHost == null) ? null : new Pair<>();
List<MutationReplay> edits =
WALSplitUtil.getMutationsFromWALEntry(entry, cells, walEntry, durability);
if (coprocessorHost != null) {
// Start coprocessor replay here. The coprocessor is for each WALEdit instead of a
// KeyValue.
if (
coprocessorHost.preWALRestore(region.getRegionInfo(), walEntry.getFirst(),
walEntry.getSecond())
) {
// if bypass this log entry, ignore it ...
continue;
}
walEntries.add(walEntry);
}
if (edits != null && !edits.isEmpty()) {
// HBASE-17924
// sort to improve lock efficiency
Expand All @@ -2157,6 +2170,13 @@ public ReplicateWALEntryResponse replay(final RpcController controller,
if (wal != null) {
wal.sync();
}

if (coprocessorHost != null) {
for (Pair<WALKey, WALEdit> entry : walEntries) {
coprocessorHost.postWALRestore(region.getRegionInfo(), entry.getFirst(),
entry.getSecond());
}
}
return ReplicateWALEntryResponse.newBuilder().build();
} catch (IOException ie) {
throw new ServiceException(ie);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1426,6 +1426,31 @@ public void call(RegionObserver observer) throws IOException {
});
}

/**
* Supports Coprocessor 'bypass'.
* @return true if default behavior should be bypassed, false otherwise
*/
public boolean preWALRestore(final RegionInfo info, final WALKey logKey, final WALEdit logEdit)
throws IOException {
return execOperation(
coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult(true) {
@Override
public void call(RegionObserver observer) throws IOException {
observer.preWALRestore(this, info, logKey, logEdit);
}
});
}

public void postWALRestore(final RegionInfo info, final WALKey logKey, final WALEdit logEdit)
throws IOException {
execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() {
@Override
public void call(RegionObserver observer) throws IOException {
observer.postWALRestore(this, info, logKey, logEdit);
}
});
}

/**
* @param familyPaths pairs of { CF, file path } submitted for bulk load
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ public class SampleRegionWALCoprocessor

private boolean preWALWriteCalled = false;
private boolean postWALWriteCalled = false;
private boolean preWALRestoreCalled = false;
private boolean postWALRestoreCalled = false;
private boolean preWALRollCalled = false;
private boolean postWALRollCalled = false;
private boolean preReplayWALsCalled = false;
Expand All @@ -74,6 +76,8 @@ public void setTestValues(byte[] tableName, byte[] row, byte[] igf, byte[] igq,
this.changedQualifier = chq;
preWALWriteCalled = false;
postWALWriteCalled = false;
preWALRestoreCalled = false;
postWALRestoreCalled = false;
preWALRollCalled = false;
postWALRollCalled = false;
}
Expand Down Expand Up @@ -130,6 +134,15 @@ public void preWALWrite(ObserverContext<? extends WALCoprocessorEnvironment> env
}
}

/**
* Triggered before {@link org.apache.hadoop.hbase.regionserver.HRegion} when WAL is Restoreed.
*/
@Override
public void preWALRestore(ObserverContext<? extends RegionCoprocessorEnvironment> env,
RegionInfo info, WALKey logKey, WALEdit logEdit) throws IOException {
preWALRestoreCalled = true;
}

@Override
public void preWALRoll(ObserverContext<? extends WALCoprocessorEnvironment> ctx, Path oldPath,
Path newPath) throws IOException {
Expand All @@ -142,6 +155,15 @@ public void postWALRoll(ObserverContext<? extends WALCoprocessorEnvironment> ctx
postWALRollCalled = true;
}

/**
* Triggered after {@link org.apache.hadoop.hbase.regionserver.HRegion} when WAL is Restoreed.
*/
@Override
public void postWALRestore(ObserverContext<? extends RegionCoprocessorEnvironment> env,
RegionInfo info, WALKey logKey, WALEdit logEdit) throws IOException {
postWALRestoreCalled = true;
}

@Override
public void preReplayWALs(ObserverContext<? extends RegionCoprocessorEnvironment> ctx,
RegionInfo info, Path edits) throws IOException {
Expand All @@ -162,6 +184,16 @@ public boolean isPostWALWriteCalled() {
return postWALWriteCalled;
}

public boolean isPreWALRestoreCalled() {
LOG.debug(SampleRegionWALCoprocessor.class.getName() + ".isPreWALRestoreCalled is called.");
return preWALRestoreCalled;
}

public boolean isPostWALRestoreCalled() {
LOG.debug(SampleRegionWALCoprocessor.class.getName() + ".isPostWALRestoreCalled is called.");
return postWALRestoreCalled;
}

public boolean isPreWALRollCalled() {
return preWALRollCalled;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,8 @@ public class SimpleRegionObserver implements RegionCoprocessor, RegionObserver {
final AtomicInteger ctPostBatchMutate = new AtomicInteger(0);
final AtomicInteger ctPreReplayWALs = new AtomicInteger(0);
final AtomicInteger ctPostReplayWALs = new AtomicInteger(0);
final AtomicInteger ctPreWALRestore = new AtomicInteger(0);
final AtomicInteger ctPostWALRestore = new AtomicInteger(0);
final AtomicInteger ctPreStoreFileReaderOpen = new AtomicInteger(0);
final AtomicInteger ctPostStoreFileReaderOpen = new AtomicInteger(0);
final AtomicInteger ctPostBatchMutateIndispensably = new AtomicInteger(0);
Expand Down Expand Up @@ -696,6 +698,24 @@ public void postReplayWALs(ObserverContext<? extends RegionCoprocessorEnvironmen
ctPostReplayWALs.incrementAndGet();
}

@Override
public void preWALRestore(ObserverContext<? extends RegionCoprocessorEnvironment> env,
RegionInfo info, WALKey logKey, WALEdit logEdit) throws IOException {
String tableName = logKey.getTableName().getNameAsString();
if (tableName.equals(TABLE_SKIPPED)) {
// skip recovery of TABLE_SKIPPED for testing purpose
env.bypass();
return;
}
ctPreWALRestore.incrementAndGet();
}

@Override
public void postWALRestore(ObserverContext<? extends RegionCoprocessorEnvironment> env,
RegionInfo info, WALKey logKey, WALEdit logEdit) throws IOException {
ctPostWALRestore.incrementAndGet();
}

@Override
public StoreFileReader preStoreFileReaderOpen(
ObserverContext<? extends RegionCoprocessorEnvironment> ctx, FileSystem fs, Path p,
Expand Down Expand Up @@ -912,6 +932,14 @@ public boolean hadPostReplayWALs() {
return ctPostReplayWALs.get() > 0;
}

public boolean hadPreWALRestore() {
return ctPreWALRestore.get() > 0;
}

public boolean hadPostWALRestore() {
return ctPostWALRestore.get() > 0;
}

public boolean wasScannerNextCalled() {
return ctPreScannerNext.get() > 0 && ctPostScannerNext.get() > 0;
}
Expand Down Expand Up @@ -1024,6 +1052,14 @@ public int getCtPostReplayWALs() {
return ctPostReplayWALs.get();
}

public int getCtPreWALRestore() {
return ctPreWALRestore.get();
}

public int getCtPostWALRestore() {
return ctPostWALRestore.get();
}

public int getCtPreWALAppend() {
return ctPreWALAppend.get();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -768,23 +768,60 @@ public void testRecovery() throws Exception {
tableName, new Boolean[] { false, false, true, true, true, true, false });

verifyMethodResult(SimpleRegionObserver.class,
new String[] { "getCtPreReplayWALs", "getCtPostReplayWALs", "getCtPrePut", "getCtPostPut" },
tableName, new Integer[] { 0, 0, 2, 2 });
new String[] { "getCtPreReplayWALs", "getCtPostReplayWALs", "getCtPreWALRestore",
"getCtPostWALRestore", "getCtPrePut", "getCtPostPut" },
tableName, new Integer[] { 0, 0, 0, 0, 2, 2 });

cluster.killRegionServer(rs1.getRegionServer().getServerName());
Threads.sleep(1000); // Let the kill soak in.
util.waitUntilAllRegionsAssigned(tableName);
LOG.info("All regions assigned");

verifyMethodResult(SimpleRegionObserver.class,
new String[] { "getCtPreReplayWALs", "getCtPostReplayWALs", "getCtPrePut", "getCtPostPut" },
tableName, new Integer[] { 1, 1, 0, 0 });
new String[] { "getCtPreReplayWALs", "getCtPostReplayWALs", "getCtPreWALRestore",
"getCtPostWALRestore", "getCtPrePut", "getCtPostPut" },
tableName, new Integer[] { 1, 1, 2, 2, 0, 0 });
} finally {
util.deleteTable(tableName);
table.close();
}
}

@Test
public void testPreWALRestoreSkip() throws Exception {
LOG.info(TestRegionObserverInterface.class.getName() + "." + name.getMethodName());
TableName tableName = TableName.valueOf(SimpleRegionObserver.TABLE_SKIPPED);
Table table = util.createTable(tableName, new byte[][] { A, B, C });

try (RegionLocator locator = util.getConnection().getRegionLocator(tableName)) {
JVMClusterUtil.RegionServerThread rs1 = cluster.startRegionServer();
ServerName sn2 = rs1.getRegionServer().getServerName();
String regEN = locator.getAllRegionLocations().get(0).getRegion().getEncodedName();

util.getAdmin().move(Bytes.toBytes(regEN), sn2);
while (!sn2.equals(locator.getAllRegionLocations().get(0).getServerName())) {
Thread.sleep(100);
}

Put put = new Put(ROW);
put.addColumn(A, A, A);
put.addColumn(B, B, B);
put.addColumn(C, C, C);
table.put(put);

cluster.killRegionServer(rs1.getRegionServer().getServerName());
Threads.sleep(20000); // just to be sure that the kill has fully started.
util.waitUntilAllRegionsAssigned(tableName);
}

verifyMethodResult(SimpleRegionObserver.class,
new String[] { "getCtPreWALRestore", "getCtPostWALRestore", }, tableName,
new Integer[] { 0, 0 });

util.deleteTable(tableName);
table.close();
}

// called from testPreWALAppendIsWrittenToWAL
private void testPreWALAppendHook(Table table, TableName tableName) throws IOException {
int expectedCalls = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,8 @@ public Void run() throws Exception {
SampleRegionWALCoprocessor cp2 =
region.getCoprocessorHost().findCoprocessor(SampleRegionWALCoprocessor.class);
assertNotNull(cp2);
assertTrue(cp2.isPreWALRestoreCalled());
assertTrue(cp2.isPostWALRestoreCalled());
assertTrue(cp2.isPreReplayWALsCalled());
assertTrue(cp2.isPostReplayWALsCalled());
region.close();
Expand Down