Skip to content

Commit c8f57bf

Browse files
gjacoby126apurtell
authored andcommitted
HBASE-22623 - Add RegionObserver coprocessor hook for preWALAppend (#390)
Signed-off-by: Andrew Purtell <[email protected]>
1 parent 0136b98 commit c8f57bf

File tree

8 files changed

+225
-2
lines changed

8 files changed

+225
-2
lines changed

hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1104,4 +1104,16 @@ default DeleteTracker postInstantiateDeleteTracker(
11041104
throws IOException {
11051105
return delTracker;
11061106
}
1107+
1108+
/**
1109+
* Called just before the WAL Entry is appended to the WAL. Implementing this hook allows
1110+
* coprocessors to add extended attributes to the WALKey that then get persisted to the
1111+
* WAL, and are available to replication endpoints to use in processing WAL Entries.
1112+
* @param ctx the environment provided by the region server
1113+
* @param key the WALKey associated with a particular append to a WAL
1114+
*/
1115+
default void preWALAppend(ObserverContext<RegionCoprocessorEnvironment> ctx, WALKey key,
1116+
WALEdit edit)
1117+
throws IOException {
1118+
}
11071119
}

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7964,6 +7964,11 @@ private WriteEntry doWALAppend(WALEdit walEdit, Durability durability, List<UUID
79647964
if (walEdit.isReplay()) {
79657965
walKey.setOrigLogSeqNum(origLogSeqNum);
79667966
}
7967+
//don't call the coproc hook for writes to the WAL caused by
7968+
//system lifecycle events like flushes or compactions
7969+
if (this.coprocessorHost != null && !walEdit.isMetaEdit()) {
7970+
this.coprocessorHost.preWALAppend(walKey, walEdit);
7971+
}
79677972
WriteEntry writeEntry = null;
79687973
try {
79697974
long txid = this.wal.append(this.getRegionInfo(), walKey, walEdit, true);

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1720,6 +1720,18 @@ public List<Pair<Cell, Cell>> call(RegionObserver observer) throws IOException {
17201720
});
17211721
}
17221722

1723+
public void preWALAppend(WALKey key, WALEdit edit) throws IOException {
1724+
if (this.coprocEnvironments.isEmpty()){
1725+
return;
1726+
}
1727+
execOperation(new RegionObserverOperationWithoutResult() {
1728+
@Override
1729+
public void call(RegionObserver observer) throws IOException {
1730+
observer.preWALAppend(this, key, edit);
1731+
}
1732+
});
1733+
}
1734+
17231735
public Message preEndpointInvocation(final Service service, final String methodName,
17241736
Message request) throws IOException {
17251737
if (coprocEnvironments.isEmpty()) {

hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@
3434

3535
/**
3636
* Key for WAL Entry.
37-
* Read-only. No Setters. For limited audience such as Coprocessors.
3837
*/
3938
@InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.REPLICATION,
4039
HBaseInterfaceAudience.COPROC})
@@ -86,6 +85,13 @@ default long getNonce() {
8685
*/
8786
long getOrigLogSeqNum();
8887

88+
/**
89+
* Add a named String value to this WALKey to be persisted into the WAL
90+
* @param attributeKey Name of the attribute
91+
* @param attributeValue Value of the attribute
92+
*/
93+
void addExtendedAttribute(String attributeKey, byte[] attributeValue);
94+
8995
/**
9096
* Return a named String value injected into the WALKey during processing, such as by a
9197
* coprocessor

hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKeyImpl.java

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -195,6 +195,37 @@ public WALKeyImpl(final byte[] encodedRegionName,
195195
mvcc, null, null);
196196
}
197197

198+
/**
199+
* Copy constructor that takes in an existing WALKeyImpl plus some extended attributes.
200+
* Intended for coprocessors to add annotations to a system-generated WALKey
201+
* for persistence to the WAL.
202+
* @param key Key to be copied into this new key
203+
* @param extendedAttributes Extra attributes to copy into the new key
204+
*/
205+
public WALKeyImpl(WALKeyImpl key,
206+
Map<String, byte[]> extendedAttributes){
207+
init(key.getEncodedRegionName(), key.getTableName(), key.getSequenceId(),
208+
key.getWriteTime(), key.getClusterIds(), key.getNonceGroup(), key.getNonce(),
209+
key.getMvcc(), key.getReplicationScopes(), extendedAttributes);
210+
211+
}
212+
213+
/**
214+
* Copy constructor that takes in an existing WALKey, the extra WALKeyImpl fields that the
215+
* parent interface is missing, plus some extended attributes. Intended
216+
* for coprocessors to add annotations to a system-generated WALKey for
217+
* persistence to the WAL.
218+
*/
219+
public WALKeyImpl(WALKey key,
220+
List<UUID> clusterIds,
221+
MultiVersionConcurrencyControl mvcc,
222+
final NavigableMap<byte[], Integer> replicationScopes,
223+
Map<String, byte[]> extendedAttributes){
224+
init(key.getEncodedRegionName(), key.getTableName(), key.getSequenceId(),
225+
key.getWriteTime(), clusterIds, key.getNonceGroup(), key.getNonce(),
226+
mvcc, replicationScopes, extendedAttributes);
227+
228+
}
198229
/**
199230
* Create the log key for writing to somewhere.
200231
* We maintain the tablename mainly for debugging purposes.
@@ -464,6 +495,14 @@ public UUID getOriginatingClusterId(){
464495
return clusterIds.isEmpty()? HConstants.DEFAULT_CLUSTER_ID: clusterIds.get(0);
465496
}
466497

498+
@Override
499+
public void addExtendedAttribute(String attributeKey, byte[] attributeValue){
500+
if (extendedAttributes == null){
501+
extendedAttributes = new HashMap<String, byte[]>();
502+
}
503+
extendedAttributes.put(attributeKey, attributeValue);
504+
}
505+
467506
@Override
468507
public byte[] getExtendedAttribute(String attributeKey){
469508
return extendedAttributes != null ? extendedAttributes.get(attributeKey) : null;

hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import static org.junit.Assert.assertTrue;
2626

2727
import java.io.IOException;
28+
import java.util.HashMap;
2829
import java.util.List;
2930
import java.util.Map;
3031
import java.util.Optional;
@@ -124,7 +125,11 @@ public class SimpleRegionObserver implements RegionCoprocessor, RegionObserver {
124125
final AtomicInteger ctPostStartRegionOperation = new AtomicInteger(0);
125126
final AtomicInteger ctPostCloseRegionOperation = new AtomicInteger(0);
126127
final AtomicBoolean throwOnPostFlush = new AtomicBoolean(false);
128+
final AtomicInteger ctPreWALAppend = new AtomicInteger(0);
129+
127130
static final String TABLE_SKIPPED = "SKIPPED_BY_PREWALRESTORE";
131+
Map<String, byte[]> extendedAttributes = new HashMap<String,byte[]>();
132+
static final byte[] WAL_EXTENDED_ATTRIBUTE_BYTES = Bytes.toBytes("foo");
128133

129134
public void setThrowOnPostFlush(Boolean val){
130135
throwOnPostFlush.set(val);
@@ -631,6 +636,15 @@ public StoreFileReader postStoreFileReaderOpen(ObserverContext<RegionCoprocessor
631636
return reader;
632637
}
633638

639+
@Override
640+
public void preWALAppend(ObserverContext<RegionCoprocessorEnvironment> ctx,
641+
WALKey key, WALEdit edit) throws IOException {
642+
ctPreWALAppend.incrementAndGet();
643+
644+
key.addExtendedAttribute(Integer.toString(ctPreWALAppend.get()),
645+
Bytes.toBytes("foo"));
646+
}
647+
634648
public boolean hadPreGet() {
635649
return ctPreGet.get() > 0;
636650
}
@@ -864,6 +878,10 @@ public int getCtPostWALRestore() {
864878
return ctPostWALRestore.get();
865879
}
866880

881+
public int getCtPreWALAppend() {
882+
return ctPreWALAppend.get();
883+
}
884+
867885
public boolean wasStoreFileReaderOpenCalled() {
868886
return ctPreStoreFileReaderOpen.get() > 0 && ctPostStoreFileReaderOpen.get() > 0;
869887
}

hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java

Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import java.io.IOException;
2626
import java.lang.reflect.Method;
2727
import java.util.ArrayList;
28+
import java.util.Arrays;
2829
import java.util.List;
2930
import java.util.Optional;
3031
import org.apache.hadoop.conf.Configuration;
@@ -43,6 +44,7 @@
4344
import org.apache.hadoop.hbase.TableName;
4445
import org.apache.hadoop.hbase.client.Admin;
4546
import org.apache.hadoop.hbase.client.Append;
47+
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
4648
import org.apache.hadoop.hbase.client.Delete;
4749
import org.apache.hadoop.hbase.client.Durability;
4850
import org.apache.hadoop.hbase.client.Get;
@@ -55,6 +57,8 @@
5557
import org.apache.hadoop.hbase.client.RowMutations;
5658
import org.apache.hadoop.hbase.client.Scan;
5759
import org.apache.hadoop.hbase.client.Table;
60+
import org.apache.hadoop.hbase.client.TableDescriptor;
61+
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
5862
import org.apache.hadoop.hbase.filter.FilterAllFilter;
5963
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
6064
import org.apache.hadoop.hbase.io.hfile.HFile;
@@ -70,20 +74,26 @@
7074
import org.apache.hadoop.hbase.regionserver.StoreFile;
7175
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
7276
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
77+
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
7378
import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
7479
import org.apache.hadoop.hbase.testclassification.MediumTests;
7580
import org.apache.hadoop.hbase.tool.BulkLoadHFiles;
7681
import org.apache.hadoop.hbase.util.Bytes;
7782
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
7883
import org.apache.hadoop.hbase.util.JVMClusterUtil;
7984
import org.apache.hadoop.hbase.util.Threads;
85+
import org.apache.hadoop.hbase.wal.WALEdit;
86+
import org.apache.hadoop.hbase.wal.WALKey;
87+
import org.apache.hadoop.hbase.wal.WALKeyImpl;
8088
import org.junit.AfterClass;
89+
import org.junit.Assert;
8190
import org.junit.BeforeClass;
8291
import org.junit.ClassRule;
8392
import org.junit.Rule;
8493
import org.junit.Test;
8594
import org.junit.experimental.categories.Category;
8695
import org.junit.rules.TestName;
96+
import org.mockito.Mockito;
8797
import org.slf4j.Logger;
8898
import org.slf4j.LoggerFactory;
8999

@@ -99,6 +109,7 @@ public class TestRegionObserverInterface {
99109
private static final Logger LOG = LoggerFactory.getLogger(TestRegionObserverInterface.class);
100110

101111
public static final TableName TEST_TABLE = TableName.valueOf("TestTable");
112+
public static final byte[] FAMILY = Bytes.toBytes("f");
102113
public final static byte[] A = Bytes.toBytes("a");
103114
public final static byte[] B = Bytes.toBytes("b");
104115
public final static byte[] C = Bytes.toBytes("c");
@@ -663,6 +674,97 @@ public void testPreWALRestoreSkip() throws Exception {
663674
table.close();
664675
}
665676

677+
//called from testPreWALAppendIsWrittenToWAL
678+
private void testPreWALAppendHook(Table table, TableName tableName) throws IOException {
679+
int expectedCalls = 0;
680+
String [] methodArray = new String[1];
681+
methodArray[0] = "getCtPreWALAppend";
682+
Object[] resultArray = new Object[1];
683+
684+
Put p = new Put(ROW);
685+
p.addColumn(A, A, A);
686+
table.put(p);
687+
resultArray[0] = ++expectedCalls;
688+
verifyMethodResult(SimpleRegionObserver.class, methodArray, tableName, resultArray);
689+
690+
Append a = new Append(ROW);
691+
a.addColumn(B, B, B);
692+
table.append(a);
693+
resultArray[0] = ++expectedCalls;
694+
verifyMethodResult(SimpleRegionObserver.class, methodArray, tableName, resultArray);
695+
696+
Increment i = new Increment(ROW);
697+
i.addColumn(C, C, 1);
698+
table.increment(i);
699+
resultArray[0] = ++expectedCalls;
700+
verifyMethodResult(SimpleRegionObserver.class, methodArray, tableName, resultArray);
701+
702+
Delete d = new Delete(ROW);
703+
table.delete(d);
704+
resultArray[0] = ++expectedCalls;
705+
verifyMethodResult(SimpleRegionObserver.class, methodArray, tableName, resultArray);
706+
}
707+
708+
@Test
709+
public void testPreWALAppend() throws Exception {
710+
SimpleRegionObserver sro = new SimpleRegionObserver();
711+
ObserverContext ctx = Mockito.mock(ObserverContext.class);
712+
WALKey key = new WALKeyImpl(Bytes.toBytes("region"), TEST_TABLE,
713+
EnvironmentEdgeManager.currentTime());
714+
WALEdit edit = new WALEdit();
715+
sro.preWALAppend(ctx, key, edit);
716+
Assert.assertEquals(1, key.getExtendedAttributes().size());
717+
Assert.assertArrayEquals(SimpleRegionObserver.WAL_EXTENDED_ATTRIBUTE_BYTES,
718+
key.getExtendedAttribute(Integer.toString(sro.getCtPreWALAppend())));
719+
}
720+
721+
@Test
722+
public void testPreWALAppendIsWrittenToWAL() throws Exception {
723+
final TableName tableName = TableName.valueOf(TEST_TABLE.getNameAsString() +
724+
"." + name.getMethodName());
725+
Table table = util.createTable(tableName, new byte[][] { A, B, C });
726+
727+
PreWALAppendWALActionsListener listener = new PreWALAppendWALActionsListener();
728+
List<HRegion> regions = util.getHBaseCluster().getRegions(tableName);
729+
//should be only one region
730+
HRegion region = regions.get(0);
731+
region.getWAL().registerWALActionsListener(listener);
732+
testPreWALAppendHook(table, tableName);
733+
boolean[] expectedResults = {true, true, true, true};
734+
Assert.assertArrayEquals(expectedResults, listener.getWalKeysCorrectArray());
735+
736+
}
737+
738+
@Test
739+
public void testPreWALAppendNotCalledOnMetaEdit() throws Exception {
740+
final TableName tableName = TableName.valueOf(TEST_TABLE.getNameAsString() +
741+
"." + name.getMethodName());
742+
TableDescriptorBuilder tdBuilder = TableDescriptorBuilder.newBuilder(tableName);
743+
ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(FAMILY);
744+
tdBuilder.setColumnFamily(cfBuilder.build());
745+
tdBuilder.setCoprocessor(SimpleRegionObserver.class.getName());
746+
TableDescriptor td = tdBuilder.build();
747+
Table table = util.createTable(td, new byte[][] { A, B, C });
748+
749+
PreWALAppendWALActionsListener listener = new PreWALAppendWALActionsListener();
750+
List<HRegion> regions = util.getHBaseCluster().getRegions(tableName);
751+
//should be only one region
752+
HRegion region = regions.get(0);
753+
754+
region.getWAL().registerWALActionsListener(listener);
755+
//flushing should write to the WAL
756+
region.flush(true);
757+
//so should compaction
758+
region.compact(false);
759+
//and so should closing the region
760+
region.close();
761+
762+
//but we still shouldn't have triggered preWALAppend because no user data was written
763+
String[] methods = new String[] {"getCtPreWALAppend"};
764+
Object[] expectedResult = new Integer[]{0};
765+
verifyMethodResult(SimpleRegionObserver.class, methods, tableName, expectedResult);
766+
}
767+
666768
// check each region whether the coprocessor upcalls are called or not.
667769
private void verifyMethodResult(Class<?> coprocessor, String methodName[], TableName tableName,
668770
Object value[]) throws IOException {
@@ -711,4 +813,23 @@ private static void createHFile(Configuration conf, FileSystem fs, Path path, by
711813
writer.close();
712814
}
713815
}
816+
817+
private static class PreWALAppendWALActionsListener implements WALActionsListener {
818+
boolean[] walKeysCorrect = {false, false, false, false};
819+
820+
@Override
821+
public void postAppend(long entryLen, long elapsedTimeMillis,
822+
WALKey logKey, WALEdit logEdit) throws IOException {
823+
for (int k = 0; k < 4; k++) {
824+
if (!walKeysCorrect[k]) {
825+
walKeysCorrect[k] = Arrays.equals(SimpleRegionObserver.WAL_EXTENDED_ATTRIBUTE_BYTES,
826+
logKey.getExtendedAttribute(Integer.toString(k + 1)));
827+
}
828+
}
829+
}
830+
831+
boolean[] getWalKeysCorrectArray() {
832+
return walKeysCorrect;
833+
}
834+
}
714835
}

hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,7 @@
104104
import org.apache.hadoop.hbase.Waiter;
105105
import org.apache.hadoop.hbase.client.Append;
106106
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
107+
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
107108
import org.apache.hadoop.hbase.client.Delete;
108109
import org.apache.hadoop.hbase.client.Durability;
109110
import org.apache.hadoop.hbase.client.Get;
@@ -165,7 +166,6 @@
165166
import org.apache.hadoop.hbase.wal.WALProvider;
166167
import org.apache.hadoop.hbase.wal.WALProvider.Writer;
167168
import org.apache.hadoop.hbase.wal.WALSplitUtil;
168-
import org.apache.hadoop.hbase.wal.WALSplitter;
169169
import org.apache.hadoop.metrics2.MetricsExecutor;
170170
import org.junit.After;
171171
import org.junit.Assert;
@@ -401,6 +401,7 @@ public void testMemstoreSizeAccountingWithFailedPostBatchMutate() throws IOExcep
401401
String testName = "testMemstoreSizeAccountingWithFailedPostBatchMutate";
402402
FileSystem fs = FileSystem.get(CONF);
403403
Path rootDir = new Path(dir + testName);
404+
ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null);
404405
FSHLog hLog = new FSHLog(fs, rootDir, testName, CONF);
405406
hLog.init();
406407
region = initHRegion(tableName, null, null, false, Durability.SYNC_WAL, hLog,
@@ -2427,7 +2428,16 @@ public Void answer(InvocationOnMock invocation) throws Throwable {
24272428
return null;
24282429
}
24292430
}).when(mockedCPHost).preBatchMutate(Mockito.isA(MiniBatchOperationInProgress.class));
2431+
ColumnFamilyDescriptorBuilder builder = ColumnFamilyDescriptorBuilder.
2432+
newBuilder(COLUMN_FAMILY_BYTES);
2433+
ScanInfo info = new ScanInfo(CONF, builder.build(), Long.MAX_VALUE,
2434+
Long.MAX_VALUE, region.getCellComparator());
2435+
Mockito.when(mockedCPHost.preFlushScannerOpen(Mockito.any(HStore.class),
2436+
Mockito.any())).thenReturn(info);
2437+
Mockito.when(mockedCPHost.preFlush(Mockito.any(), Mockito.any(StoreScanner.class),
2438+
Mockito.any())).thenAnswer(i -> i.getArgument(1));
24302439
region.setCoprocessorHost(mockedCPHost);
2440+
24312441
region.put(originalPut);
24322442
region.setCoprocessorHost(normalCPHost);
24332443
final long finalSize = region.getDataInMemoryWithoutWAL();

0 commit comments

Comments
 (0)