diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutator.java index e5f28d2e0602..6cc2b5adf9d4 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutator.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutator.java @@ -20,6 +20,7 @@ import java.io.Closeable; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import org.apache.hadoop.conf.Configuration; @@ -93,4 +94,11 @@ default CompletableFuture mutate(Mutation mutation) { default long getPeriodicalFlushTimeout(TimeUnit unit) { throw new UnsupportedOperationException("Not implemented"); } + + /** + * Returns the rpc request attributes. + */ + default Map getRequestAttributes() { + return Collections.emptyMap(); + } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorBuilder.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorBuilder.java index ed21fb8e23ef..d38aa625fb2b 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorBuilder.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorBuilder.java @@ -19,6 +19,7 @@ import static org.apache.hadoop.hbase.client.ConnectionUtils.retries2Attempts; +import java.util.Map; import java.util.concurrent.TimeUnit; import org.apache.yetus.audience.InterfaceAudience; @@ -38,6 +39,16 @@ public interface AsyncBufferedMutatorBuilder { */ AsyncBufferedMutatorBuilder setRpcTimeout(long timeout, TimeUnit unit); + /** + * Set a rpc request attribute. + */ + AsyncBufferedMutatorBuilder setRequestAttribute(String key, byte[] value); + + /** + * Set multiple rpc request attributes. + */ + AsyncBufferedMutatorBuilder setRequestAttributes(Map requestAttributes); + /** * Set the base pause time for retrying. We use an exponential policy to generate sleep time when * retrying. diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorBuilderImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorBuilderImpl.java index ede5b359e833..6905ff3065cb 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorBuilderImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorBuilderImpl.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hbase.client; +import java.util.Map; import java.util.concurrent.TimeUnit; import org.apache.yetus.audience.InterfaceAudience; @@ -78,6 +79,20 @@ public AsyncBufferedMutatorBuilder setStartLogErrorsCnt(int startLogErrorsCnt) { return this; } + @Override + public AsyncBufferedMutatorBuilder setRequestAttribute(String key, byte[] value) { + tableBuilder.setRequestAttribute(key, value); + return this; + } + + @Override + public AsyncBufferedMutatorBuilder setRequestAttributes(Map requestAttributes) { + for (Map.Entry requestAttribute : requestAttributes.entrySet()) { + tableBuilder.setRequestAttribute(requestAttribute.getKey(), requestAttribute.getValue()); + } + return this; + } + @Override public AsyncBufferedMutatorBuilder setWriteBufferSize(long writeBufferSize) { Preconditions.checkArgument(writeBufferSize > 0, "writeBufferSize %d must be > 0", diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorImpl.java index ce4193d91382..3acd8bebdada 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorImpl.java @@ -24,6 +24,7 @@ import java.util.ArrayList; import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -130,7 +131,7 @@ Stream.> generate(CompletableFuture::new).limit(mutation periodicFlushTask = periodicalFlushTimer.newTimeout(timeout -> { synchronized (AsyncBufferedMutatorImpl.this) { // confirm that we are still valid, if there is already an internalFlush call before us, - // then we should not execute any more. And in internalFlush we will set periodicFlush + // then we should not execute anymore. And in internalFlush we will set periodicFlush // to null, and since we may schedule a new one, so here we check whether the references // are equal. if (timeout == periodicFlushTask) { @@ -170,4 +171,9 @@ public long getWriteBufferSize() { public long getPeriodicalFlushTimeout(TimeUnit unit) { return unit.convert(periodicFlushTimeoutNs, TimeUnit.NANOSECONDS); } + + @Override + public Map getRequestAttributes() { + return table.getRequestAttributes(); + } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java index 2979c6689884..f14eac3cf79a 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java @@ -21,12 +21,12 @@ import static org.apache.hadoop.hbase.client.ConnectionUtils.toCheckExistenceOnly; import static org.apache.hadoop.hbase.util.FutureUtils.allOf; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.function.Function; -import org.apache.commons.lang3.NotImplementedException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.CompareOperator; import org.apache.hadoop.hbase.TableName; @@ -117,7 +117,7 @@ public interface AsyncTable { * @return a map of request attributes supplied by the client */ default Map getRequestAttributes() { - throw new NotImplementedException("Add an implementation!"); + return Collections.emptyMap(); } /** diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilderBase.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilderBase.java index 02e9da0770b4..428e7358195e 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilderBase.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilderBase.java @@ -129,10 +129,10 @@ public AsyncTableBuilderBase setStartLogErrorsCnt(int startLogErrorsCnt) { @Override public AsyncTableBuilder setRequestAttribute(String key, byte[] value) { - if (this.requestAttributes.isEmpty()) { - this.requestAttributes = new HashMap<>(); + if (requestAttributes.isEmpty()) { + requestAttributes = new HashMap<>(); } - this.requestAttributes.put(key, value); + requestAttributes.put(key, value); return this; } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutator.java index f790f5a4e2f5..24563367bbbc 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutator.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutator.java @@ -19,7 +19,9 @@ import java.io.Closeable; import java.io.IOException; +import java.util.Collections; import java.util.List; +import java.util.Map; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.TableName; import org.apache.yetus.audience.InterfaceAudience; @@ -204,6 +206,13 @@ default void setOperationTimeout(int timeout) { "The BufferedMutator::setOperationTimeout has not been implemented"); } + /** + * Returns the rpc request attributes. + */ + default Map getRequestAttributes() { + return Collections.emptyMap(); + } + /** * Listens for asynchronous exceptions on a {@link BufferedMutator}. */ diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorOverAsyncBufferedMutator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorOverAsyncBufferedMutator.java index 72692eac59e5..aec4a0cbf216 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorOverAsyncBufferedMutator.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorOverAsyncBufferedMutator.java @@ -23,6 +23,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; @@ -186,4 +187,9 @@ public void setRpcTimeout(int timeout) { public void setOperationTimeout(int timeout) { // no effect } + + @Override + public Map getRequestAttributes() { + return mutator.getRequestAttributes(); + } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorParams.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorParams.java index b3efa14fa7ee..44bc5e2be7cc 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorParams.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorParams.java @@ -17,6 +17,9 @@ */ package org.apache.hadoop.hbase.client; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; import java.util.concurrent.ExecutorService; import org.apache.hadoop.hbase.TableName; import org.apache.yetus.audience.InterfaceAudience; @@ -38,6 +41,7 @@ public class BufferedMutatorParams implements Cloneable { private String implementationClassName = null; private int rpcTimeout = UNSET; private int operationTimeout = UNSET; + protected Map requestAttributes = Collections.emptyMap(); private BufferedMutator.ExceptionListener listener = new BufferedMutator.ExceptionListener() { @Override public void onException(RetriesExhaustedWithDetailsException exception, @@ -85,6 +89,18 @@ public int getOperationTimeout() { return operationTimeout; } + public BufferedMutatorParams setRequestAttribute(String key, byte[] value) { + if (requestAttributes.isEmpty()) { + requestAttributes = new HashMap<>(); + } + requestAttributes.put(key, value); + return this; + } + + public Map getRequestAttributes() { + return requestAttributes; + } + /** * Override the write buffer size specified by the provided {@link Connection}'s * {@link org.apache.hadoop.conf.Configuration} instance, via the configuration key diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionOverAsyncConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionOverAsyncConnection.java index 30c348e6d1f1..d299e453266e 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionOverAsyncConnection.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionOverAsyncConnection.java @@ -107,6 +107,10 @@ public BufferedMutator getBufferedMutator(BufferedMutatorParams params) throws I if (params.getMaxKeyValueSize() != BufferedMutatorParams.UNSET) { builder.setMaxKeyValueSize(params.getMaxKeyValueSize()); } + if (!params.getRequestAttributes().isEmpty()) { + + builder.setRequestAttributes(params.getRequestAttributes()); + } return new BufferedMutatorOverAsyncBufferedMutator(builder.build(), params.getListener()); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java index 342cf89acf1a..257a5788b183 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java @@ -214,8 +214,8 @@ private SingleRequestCallerBuilder newCaller(byte[] row, int priority, lo .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS) .pause(pauseNs, TimeUnit.NANOSECONDS) .pauseForServerOverloaded(pauseNsForServerOverloaded, TimeUnit.NANOSECONDS) - .maxAttempts(maxAttempts).setRequestAttributes(requestAttributes) - .startLogErrorsCnt(startLogErrorsCnt).setRequestAttributes(requestAttributes); + .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt) + .setRequestAttributes(requestAttributes); } private SingleRequestCallerBuilder diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java index 3941c0d18540..907e3d1a7040 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java @@ -757,6 +757,6 @@ default long getOperationTimeout(TimeUnit unit) { * @return map of request attributes */ default Map getRequestAttributes() { - throw new NotImplementedException("Add an implementation!"); + return Collections.emptyMap(); } } diff --git a/hbase-examples/src/main/java/org/apache/hadoop/hbase/client/example/BufferedMutatorExample.java b/hbase-examples/src/main/java/org/apache/hadoop/hbase/client/example/BufferedMutatorExample.java index 1c4447eb6598..1835d51d0386 100644 --- a/hbase-examples/src/main/java/org/apache/hadoop/hbase/client/example/BufferedMutatorExample.java +++ b/hbase-examples/src/main/java/org/apache/hadoop/hbase/client/example/BufferedMutatorExample.java @@ -19,7 +19,9 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; @@ -27,7 +29,9 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.hbase.AuthUtil; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.BufferedMutator; import org.apache.hadoop.hbase.client.BufferedMutatorParams; @@ -67,12 +71,19 @@ public void onException(RetriesExhaustedWithDetailsException e, BufferedMutator } } }; - BufferedMutatorParams params = new BufferedMutatorParams(TABLE).listener(listener); + + BufferedMutatorParams params = new BufferedMutatorParams(TABLE).listener(listener) + .setRequestAttribute("requestInfo", Bytes.toBytes("bar")); // // step 1: create a single Connection and a BufferedMutator, shared by all worker threads. // - try (final Connection conn = ConnectionFactory.createConnection(getConf()); + Map connectionAttributes = new HashMap<>(); + connectionAttributes.put("clientId", Bytes.toBytes("foo")); + Configuration conf = getConf(); + try ( + final Connection conn = ConnectionFactory.createConnection(conf, null, + AuthUtil.loginClient(conf), connectionAttributes); final BufferedMutator mutator = conn.getBufferedMutator(params)) { /** worker pool that operates on BufferedTable instances */ @@ -104,6 +115,7 @@ public Void call() throws Exception { f.get(5, TimeUnit.MINUTES); } workerPool.shutdown(); + mutator.flush(); } catch (IOException e) { // exception while creating/destroying Connection or BufferedMutator LOG.info("exception while creating/destroying Connection or BufferedMutator", e); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestConnectionAttributes.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestConnectionAttributes.java new file mode 100644 index 000000000000..49c7e86975d1 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestConnectionAttributes.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import static org.junit.Assert.assertEquals; + +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Random; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.AuthUtil; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellComparator; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtil; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.SingleProcessHBaseCluster; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.coprocessor.ObserverContext; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; +import org.apache.hadoop.hbase.coprocessor.RegionObserver; +import org.apache.hadoop.hbase.ipc.RpcCall; +import org.apache.hadoop.hbase.ipc.RpcServer; +import org.apache.hadoop.hbase.testclassification.ClientTests; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({ ClientTests.class, MediumTests.class }) +public class TestConnectionAttributes { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestConnectionAttributes.class); + + private static final Map CONNECTION_ATTRIBUTES = new HashMap<>(); + static { + CONNECTION_ATTRIBUTES.put("clientId", Bytes.toBytes("foo")); + } + private static final byte[] FAMILY = Bytes.toBytes("0"); + private static final TableName TABLE_NAME = TableName.valueOf("testConnectionAttributes"); + + private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); + private static SingleProcessHBaseCluster cluster; + + @BeforeClass + public static void setUp() throws Exception { + cluster = TEST_UTIL.startMiniCluster(1); + Table table = TEST_UTIL.createTable(TABLE_NAME, new byte[][] { FAMILY }, 1, + HConstants.DEFAULT_BLOCKSIZE, TestConnectionAttributes.AttributesCoprocessor.class.getName()); + table.close(); + } + + @AfterClass + public static void afterClass() throws Exception { + cluster.close(); + TEST_UTIL.shutdownMiniCluster(); + } + + @Test + public void testConnectionHeaderOverwrittenAttributesRemain() throws IOException { + Configuration conf = TEST_UTIL.getConfiguration(); + try (Connection conn = ConnectionFactory.createConnection(conf, null, + AuthUtil.loginClient(conf), CONNECTION_ATTRIBUTES); Table table = conn.getTable(TABLE_NAME)) { + + // submit a 300 byte rowkey here to encourage netty's allocator to overwrite the connection + // header + byte[] bytes = new byte[300]; + new Random().nextBytes(bytes); + Result result = table.get(new Get(bytes)); + + assertEquals(CONNECTION_ATTRIBUTES.size(), result.size()); + for (Map.Entry attr : CONNECTION_ATTRIBUTES.entrySet()) { + byte[] val = result.getValue(FAMILY, Bytes.toBytes(attr.getKey())); + assertEquals(Bytes.toStringBinary(attr.getValue()), Bytes.toStringBinary(val)); + } + } + } + + public static class AttributesCoprocessor implements RegionObserver, RegionCoprocessor { + + @Override + public Optional getRegionObserver() { + return Optional.of(this); + } + + @Override + public void preGetOp(ObserverContext c, Get get, + List result) throws IOException { + RpcCall rpcCall = RpcServer.getCurrentCall().get(); + for (Map.Entry attr : rpcCall.getConnectionAttributes().entrySet()) { + result.add(c.getEnvironment().getCellBuilder().clear().setRow(get.getRow()) + .setFamily(FAMILY).setQualifier(Bytes.toBytes(attr.getKey())).setValue(attr.getValue()) + .setType(Cell.Type.Put).setTimestamp(1).build()); + } + result.sort(CellComparator.getInstance()); + c.bypass(); + } + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRequestAndConnectionAttributes.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRequestAndConnectionAttributes.java deleted file mode 100644 index 728b877a32b4..000000000000 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRequestAndConnectionAttributes.java +++ /dev/null @@ -1,323 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.client; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; - -import java.io.IOException; -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.Random; -import java.util.UUID; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.atomic.AtomicBoolean; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.AuthUtil; -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.CellComparator; -import org.apache.hadoop.hbase.HBaseClassTestRule; -import org.apache.hadoop.hbase.HBaseTestingUtil; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.coprocessor.ObserverContext; -import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; -import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; -import org.apache.hadoop.hbase.coprocessor.RegionObserver; -import org.apache.hadoop.hbase.ipc.RpcCall; -import org.apache.hadoop.hbase.ipc.RpcServer; -import org.apache.hadoop.hbase.regionserver.InternalScanner; -import org.apache.hadoop.hbase.testclassification.ClientTests; -import org.apache.hadoop.hbase.testclassification.MediumTests; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.wal.WALEdit; -import org.junit.AfterClass; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.ClassRule; -import org.junit.Test; -import org.junit.experimental.categories.Category; - -import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList; - -@Category({ ClientTests.class, MediumTests.class }) -public class TestRequestAndConnectionAttributes { - - @ClassRule - public static final HBaseClassTestRule CLASS_RULE = - HBaseClassTestRule.forClass(TestRequestAndConnectionAttributes.class); - - private static final Map CONNECTION_ATTRIBUTES = new HashMap<>(); - static { - CONNECTION_ATTRIBUTES.put("clientId", Bytes.toBytes("foo")); - } - private static final Map REQUEST_ATTRIBUTES = new HashMap<>(); - private static final ExecutorService EXECUTOR_SERVICE = Executors.newFixedThreadPool(100); - private static final AtomicBoolean REQUEST_ATTRIBUTES_VALIDATED = new AtomicBoolean(false); - private static final byte[] REQUEST_ATTRIBUTES_TEST_TABLE_CF = Bytes.toBytes("0"); - private static final TableName REQUEST_ATTRIBUTES_TEST_TABLE = - TableName.valueOf("testRequestAttributes"); - - private static HBaseTestingUtil TEST_UTIL = null; - - @BeforeClass - public static void setUp() throws Exception { - TEST_UTIL = new HBaseTestingUtil(); - TEST_UTIL.startMiniCluster(1); - TEST_UTIL.createTable(REQUEST_ATTRIBUTES_TEST_TABLE, - new byte[][] { REQUEST_ATTRIBUTES_TEST_TABLE_CF }, 1, HConstants.DEFAULT_BLOCKSIZE, - AttributesCoprocessor.class.getName()); - } - - @AfterClass - public static void afterClass() throws Exception { - TEST_UTIL.shutdownMiniCluster(); - } - - @Before - public void setup() { - REQUEST_ATTRIBUTES_VALIDATED.getAndSet(false); - } - - @Test - public void testConnectionHeaderOverwrittenAttributesRemain() throws IOException { - TableName tableName = TableName.valueOf("testConnectionAttributes"); - byte[] cf = Bytes.toBytes("0"); - TEST_UTIL.createTable(tableName, new byte[][] { cf }, 1, HConstants.DEFAULT_BLOCKSIZE, - AttributesCoprocessor.class.getName()); - - Configuration conf = TEST_UTIL.getConfiguration(); - try (Connection conn = ConnectionFactory.createConnection(conf, null, - AuthUtil.loginClient(conf), CONNECTION_ATTRIBUTES); Table table = conn.getTable(tableName)) { - - // submit a 300 byte rowkey here to encourage netty's allocator to overwrite the connection - // header - byte[] bytes = new byte[300]; - new Random().nextBytes(bytes); - Result result = table.get(new Get(bytes)); - - assertEquals(CONNECTION_ATTRIBUTES.size(), result.size()); - for (Map.Entry attr : CONNECTION_ATTRIBUTES.entrySet()) { - byte[] val = result.getValue(Bytes.toBytes("c"), Bytes.toBytes(attr.getKey())); - assertEquals(Bytes.toStringBinary(attr.getValue()), Bytes.toStringBinary(val)); - } - } - } - - @Test - public void testRequestAttributesGet() throws IOException { - addRandomRequestAttributes(); - - Configuration conf = TEST_UTIL.getConfiguration(); - try ( - Connection conn = ConnectionFactory.createConnection(conf, null, AuthUtil.loginClient(conf), - CONNECTION_ATTRIBUTES); - Table table = configureRequestAttributes( - conn.getTableBuilder(REQUEST_ATTRIBUTES_TEST_TABLE, EXECUTOR_SERVICE)).build()) { - - table.get(new Get(Bytes.toBytes(0))); - } - - assertTrue(REQUEST_ATTRIBUTES_VALIDATED.get()); - } - - @Test - public void testRequestAttributesMultiGet() throws IOException { - assertFalse(REQUEST_ATTRIBUTES_VALIDATED.get()); - addRandomRequestAttributes(); - - Configuration conf = TEST_UTIL.getConfiguration(); - try ( - Connection conn = ConnectionFactory.createConnection(conf, null, AuthUtil.loginClient(conf), - CONNECTION_ATTRIBUTES); - Table table = configureRequestAttributes( - conn.getTableBuilder(REQUEST_ATTRIBUTES_TEST_TABLE, EXECUTOR_SERVICE)).build()) { - List gets = ImmutableList.of(new Get(Bytes.toBytes(0)), new Get(Bytes.toBytes(1))); - table.get(gets); - } - - assertTrue(REQUEST_ATTRIBUTES_VALIDATED.get()); - } - - @Test - public void testRequestAttributesExists() throws IOException { - assertFalse(REQUEST_ATTRIBUTES_VALIDATED.get()); - addRandomRequestAttributes(); - - Configuration conf = TEST_UTIL.getConfiguration(); - try ( - Connection conn = ConnectionFactory.createConnection(conf, null, AuthUtil.loginClient(conf), - CONNECTION_ATTRIBUTES); - Table table = configureRequestAttributes( - conn.getTableBuilder(REQUEST_ATTRIBUTES_TEST_TABLE, EXECUTOR_SERVICE)).build()) { - - table.exists(new Get(Bytes.toBytes(0))); - } - - assertTrue(REQUEST_ATTRIBUTES_VALIDATED.get()); - } - - @Test - public void testRequestAttributesScan() throws IOException { - assertFalse(REQUEST_ATTRIBUTES_VALIDATED.get()); - addRandomRequestAttributes(); - - Configuration conf = TEST_UTIL.getConfiguration(); - try ( - Connection conn = ConnectionFactory.createConnection(conf, null, AuthUtil.loginClient(conf), - CONNECTION_ATTRIBUTES); - Table table = configureRequestAttributes( - conn.getTableBuilder(REQUEST_ATTRIBUTES_TEST_TABLE, EXECUTOR_SERVICE)).build()) { - ResultScanner scanner = table.getScanner(new Scan()); - scanner.next(); - } - assertTrue(REQUEST_ATTRIBUTES_VALIDATED.get()); - } - - @Test - public void testRequestAttributesPut() throws IOException { - assertFalse(REQUEST_ATTRIBUTES_VALIDATED.get()); - addRandomRequestAttributes(); - - Configuration conf = TEST_UTIL.getConfiguration(); - try ( - Connection conn = ConnectionFactory.createConnection(conf, null, AuthUtil.loginClient(conf), - CONNECTION_ATTRIBUTES); - Table table = configureRequestAttributes( - conn.getTableBuilder(REQUEST_ATTRIBUTES_TEST_TABLE, EXECUTOR_SERVICE)).build()) { - Put put = new Put(Bytes.toBytes("a")); - put.addColumn(REQUEST_ATTRIBUTES_TEST_TABLE_CF, Bytes.toBytes("c"), Bytes.toBytes("v")); - table.put(put); - } - assertTrue(REQUEST_ATTRIBUTES_VALIDATED.get()); - } - - @Test - public void testRequestAttributesMultiPut() throws IOException { - assertFalse(REQUEST_ATTRIBUTES_VALIDATED.get()); - addRandomRequestAttributes(); - - Configuration conf = TEST_UTIL.getConfiguration(); - try ( - Connection conn = ConnectionFactory.createConnection(conf, null, AuthUtil.loginClient(conf), - CONNECTION_ATTRIBUTES); - Table table = configureRequestAttributes( - conn.getTableBuilder(REQUEST_ATTRIBUTES_TEST_TABLE, EXECUTOR_SERVICE)).build()) { - Put put = new Put(Bytes.toBytes("a")); - put.addColumn(REQUEST_ATTRIBUTES_TEST_TABLE_CF, Bytes.toBytes("c"), Bytes.toBytes("v")); - table.put(put); - } - assertTrue(REQUEST_ATTRIBUTES_VALIDATED.get()); - } - - @Test - public void testNoRequestAttributes() throws IOException { - assertFalse(REQUEST_ATTRIBUTES_VALIDATED.get()); - TableName tableName = TableName.valueOf("testNoRequestAttributesScan"); - TEST_UTIL.createTable(tableName, new byte[][] { Bytes.toBytes("0") }, 1, - HConstants.DEFAULT_BLOCKSIZE, AttributesCoprocessor.class.getName()); - - REQUEST_ATTRIBUTES.clear(); - Configuration conf = TEST_UTIL.getConfiguration(); - try (Connection conn = ConnectionFactory.createConnection(conf, null, - AuthUtil.loginClient(conf), CONNECTION_ATTRIBUTES)) { - TableBuilder tableBuilder = conn.getTableBuilder(tableName, null); - try (Table table = tableBuilder.build()) { - table.get(new Get(Bytes.toBytes(0))); - assertTrue(REQUEST_ATTRIBUTES_VALIDATED.get()); - } - } - } - - private void addRandomRequestAttributes() { - REQUEST_ATTRIBUTES.clear(); - int j = Math.max(2, (int) (10 * Math.random())); - for (int i = 0; i < j; i++) { - REQUEST_ATTRIBUTES.put(String.valueOf(i), Bytes.toBytes(UUID.randomUUID().toString())); - } - } - - private static TableBuilder configureRequestAttributes(TableBuilder tableBuilder) { - REQUEST_ATTRIBUTES.forEach(tableBuilder::setRequestAttribute); - return tableBuilder; - } - - public static class AttributesCoprocessor implements RegionObserver, RegionCoprocessor { - - @Override - public Optional getRegionObserver() { - return Optional.of(this); - } - - @Override - public void preGetOp(ObserverContext c, Get get, - List result) throws IOException { - validateRequestAttributes(); - - // for connection attrs test - RpcCall rpcCall = RpcServer.getCurrentCall().get(); - for (Map.Entry attr : rpcCall.getRequestAttributes().entrySet()) { - result.add(c.getEnvironment().getCellBuilder().clear().setRow(get.getRow()) - .setFamily(Bytes.toBytes("r")).setQualifier(Bytes.toBytes(attr.getKey())) - .setValue(attr.getValue()).setType(Cell.Type.Put).setTimestamp(1).build()); - } - for (Map.Entry attr : rpcCall.getConnectionAttributes().entrySet()) { - result.add(c.getEnvironment().getCellBuilder().clear().setRow(get.getRow()) - .setFamily(Bytes.toBytes("c")).setQualifier(Bytes.toBytes(attr.getKey())) - .setValue(attr.getValue()).setType(Cell.Type.Put).setTimestamp(1).build()); - } - result.sort(CellComparator.getInstance()); - c.bypass(); - } - - @Override - public boolean preScannerNext(ObserverContext c, - InternalScanner s, List result, int limit, boolean hasNext) throws IOException { - validateRequestAttributes(); - return hasNext; - } - - @Override - public void prePut(ObserverContext c, Put put, WALEdit edit) - throws IOException { - validateRequestAttributes(); - } - - private void validateRequestAttributes() { - RpcCall rpcCall = RpcServer.getCurrentCall().get(); - Map attrs = rpcCall.getRequestAttributes(); - if (attrs.size() != REQUEST_ATTRIBUTES.size()) { - return; - } - for (Map.Entry attr : attrs.entrySet()) { - if (!REQUEST_ATTRIBUTES.containsKey(attr.getKey())) { - return; - } - if (!Arrays.equals(REQUEST_ATTRIBUTES.get(attr.getKey()), attr.getValue())) { - return; - } - } - REQUEST_ATTRIBUTES_VALIDATED.getAndSet(true); - } - } -} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRequestAttributes.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRequestAttributes.java new file mode 100644 index 000000000000..66486ebcdd20 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRequestAttributes.java @@ -0,0 +1,360 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import java.io.IOException; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.UUID; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.AuthUtil; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.ExtendedCellScannable; +import org.apache.hadoop.hbase.ExtendedCellScanner; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtil; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.SingleProcessHBaseCluster; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.coprocessor.ObserverContext; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; +import org.apache.hadoop.hbase.coprocessor.RegionObserver; +import org.apache.hadoop.hbase.ipc.DelegatingHBaseRpcController; +import org.apache.hadoop.hbase.ipc.HBaseRpcController; +import org.apache.hadoop.hbase.ipc.RpcCall; +import org.apache.hadoop.hbase.ipc.RpcControllerFactory; +import org.apache.hadoop.hbase.ipc.RpcServer; +import org.apache.hadoop.hbase.regionserver.InternalScanner; +import org.apache.hadoop.hbase.testclassification.ClientTests; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.wal.WALEdit; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({ ClientTests.class, MediumTests.class }) +public class TestRequestAttributes { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestRequestAttributes.class); + + private static final byte[] ROW_KEY1 = Bytes.toBytes("1"); + private static final byte[] ROW_KEY2A = Bytes.toBytes("2A"); + private static final byte[] ROW_KEY2B = Bytes.toBytes("2B"); + private static final byte[] ROW_KEY3 = Bytes.toBytes("3"); + private static final byte[] ROW_KEY4 = Bytes.toBytes("4"); + private static final byte[] ROW_KEY5 = Bytes.toBytes("5"); + private static final byte[] ROW_KEY6 = Bytes.toBytes("6"); + private static final byte[] ROW_KEY7 = Bytes.toBytes("7"); + private static final byte[] ROW_KEY8 = Bytes.toBytes("8"); + private static final Map CONNECTION_ATTRIBUTES = new HashMap<>(); + private static final Map REQUEST_ATTRIBUTES_SCAN = addRandomRequestAttributes(); + private static final Map> ROW_KEY_TO_REQUEST_ATTRIBUTES = + new HashMap<>(); + static { + CONNECTION_ATTRIBUTES.put("clientId", Bytes.toBytes("foo")); + ROW_KEY_TO_REQUEST_ATTRIBUTES.put(ROW_KEY1, addRandomRequestAttributes()); + Map requestAttributes2 = addRandomRequestAttributes(); + ROW_KEY_TO_REQUEST_ATTRIBUTES.put(ROW_KEY2A, requestAttributes2); + ROW_KEY_TO_REQUEST_ATTRIBUTES.put(ROW_KEY2B, requestAttributes2); + ROW_KEY_TO_REQUEST_ATTRIBUTES.put(ROW_KEY3, addRandomRequestAttributes()); + ROW_KEY_TO_REQUEST_ATTRIBUTES.put(ROW_KEY4, addRandomRequestAttributes()); + ROW_KEY_TO_REQUEST_ATTRIBUTES.put(ROW_KEY5, addRandomRequestAttributes()); + ROW_KEY_TO_REQUEST_ATTRIBUTES.put(ROW_KEY6, addRandomRequestAttributes()); + ROW_KEY_TO_REQUEST_ATTRIBUTES.put(ROW_KEY7, addRandomRequestAttributes()); + ROW_KEY_TO_REQUEST_ATTRIBUTES.put(ROW_KEY8, new HashMap<>()); + } + private static final ExecutorService EXECUTOR_SERVICE = Executors.newFixedThreadPool(100); + private static final byte[] FAMILY = Bytes.toBytes("0"); + private static final TableName TABLE_NAME = TableName.valueOf("testRequestAttributes"); + + private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); + private static SingleProcessHBaseCluster cluster; + + @BeforeClass + public static void setUp() throws Exception { + cluster = TEST_UTIL.startMiniCluster(1); + Table table = TEST_UTIL.createTable(TABLE_NAME, new byte[][] { FAMILY }, 1, + HConstants.DEFAULT_BLOCKSIZE, AttributesCoprocessor.class.getName()); + table.close(); + } + + @AfterClass + public static void afterClass() throws Exception { + cluster.close(); + TEST_UTIL.shutdownMiniCluster(); + } + + @Test + public void testRequestAttributesGet() throws IOException { + Configuration conf = TEST_UTIL.getConfiguration(); + try ( + Connection conn = ConnectionFactory.createConnection(conf, null, AuthUtil.loginClient(conf), + CONNECTION_ATTRIBUTES); + Table table = configureRequestAttributes(conn.getTableBuilder(TABLE_NAME, EXECUTOR_SERVICE), + ROW_KEY_TO_REQUEST_ATTRIBUTES.get(ROW_KEY1)).build()) { + + table.get(new Get(ROW_KEY1)); + } + } + + @Test + public void testRequestAttributesMultiGet() throws IOException { + Configuration conf = TEST_UTIL.getConfiguration(); + try ( + Connection conn = ConnectionFactory.createConnection(conf, null, AuthUtil.loginClient(conf), + CONNECTION_ATTRIBUTES); + Table table = configureRequestAttributes(conn.getTableBuilder(TABLE_NAME, EXECUTOR_SERVICE), + ROW_KEY_TO_REQUEST_ATTRIBUTES.get(ROW_KEY2A)).build()) { + List gets = List.of(new Get(ROW_KEY2A), new Get(ROW_KEY2B)); + table.get(gets); + } + } + + @Test + public void testRequestAttributesScan() throws IOException { + Configuration conf = TEST_UTIL.getConfiguration(); + try ( + Connection conn = ConnectionFactory.createConnection(conf, null, AuthUtil.loginClient(conf), + CONNECTION_ATTRIBUTES); + Table table = configureRequestAttributes(conn.getTableBuilder(TABLE_NAME, EXECUTOR_SERVICE), + REQUEST_ATTRIBUTES_SCAN).build()) { + ResultScanner scanner = table.getScanner(new Scan()); + scanner.next(); + } + } + + @Test + public void testRequestAttributesPut() throws IOException { + Configuration conf = TEST_UTIL.getConfiguration(); + try ( + Connection conn = ConnectionFactory.createConnection(conf, null, AuthUtil.loginClient(conf), + CONNECTION_ATTRIBUTES); + Table table = configureRequestAttributes(conn.getTableBuilder(TABLE_NAME, EXECUTOR_SERVICE), + ROW_KEY_TO_REQUEST_ATTRIBUTES.get(ROW_KEY3)).build()) { + Put put = new Put(ROW_KEY3); + put.addColumn(FAMILY, Bytes.toBytes("c"), Bytes.toBytes("v")); + table.put(put); + } + } + + @Test + public void testRequestAttributesMultiPut() throws IOException { + Configuration conf = TEST_UTIL.getConfiguration(); + try ( + Connection conn = ConnectionFactory.createConnection(conf, null, AuthUtil.loginClient(conf), + CONNECTION_ATTRIBUTES); + Table table = configureRequestAttributes(conn.getTableBuilder(TABLE_NAME, EXECUTOR_SERVICE), + ROW_KEY_TO_REQUEST_ATTRIBUTES.get(ROW_KEY4)).build()) { + Put put1 = new Put(ROW_KEY4); + put1.addColumn(FAMILY, Bytes.toBytes("c1"), Bytes.toBytes("v1")); + Put put2 = new Put(ROW_KEY4); + put2.addColumn(FAMILY, Bytes.toBytes("c2"), Bytes.toBytes("v2")); + table.put(List.of(put1, put2)); + } + } + + @Test + public void testRequestAttributesBufferedMutate() throws IOException, InterruptedException { + Configuration conf = TEST_UTIL.getConfiguration(); + try ( + Connection conn = ConnectionFactory.createConnection(conf, null, AuthUtil.loginClient(conf), + CONNECTION_ATTRIBUTES); + BufferedMutator bufferedMutator = + conn.getBufferedMutator(configureRequestAttributes(new BufferedMutatorParams(TABLE_NAME), + ROW_KEY_TO_REQUEST_ATTRIBUTES.get(ROW_KEY5)));) { + Put put = new Put(ROW_KEY5); + put.addColumn(FAMILY, Bytes.toBytes("c"), Bytes.toBytes("v")); + bufferedMutator.mutate(put); + bufferedMutator.flush(); + } + } + + @Test + public void testRequestAttributesExists() throws IOException { + Configuration conf = TEST_UTIL.getConfiguration(); + try ( + Connection conn = ConnectionFactory.createConnection(conf, null, AuthUtil.loginClient(conf), + CONNECTION_ATTRIBUTES); + Table table = configureRequestAttributes(conn.getTableBuilder(TABLE_NAME, EXECUTOR_SERVICE), + ROW_KEY_TO_REQUEST_ATTRIBUTES.get(ROW_KEY6)).build()) { + + table.exists(new Get(ROW_KEY6)); + } + } + + @Test + public void testRequestAttributesFromRpcController() throws IOException, InterruptedException { + Configuration conf = TEST_UTIL.getConfiguration(); + conf.setClass(RpcControllerFactory.CUSTOM_CONTROLLER_CONF_KEY, + RequestMetadataControllerFactory.class, RpcControllerFactory.class); + try ( + Connection conn = ConnectionFactory.createConnection(conf, null, AuthUtil.loginClient(conf), + CONNECTION_ATTRIBUTES); + BufferedMutator bufferedMutator = conn.getBufferedMutator(TABLE_NAME);) { + Put put = new Put(ROW_KEY7); + put.addColumn(FAMILY, Bytes.toBytes("c"), Bytes.toBytes("v")); + bufferedMutator.mutate(put); + bufferedMutator.flush(); + } + conf.unset(RpcControllerFactory.CUSTOM_CONTROLLER_CONF_KEY); + } + + @Test + public void testNoRequestAttributes() throws IOException { + Configuration conf = TEST_UTIL.getConfiguration(); + try (Connection conn = ConnectionFactory.createConnection(conf, null, + AuthUtil.loginClient(conf), CONNECTION_ATTRIBUTES)) { + TableBuilder tableBuilder = conn.getTableBuilder(TABLE_NAME, null); + try (Table table = tableBuilder.build()) { + table.get(new Get(ROW_KEY8)); + } + } + } + + private static Map addRandomRequestAttributes() { + Map requestAttributes = new HashMap<>(); + int j = Math.max(2, (int) (10 * Math.random())); + for (int i = 0; i < j; i++) { + requestAttributes.put(String.valueOf(i), Bytes.toBytes(UUID.randomUUID().toString())); + } + return requestAttributes; + } + + private static TableBuilder configureRequestAttributes(TableBuilder tableBuilder, + Map requestAttributes) { + requestAttributes.forEach(tableBuilder::setRequestAttribute); + return tableBuilder; + } + + private static BufferedMutatorParams configureRequestAttributes(BufferedMutatorParams params, + Map requestAttributes) { + requestAttributes.forEach(params::setRequestAttribute); + return params; + } + + public static class RequestMetadataControllerFactory extends RpcControllerFactory { + + public RequestMetadataControllerFactory(Configuration conf) { + super(conf); + } + + @Override + public HBaseRpcController newController() { + return new RequestMetadataController(super.newController()); + } + + @Override + public HBaseRpcController newController(ExtendedCellScanner cellScanner) { + return new RequestMetadataController(super.newController(null, cellScanner)); + } + + @Override + public HBaseRpcController newController(RegionInfo regionInfo, + ExtendedCellScanner cellScanner) { + return new RequestMetadataController(super.newController(regionInfo, cellScanner)); + } + + @Override + public HBaseRpcController newController(final List cellIterables) { + return new RequestMetadataController(super.newController(null, cellIterables)); + } + + @Override + public HBaseRpcController newController(RegionInfo regionInfo, + final List cellIterables) { + return new RequestMetadataController(super.newController(regionInfo, cellIterables)); + } + + public static class RequestMetadataController extends DelegatingHBaseRpcController { + private final Map requestAttributes; + + RequestMetadataController(HBaseRpcController delegate) { + super(delegate); + this.requestAttributes = ROW_KEY_TO_REQUEST_ATTRIBUTES.get(ROW_KEY7); + } + + @Override + public Map getRequestAttributes() { + return requestAttributes; + } + } + } + + public static class AttributesCoprocessor implements RegionObserver, RegionCoprocessor { + + @Override + public Optional getRegionObserver() { + return Optional.of(this); + } + + @Override + public void preGetOp(ObserverContext c, Get get, + List result) throws IOException { + validateRequestAttributes(getRequestAttributesForRowKey(get.getRow())); + } + + @Override + public boolean preScannerNext(ObserverContext c, + InternalScanner s, List result, int limit, boolean hasNext) throws IOException { + validateRequestAttributes(REQUEST_ATTRIBUTES_SCAN); + return hasNext; + } + + @Override + public void prePut(ObserverContext c, Put put, WALEdit edit) + throws IOException { + validateRequestAttributes(getRequestAttributesForRowKey(put.getRow())); + } + + private Map getRequestAttributesForRowKey(byte[] rowKey) { + for (byte[] byteArray : ROW_KEY_TO_REQUEST_ATTRIBUTES.keySet()) { + if (Arrays.equals(byteArray, rowKey)) { + return ROW_KEY_TO_REQUEST_ATTRIBUTES.get(byteArray); + } + } + return null; + } + + private void validateRequestAttributes(Map requestAttributes) { + RpcCall rpcCall = RpcServer.getCurrentCall().get(); + Map attrs = rpcCall.getRequestAttributes(); + if (attrs.size() != requestAttributes.size()) { + return; + } + for (Map.Entry attr : attrs.entrySet()) { + if (!requestAttributes.containsKey(attr.getKey())) { + return; + } + if (!Arrays.equals(requestAttributes.get(attr.getKey()), attr.getValue())) { + return; + } + } + } + } +}