From a2d67c3e18137c42324f68ec00f035109915a984 Mon Sep 17 00:00:00 2001 From: Ray Mattingly Date: Wed, 1 Mar 2023 21:16:14 -0500 Subject: [PATCH 01/12] Connection and Request Attributes --- .../hbase/client/AsyncConnectionImpl.java | 10 +- .../hbase/client/ConnectionFactory.java | 66 +++++- .../hadoop/hbase/ipc/AbstractRpcClient.java | 7 +- .../hadoop/hbase/ipc/BlockingRpcClient.java | 7 +- .../hbase/ipc/BlockingRpcConnection.java | 2 +- .../org/apache/hadoop/hbase/ipc/Call.java | 7 +- .../ipc/DelegatingHBaseRpcController.java | 6 + .../hadoop/hbase/ipc/HBaseRpcController.java | 3 + .../hbase/ipc/HBaseRpcControllerImpl.java | 6 + .../org/apache/hadoop/hbase/ipc/IPCUtil.java | 11 + .../hadoop/hbase/ipc/NettyRpcClient.java | 10 +- .../hadoop/hbase/ipc/NettyRpcConnection.java | 2 +- .../hadoop/hbase/ipc/RpcClientFactory.java | 12 +- .../hadoop/hbase/ipc/RpcConnection.java | 15 +- .../mapreduce/TestHFileOutputFormat2.java | 7 +- .../TestMultiTableInputFormatBase.java | 3 +- .../mapreduce/TestTableInputFormatBase.java | 4 +- .../src/main/protobuf/rpc/RPC.proto | 2 + .../client/AsyncClusterConnectionImpl.java | 2 +- .../org/apache/hadoop/hbase/ipc/RpcCall.java | 3 + .../apache/hadoop/hbase/ipc/ServerCall.java | 7 + .../hbase/client/TestClientTimeouts.java | 2 +- .../TestRequestAndConnectionAttributes.java | 222 ++++++++++++++++++ .../hadoop/hbase/ipc/TestRpcClientLeaks.java | 2 +- .../ipc/TestRpcServerSlowConnectionSetup.java | 2 +- .../namequeues/TestNamedQueueRecorder.java | 5 + .../region/TestRegionProcedureStore.java | 5 + .../thrift2/client/ThriftConnection.java | 4 +- 28 files changed, 397 insertions(+), 37 deletions(-) create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRequestAndConnectionAttributes.java diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java index 3af574cfc0b2..8c7223d7be20 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java @@ -31,6 +31,7 @@ import io.opentelemetry.api.trace.Span; import java.io.IOException; import java.net.SocketAddress; +import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; @@ -127,6 +128,11 @@ public class AsyncConnectionImpl implements AsyncConnection { public AsyncConnectionImpl(Configuration conf, ConnectionRegistry registry, String clusterId, SocketAddress localAddress, User user) { + this(conf, registry, clusterId, localAddress, user, null); + } + + public AsyncConnectionImpl(Configuration conf, ConnectionRegistry registry, String clusterId, + SocketAddress localAddress, User user, Map connectionAttributes) { this.conf = conf; this.user = user; this.metricsScope = MetricsConnection.getScope(conf, clusterId, this); @@ -142,8 +148,8 @@ public AsyncConnectionImpl(Configuration conf, ConnectionRegistry registry, Stri } else { this.metrics = Optional.empty(); } - this.rpcClient = - RpcClientFactory.createClient(conf, clusterId, localAddress, metrics.orElse(null)); + this.rpcClient = RpcClientFactory.createClient(conf, clusterId, localAddress, + metrics.orElse(null), connectionAttributes); this.rpcControllerFactory = RpcControllerFactory.instantiate(conf); this.rpcTimeout = (int) Math.min(Integer.MAX_VALUE, TimeUnit.NANOSECONDS.toMillis(connConf.getRpcTimeoutNs())); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionFactory.java index 4d4559f4b7a9..531b7911f0dd 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionFactory.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionFactory.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.lang.reflect.Constructor; import java.security.PrivilegedExceptionAction; +import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import org.apache.hadoop.conf.Configuration; @@ -216,21 +217,53 @@ public static Connection createConnection(Configuration conf, User user) throws */ public static Connection createConnection(Configuration conf, ExecutorService pool, final User user) throws IOException { + return createConnection(conf, pool, user, null); + } + + /** + * Create a new Connection instance using the passed conf instance. Connection + * encapsulates all housekeeping for a connection to the cluster. All tables and interfaces + * created from returned connection share zookeeper connection, meta cache, and connections to + * region servers and masters.
+ * The caller is responsible for calling {@link Connection#close()} on the returned connection + * instance. Typical usage: + * + *
+   * Connection connection = ConnectionFactory.createConnection(conf);
+   * Table table = connection.getTable(TableName.valueOf("table1"));
+   * try {
+   *   table.get(...);
+   *   ...
+   * } finally {
+   *   table.close();
+   *   connection.close();
+   * }
+   * 
+ * + * @param conf configuration + * @param user the user the connection is for + * @param pool the thread pool to use for batch operations + * @param connectionAttributes attributes to be sent along to server during connection establish + * @return Connection object for conf + */ + public static Connection createConnection(Configuration conf, ExecutorService pool, + final User user, Map connectionAttributes) throws IOException { Class clazz = conf.getClass(ConnectionUtils.HBASE_CLIENT_CONNECTION_IMPL, ConnectionOverAsyncConnection.class, Connection.class); if (clazz != ConnectionOverAsyncConnection.class) { try { // Default HCM#HCI is not accessible; make it so before invoking. - Constructor constructor = - clazz.getDeclaredConstructor(Configuration.class, ExecutorService.class, User.class); + Constructor constructor = clazz.getDeclaredConstructor(Configuration.class, + ExecutorService.class, User.class, Map.class); constructor.setAccessible(true); - return user.runAs((PrivilegedExceptionAction< - Connection>) () -> (Connection) constructor.newInstance(conf, pool, user)); + return user.runAs((PrivilegedExceptionAction) () -> (Connection) constructor + .newInstance(conf, pool, user, connectionAttributes)); } catch (Exception e) { throw new IOException(e); } } else { - return FutureUtils.get(createAsyncConnection(conf, user)).toConnection(); + return FutureUtils.get(createAsyncConnection(conf, user, connectionAttributes)) + .toConnection(); } } @@ -281,6 +314,27 @@ public static CompletableFuture createAsyncConnection(Configura */ public static CompletableFuture createAsyncConnection(Configuration conf, final User user) { + return createAsyncConnection(conf, user, null); + } + + /** + * Create a new AsyncConnection instance using the passed {@code conf} and {@code user}. + * AsyncConnection encapsulates all housekeeping for a connection to the cluster. All tables and + * interfaces created from returned connection share zookeeper connection, meta cache, and + * connections to region servers and masters. + *

+ * The caller is responsible for calling {@link AsyncConnection#close()} on the returned + * connection instance. + *

+ * Usually you should only create one AsyncConnection instance in your code and use it everywhere + * as it is thread safe. + * @param conf configuration + * @param user the user the asynchronous connection is for + * @param connectionAttributes attributes to be sent along to server during connection establish + * @return AsyncConnection object wrapped by CompletableFuture + */ + public static CompletableFuture createAsyncConnection(Configuration conf, + final User user, Map connectionAttributes) { return TraceUtil.tracedFuture(() -> { CompletableFuture future = new CompletableFuture<>(); ConnectionRegistry registry = ConnectionRegistryFactory.getRegistry(conf); @@ -300,7 +354,7 @@ public static CompletableFuture createAsyncConnection(Configura try { future.complete( user.runAs((PrivilegedExceptionAction) () -> ReflectionUtils - .newInstance(clazz, conf, registry, clusterId, null, user))); + .newInstance(clazz, conf, registry, clusterId, null, user, connectionAttributes))); } catch (Exception e) { registry.close(); future.completeExceptionally(e); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java index 23d14c272d2b..9e7f1a5b502e 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java @@ -26,6 +26,7 @@ import java.io.IOException; import java.net.SocketAddress; import java.util.Collection; +import java.util.Map; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; @@ -106,6 +107,7 @@ public abstract class AbstractRpcClient implements RpcC private boolean running = true; // if client runs protected final Configuration conf; + protected final Map connectionAttributes; protected final String clusterId; protected final SocketAddress localAddr; protected final MetricsConnection metrics; @@ -154,7 +156,7 @@ public AtomicInteger load(Address key) throws Exception { * @param metrics the connection metrics */ public AbstractRpcClient(Configuration conf, String clusterId, SocketAddress localAddr, - MetricsConnection metrics) { + MetricsConnection metrics, Map connectionAttributes) { this.userProvider = UserProvider.instantiate(conf); this.localAddr = localAddr; this.tcpKeepAlive = conf.getBoolean("hbase.ipc.client.tcpkeepalive", true); @@ -167,6 +169,7 @@ public AbstractRpcClient(Configuration conf, String clusterId, SocketAddress loc this.minIdleTimeBeforeClose = conf.getInt(IDLE_TIME, 120000); // 2 minutes this.conf = conf; + this.connectionAttributes = connectionAttributes; this.codec = getCodec(); this.compressor = getCompressor(conf); this.fallbackAllowed = conf.getBoolean(IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY, @@ -417,7 +420,7 @@ private Call callMethod(final Descriptors.MethodDescriptor md, final HBaseRpcCon final AtomicInteger counter = concurrentCounterCache.getUnchecked(addr); Call call = new Call(nextCallId(), md, param, hrc.cellScanner(), returnType, - hrc.getCallTimeout(), hrc.getPriority(), new RpcCallback() { + hrc.getCallTimeout(), hrc.getPriority(), hrc.getAttributes(), new RpcCallback() { @Override public void run(Call call) { try (Scope scope = call.span.makeCurrent()) { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcClient.java index 7fffdad935fc..0b48089658c2 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcClient.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcClient.java @@ -19,6 +19,7 @@ import java.io.IOException; import java.net.SocketAddress; +import java.util.Map; import javax.net.SocketFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HConstants; @@ -41,7 +42,7 @@ public class BlockingRpcClient extends AbstractRpcClient * SocketFactory */ BlockingRpcClient(Configuration conf) { - this(conf, HConstants.CLUSTER_ID_DEFAULT, null, null); + this(conf, HConstants.CLUSTER_ID_DEFAULT, null, null, null); } /** @@ -53,8 +54,8 @@ public class BlockingRpcClient extends AbstractRpcClient * @param metrics the connection metrics */ public BlockingRpcClient(Configuration conf, String clusterId, SocketAddress localAddr, - MetricsConnection metrics) { - super(conf, clusterId, localAddr, metrics); + MetricsConnection metrics, Map connectionAttributes) { + super(conf, clusterId, localAddr, metrics, connectionAttributes); this.socketFactory = NetUtils.getDefaultSocketFactory(conf); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java index d63d14940e78..81ad4d2f056d 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java @@ -219,7 +219,7 @@ public void cleanup(IOException e) { BlockingRpcConnection(BlockingRpcClient rpcClient, ConnectionId remoteId) throws IOException { super(rpcClient.conf, AbstractRpcClient.WHEEL_TIMER, remoteId, rpcClient.clusterId, rpcClient.userProvider.isHBaseSecurityEnabled(), rpcClient.codec, rpcClient.compressor, - rpcClient.metrics); + rpcClient.metrics, rpcClient.connectionAttributes); this.rpcClient = rpcClient; this.connectionHeaderPreamble = getConnectionHeaderPreamble(); ConnectionHeader header = getConnectionHeader(); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/Call.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/Call.java index 3c0e24e57145..669fc73a3bfa 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/Call.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/Call.java @@ -19,6 +19,7 @@ import io.opentelemetry.api.trace.Span; import java.io.IOException; +import java.util.Map; import java.util.Optional; import org.apache.commons.lang3.builder.ToStringBuilder; import org.apache.commons.lang3.builder.ToStringStyle; @@ -56,14 +57,15 @@ class Call { final Descriptors.MethodDescriptor md; final int timeout; // timeout in millisecond for this call; 0 means infinite. final int priority; + final Map attributes; final MetricsConnection.CallStats callStats; private final RpcCallback callback; final Span span; Timeout timeoutTask; Call(int id, final Descriptors.MethodDescriptor md, Message param, final CellScanner cells, - final Message responseDefaultType, int timeout, int priority, RpcCallback callback, - MetricsConnection.CallStats callStats) { + final Message responseDefaultType, int timeout, int priority, Map attributes, + RpcCallback callback, MetricsConnection.CallStats callStats) { this.param = param; this.md = md; this.cells = cells; @@ -73,6 +75,7 @@ class Call { this.id = id; this.timeout = timeout; this.priority = priority; + this.attributes = attributes; this.callback = callback; this.span = Span.current(); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/DelegatingHBaseRpcController.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/DelegatingHBaseRpcController.java index 9bee88d599f7..b185ff3b7495 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/DelegatingHBaseRpcController.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/DelegatingHBaseRpcController.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hbase.ipc; import java.io.IOException; +import java.util.Map; import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.TableName; import org.apache.yetus.audience.InterfaceAudience; @@ -112,6 +113,11 @@ public boolean hasCallTimeout() { return delegate.hasCallTimeout(); } + @Override + public Map getAttributes() { + return null; + } + @Override public void setFailed(IOException e) { delegate.setFailed(e); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcController.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcController.java index c60de7658f3d..2653387733fd 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcController.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcController.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hbase.ipc; import java.io.IOException; +import java.util.Map; import org.apache.hadoop.hbase.CellScannable; import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.HBaseInterfaceAudience; @@ -71,6 +72,8 @@ public interface HBaseRpcController extends RpcController, CellScannable { boolean hasCallTimeout(); + Map getAttributes(); + /** * Set failed with an exception to pass on. For use in async rpc clients * @param e exception to set with diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcControllerImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcControllerImpl.java index 99ed5c4d48b6..f3e710b171ef 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcControllerImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcControllerImpl.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; +import java.util.Map; import org.apache.hadoop.hbase.CellScannable; import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.CellUtil; @@ -166,6 +167,11 @@ public boolean hasCallTimeout() { return callTimeout != null; } + @Override + public Map getAttributes() { + return null; + } + @Override public synchronized String errorText() { if (!done || exception == null) { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java index b509dcbd27b7..d6df6c974ccf 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java @@ -25,6 +25,7 @@ import java.net.ConnectException; import java.net.SocketTimeoutException; import java.nio.channels.ClosedChannelException; +import java.util.Map; import java.util.concurrent.TimeoutException; import org.apache.commons.lang3.mutable.MutableInt; import org.apache.hadoop.hbase.DoNotRetryIOException; @@ -44,10 +45,12 @@ import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; import org.apache.hbase.thirdparty.com.google.protobuf.CodedOutputStream; import org.apache.hbase.thirdparty.com.google.protobuf.Message; +import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations; import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf; import org.apache.hbase.thirdparty.io.netty.channel.EventLoop; import org.apache.hbase.thirdparty.io.netty.util.concurrent.FastThreadLocal; +import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.CellBlockMeta; import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ExceptionResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader; @@ -126,6 +129,14 @@ static RequestHeader buildRequestHeader(Call call, CellBlockMeta cellBlockMeta) if (call.priority != HConstants.PRIORITY_UNSET) { builder.setPriority(call.priority); } + if (call.attributes != null && !call.attributes.isEmpty()) { + HBaseProtos.NameBytesPair.Builder attributeBuilder = HBaseProtos.NameBytesPair.newBuilder(); + for (Map.Entry attribute : call.attributes.entrySet()) { + attributeBuilder.setName(attribute.getKey()); + attributeBuilder.setValue(UnsafeByteOperations.unsafeWrap(attribute.getValue())); + builder.addAttribute(attributeBuilder.build()); + } + } builder.setTimeout(call.timeout); return builder.build(); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcClient.java index 231caa40a89e..08af86a3e0c5 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcClient.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcClient.java @@ -19,6 +19,7 @@ import java.io.IOException; import java.net.SocketAddress; +import java.util.Map; import java.util.concurrent.atomic.AtomicReference; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseInterfaceAudience; @@ -55,7 +56,12 @@ public class NettyRpcClient extends AbstractRpcClient { public NettyRpcClient(Configuration configuration, String clusterId, SocketAddress localAddress, MetricsConnection metrics) { - super(configuration, clusterId, localAddress, metrics); + this(configuration, clusterId, localAddress, metrics, null); + } + + public NettyRpcClient(Configuration configuration, String clusterId, SocketAddress localAddress, + MetricsConnection metrics, Map connectionAttributes) { + super(configuration, clusterId, localAddress, metrics, connectionAttributes); Pair> groupAndChannelClass = NettyRpcClientConfigHelper.getEventLoopConfig(conf); if (groupAndChannelClass == null) { @@ -75,7 +81,7 @@ public NettyRpcClient(Configuration configuration, String clusterId, SocketAddre /** Used in test only. */ public NettyRpcClient(Configuration configuration) { - this(configuration, HConstants.CLUSTER_ID_DEFAULT, null, null); + this(configuration, HConstants.CLUSTER_ID_DEFAULT, null, null, null); } @Override diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java index 48104038c217..3f9a58d51263 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java @@ -104,7 +104,7 @@ class NettyRpcConnection extends RpcConnection { NettyRpcConnection(NettyRpcClient rpcClient, ConnectionId remoteId) throws IOException { super(rpcClient.conf, AbstractRpcClient.WHEEL_TIMER, remoteId, rpcClient.clusterId, rpcClient.userProvider.isHBaseSecurityEnabled(), rpcClient.codec, rpcClient.compressor, - rpcClient.metrics); + rpcClient.metrics, rpcClient.connectionAttributes); this.rpcClient = rpcClient; this.eventLoop = rpcClient.group.next(); byte[] connectionHeaderPreamble = getConnectionHeaderPreamble(); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientFactory.java index 9b69b5234050..ca66cc86f33d 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientFactory.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientFactory.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hbase.ipc; import java.net.SocketAddress; +import java.util.Map; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.client.MetricsConnection; import org.apache.hadoop.hbase.util.ReflectionUtils; @@ -59,7 +60,7 @@ public static RpcClient createClient(Configuration conf, String clusterId) { */ public static RpcClient createClient(Configuration conf, String clusterId, MetricsConnection metrics) { - return createClient(conf, clusterId, null, metrics); + return createClient(conf, clusterId, null, metrics, null); } private static String getRpcClientClass(Configuration conf) { @@ -81,10 +82,11 @@ private static String getRpcClientClass(Configuration conf) { * @return newly created RpcClient */ public static RpcClient createClient(Configuration conf, String clusterId, - SocketAddress localAddr, MetricsConnection metrics) { + SocketAddress localAddr, MetricsConnection metrics, Map connectionAttributes) { String rpcClientClass = getRpcClientClass(conf); - return ReflectionUtils.instantiateWithCustomCtor(rpcClientClass, new Class[] { - Configuration.class, String.class, SocketAddress.class, MetricsConnection.class }, - new Object[] { conf, clusterId, localAddr, metrics }); + return ReflectionUtils.instantiateWithCustomCtor( + rpcClientClass, new Class[] { Configuration.class, String.class, SocketAddress.class, + MetricsConnection.class, Map.class }, + new Object[] { conf, clusterId, localAddr, metrics, connectionAttributes }); } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcConnection.java index 912fa4fb0654..31698a1a1e8e 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcConnection.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcConnection.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.net.UnknownHostException; +import java.util.Map; import java.util.concurrent.TimeUnit; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HConstants; @@ -39,11 +40,13 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations; import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer; import org.apache.hbase.thirdparty.io.netty.util.Timeout; import org.apache.hbase.thirdparty.io.netty.util.TimerTask; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ConnectionHeader; import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.UserInformation; @@ -70,6 +73,7 @@ abstract class RpcConnection { protected final CompressionCodec compressor; protected final MetricsConnection metrics; + private final Map connectionAttributes; protected final HashedWheelTimer timeoutTimer; @@ -86,12 +90,13 @@ abstract class RpcConnection { protected RpcConnection(Configuration conf, HashedWheelTimer timeoutTimer, ConnectionId remoteId, String clusterId, boolean isSecurityEnabled, Codec codec, CompressionCodec compressor, - MetricsConnection metrics) throws IOException { + MetricsConnection metrics, Map connectionAttributes) throws IOException { this.timeoutTimer = timeoutTimer; this.codec = codec; this.compressor = compressor; this.conf = conf; this.metrics = metrics; + this.connectionAttributes = connectionAttributes; User ticket = remoteId.getTicket(); this.securityInfo = SecurityInfo.getInfo(remoteId.getServiceName()); this.useSasl = isSecurityEnabled; @@ -169,6 +174,14 @@ protected final ConnectionHeader getConnectionHeader() { if (this.compressor != null) { builder.setCellBlockCompressorClass(this.compressor.getClass().getCanonicalName()); } + if (connectionAttributes != null && !connectionAttributes.isEmpty()) { + HBaseProtos.NameBytesPair.Builder attributeBuilder = HBaseProtos.NameBytesPair.newBuilder(); + for (Map.Entry attribute : connectionAttributes.entrySet()) { + attributeBuilder.setName(attribute.getKey()); + attributeBuilder.setValue(UnsafeByteOperations.unsafeWrap(attribute.getValue())); + builder.addAttribute(attributeBuilder.build()); + } + } builder.setVersionInfo(ProtobufUtil.getVersionInfo()); boolean isCryptoAESEnable = conf.getBoolean(CRYPTO_AES_ENABLED_KEY, CRYPTO_AES_ENABLED_DEFAULT); // if Crypto AES enable, setup Cipher transformation diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java index 51e9e1e7755f..fc7f66129d35 100644 --- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java +++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java @@ -1667,9 +1667,10 @@ private static class ConfigurationCaptorConnection implements Connection { private final Connection delegate; - public ConfigurationCaptorConnection(Configuration conf, ExecutorService es, User user) - throws IOException { - delegate = FutureUtils.get(createAsyncConnection(conf, user)).toConnection(); + public ConfigurationCaptorConnection(Configuration conf, ExecutorService es, User user, + Map connectionAttributes) throws IOException { + delegate = + FutureUtils.get(createAsyncConnection(conf, user, connectionAttributes)).toConnection(); final String uuid = conf.get(UUID_KEY); if (uuid != null) { diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiTableInputFormatBase.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiTableInputFormatBase.java index 7d099aa44e24..0c879bd5ace3 100644 --- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiTableInputFormatBase.java +++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiTableInputFormatBase.java @@ -123,7 +123,8 @@ public static class MRSplitsConnection implements Connection { private final Configuration configuration; static final AtomicInteger creations = new AtomicInteger(0); - MRSplitsConnection(Configuration conf, ExecutorService pool, User user) throws IOException { + MRSplitsConnection(Configuration conf, ExecutorService pool, User user, + Map connectionAttributes) throws IOException { this.configuration = conf; creations.incrementAndGet(); } diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatBase.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatBase.java index 13e3831f6df6..f41282b8f4f8 100644 --- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatBase.java +++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatBase.java @@ -212,8 +212,8 @@ private static class ConnectionForMergeTesting implements Connection { SIZE_MAP.put(Bytes.toBytes("p"), 200L * 1024L * 1024L); } - ConnectionForMergeTesting(Configuration conf, ExecutorService pool, User user) - throws IOException { + ConnectionForMergeTesting(Configuration conf, ExecutorService pool, User user, + Map connectionAttributes) throws IOException { } @Override diff --git a/hbase-protocol-shaded/src/main/protobuf/rpc/RPC.proto b/hbase-protocol-shaded/src/main/protobuf/rpc/RPC.proto index 6426f0cb06cb..e992e681fbff 100644 --- a/hbase-protocol-shaded/src/main/protobuf/rpc/RPC.proto +++ b/hbase-protocol-shaded/src/main/protobuf/rpc/RPC.proto @@ -92,6 +92,7 @@ message ConnectionHeader { optional VersionInfo version_info = 5; // the transformation for rpc AES encryption with Apache Commons Crypto optional string rpc_crypto_cipher_transformation = 6; + repeated NameBytesPair attribute = 7; } // This is sent by rpc server to negotiate the data if necessary @@ -148,6 +149,7 @@ message RequestHeader { // See HConstants. optional uint32 priority = 6; optional uint32 timeout = 7; + repeated NameBytesPair attribute = 8; } message ResponseHeader { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnectionImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnectionImpl.java index 1dda6c32ca04..dc4ff954bd61 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnectionImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnectionImpl.java @@ -59,7 +59,7 @@ class AsyncClusterConnectionImpl extends AsyncConnectionImpl implements AsyncClu public AsyncClusterConnectionImpl(Configuration conf, ConnectionRegistry registry, String clusterId, SocketAddress localAddress, User user) { - super(conf, registry, clusterId, localAddress, user); + super(conf, registry, clusterId, localAddress, user, null); } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCall.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCall.java index 197ddb71d7e6..cc97a39c7ee4 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCall.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCall.java @@ -27,6 +27,7 @@ import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor; import org.apache.hbase.thirdparty.com.google.protobuf.Message; +import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ConnectionHeader; import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader; /** @@ -82,6 +83,8 @@ public interface RpcCall extends RpcCallContext { /** Returns The request header of this call. */ RequestHeader getHeader(); + ConnectionHeader getConnectionHeader(); + /** Returns Port of remote address in this call */ int getRemotePort(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java index 2188795914db..9d8446dcfd49 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java @@ -17,6 +17,8 @@ */ package org.apache.hadoop.hbase.ipc; +import static org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.*; + import io.opentelemetry.api.trace.Span; import io.opentelemetry.api.trace.StatusCode; import io.opentelemetry.context.Scope; @@ -207,6 +209,11 @@ public RequestHeader getHeader() { return this.header; } + @Override + public ConnectionHeader getConnectionHeader() { + return this.connection.connectionHeader; + } + @Override public int getPriority() { return this.header.getPriority(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientTimeouts.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientTimeouts.java index 65def75fff1b..756bf682ab6a 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientTimeouts.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientTimeouts.java @@ -130,7 +130,7 @@ public void testAdminTimeout() throws Exception { public static class RandomTimeoutRpcClient extends BlockingRpcClient { public RandomTimeoutRpcClient(Configuration conf, String clusterId, SocketAddress localAddr, MetricsConnection metrics) { - super(conf, clusterId, localAddr, metrics); + super(conf, clusterId, localAddr, metrics, null); } // Return my own instance, one that does random timeouts diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRequestAndConnectionAttributes.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRequestAndConnectionAttributes.java new file mode 100644 index 000000000000..198e4766a276 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRequestAndConnectionAttributes.java @@ -0,0 +1,222 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import static org.junit.Assert.assertEquals; + +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.AuthUtil; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellComparator; +import org.apache.hadoop.hbase.CellScannable; +import org.apache.hadoop.hbase.CellScanner; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtil; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.coprocessor.ObserverContext; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; +import org.apache.hadoop.hbase.coprocessor.RegionObserver; +import org.apache.hadoop.hbase.ipc.DelegatingHBaseRpcController; +import org.apache.hadoop.hbase.ipc.HBaseRpcController; +import org.apache.hadoop.hbase.ipc.RpcCall; +import org.apache.hadoop.hbase.ipc.RpcControllerFactory; +import org.apache.hadoop.hbase.ipc.RpcServer; +import org.apache.hadoop.hbase.testclassification.ClientTests; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; + +@Category({ ClientTests.class, MediumTests.class }) +public class TestRequestAndConnectionAttributes { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestRequestAndConnectionAttributes.class); + + private static final Map CONNECTION_ATTRIBUTES = new HashMap<>(); + static { + CONNECTION_ATTRIBUTES.put("clientId", Bytes.toBytes("foo")); + } + private static final Map REQUEST_ATTRIBUTES = new HashMap<>(); + + private static HBaseTestingUtil TEST_UTIL = null; + + @BeforeClass + public static void setUp() throws Exception { + TEST_UTIL = new HBaseTestingUtil(); + TEST_UTIL.getConfiguration().set(RpcControllerFactory.CUSTOM_CONTROLLER_CONF_KEY, + AttributesRpcControllerFactory.class.getName()); + TEST_UTIL.startMiniCluster(1); + } + + @AfterClass + public static void afterClass() throws Exception { + TEST_UTIL.shutdownMiniCluster(); + } + + @Test + public void testConnectionAttributes() throws IOException { + TableName tableName = TableName.valueOf("testConnectionAttributes"); + TEST_UTIL.createTable(tableName, new byte[][] { Bytes.toBytes("0") }, 1, + HConstants.DEFAULT_BLOCKSIZE, AttributesCoprocessor.class.getName()); + + Configuration conf = TEST_UTIL.getConfiguration(); + try (Connection conn = ConnectionFactory.createConnection(conf, null, + AuthUtil.loginClient(conf), CONNECTION_ATTRIBUTES); Table table = conn.getTable(tableName)) { + Result result = table.get(new Get(Bytes.toBytes(0))); + assertEquals(REQUEST_ATTRIBUTES.size() + CONNECTION_ATTRIBUTES.size(), result.size()); + for (Map.Entry attr : CONNECTION_ATTRIBUTES.entrySet()) { + byte[] val = result.getValue(Bytes.toBytes("c"), Bytes.toBytes(attr.getKey())); + assertEquals(Bytes.toStringBinary(attr.getValue()), Bytes.toStringBinary(val)); + } + } + } + + @Test + public void testRequestAttributes() throws IOException { + TableName tableName = TableName.valueOf("testRequestAttributes"); + TEST_UTIL.createTable(tableName, new byte[][] { Bytes.toBytes("0") }, 1, + HConstants.DEFAULT_BLOCKSIZE, AttributesCoprocessor.class.getName()); + + REQUEST_ATTRIBUTES.clear(); + REQUEST_ATTRIBUTES.put("test1", Bytes.toBytes("a")); + REQUEST_ATTRIBUTES.put("test2", Bytes.toBytes("b")); + + Configuration conf = TEST_UTIL.getConfiguration(); + try (Connection conn = ConnectionFactory.createConnection(conf, null, + AuthUtil.loginClient(conf), CONNECTION_ATTRIBUTES); Table table = conn.getTable(tableName)) { + + Result result = table.get(new Get(Bytes.toBytes(0))); + assertEquals(REQUEST_ATTRIBUTES.size() + CONNECTION_ATTRIBUTES.size(), result.size()); + for (Map.Entry attr : REQUEST_ATTRIBUTES.entrySet()) { + byte[] val = result.getValue(Bytes.toBytes("r"), Bytes.toBytes(attr.getKey())); + assertEquals(Bytes.toStringBinary(attr.getValue()), Bytes.toStringBinary(val)); + } + + REQUEST_ATTRIBUTES.put("test3", Bytes.toBytes("c")); + result = table.get(new Get(Bytes.toBytes(0))); + assertEquals(REQUEST_ATTRIBUTES.size() + CONNECTION_ATTRIBUTES.size(), result.size()); + for (Map.Entry attr : REQUEST_ATTRIBUTES.entrySet()) { + byte[] val = result.getValue(Bytes.toBytes("r"), Bytes.toBytes(attr.getKey())); + assertEquals(Bytes.toStringBinary(attr.getValue()), Bytes.toStringBinary(val)); + } + } + } + + @Test + public void testNoRequestAttributes() throws IOException { + TableName tableName = TableName.valueOf("testNoRequestAttributes"); + TEST_UTIL.createTable(tableName, new byte[][] { Bytes.toBytes("0") }, 1, + HConstants.DEFAULT_BLOCKSIZE, AttributesCoprocessor.class.getName()); + + REQUEST_ATTRIBUTES.clear(); + Configuration conf = TEST_UTIL.getConfiguration(); + try (Connection conn = ConnectionFactory.createConnection(conf, null, + AuthUtil.loginClient(conf), CONNECTION_ATTRIBUTES)) { + TableBuilder tableBuilder = conn.getTableBuilder(tableName, null); + try (Table table = tableBuilder.build()) { + Result result = table.get(new Get(Bytes.toBytes(0))); + assertEquals(CONNECTION_ATTRIBUTES.size(), result.size()); + } + } + } + + public static class AttributesRpcControllerFactory extends RpcControllerFactory { + + public AttributesRpcControllerFactory(Configuration conf) { + super(conf); + } + + @Override + public HBaseRpcController newController() { + return new AttributesRpcController(super.newController()); + } + + @Override + public HBaseRpcController newController(CellScanner cellScanner) { + return new AttributesRpcController(super.newController(cellScanner)); + } + + @Override + public HBaseRpcController newController(RegionInfo regionInfo, CellScanner cellScanner) { + return new AttributesRpcController(super.newController(regionInfo, cellScanner)); + } + + @Override + public HBaseRpcController newController(List cellIterables) { + return new AttributesRpcController(super.newController(cellIterables)); + } + + @Override + public HBaseRpcController newController(RegionInfo regionInfo, + List cellIterables) { + return new AttributesRpcController(super.newController(regionInfo, cellIterables)); + } + } + + public static class AttributesRpcController extends DelegatingHBaseRpcController { + + public AttributesRpcController(HBaseRpcController delegate) { + super(delegate); + } + + @Override + public Map getAttributes() { + return REQUEST_ATTRIBUTES; + } + } + + public static class AttributesCoprocessor implements RegionObserver, RegionCoprocessor { + + @Override + public Optional getRegionObserver() { + return Optional.of(this); + } + + @Override + public void preGetOp(ObserverContext c, Get get, + List result) throws IOException { + RpcCall rpcCall = RpcServer.getCurrentCall().get(); + for (HBaseProtos.NameBytesPair attr : rpcCall.getHeader().getAttributeList()) { + result.add(c.getEnvironment().getCellBuilder().clear().setRow(get.getRow()) + .setFamily(Bytes.toBytes("r")).setQualifier(Bytes.toBytes(attr.getName())) + .setValue(attr.getValue().toByteArray()).setType(Cell.Type.Put).setTimestamp(1).build()); + } + for (HBaseProtos.NameBytesPair attr : rpcCall.getConnectionHeader().getAttributeList()) { + result.add(c.getEnvironment().getCellBuilder().clear().setRow(get.getRow()) + .setFamily(Bytes.toBytes("c")).setQualifier(Bytes.toBytes(attr.getName())) + .setValue(attr.getValue().toByteArray()).setType(Cell.Type.Put).setTimestamp(1).build()); + } + result.sort(CellComparator.getInstance()); + c.bypass(); + } + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcClientLeaks.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcClientLeaks.java index f36fef186f08..b6d8d160791b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcClientLeaks.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcClientLeaks.java @@ -72,7 +72,7 @@ public MyRpcClientImpl(Configuration conf) { public MyRpcClientImpl(Configuration conf, String clusterId, SocketAddress address, MetricsConnection metrics) { - super(conf, clusterId, address, metrics); + super(conf, clusterId, address, metrics, null); } @Override diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcServerSlowConnectionSetup.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcServerSlowConnectionSetup.java index e14b710647d1..80b3845d6688 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcServerSlowConnectionSetup.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcServerSlowConnectionSetup.java @@ -124,7 +124,7 @@ public void test() throws IOException, InterruptedException { int callId = 10; Call call = new Call(callId, TestProtobufRpcProto.getDescriptor().findMethodByName("ping"), EmptyRequestProto.getDefaultInstance(), null, EmptyResponseProto.getDefaultInstance(), 1000, - HConstants.NORMAL_QOS, null, MetricsConnection.newCallStats()); + HConstants.NORMAL_QOS, null, null, MetricsConnection.newCallStats()); RequestHeader requestHeader = IPCUtil.buildRequestHeader(call, null); dos.writeInt(IPCUtil.getTotalSizeWhenWrittenDelimited(requestHeader, call.param)); requestHeader.writeDelimitedTo(dos); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/namequeues/TestNamedQueueRecorder.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/namequeues/TestNamedQueueRecorder.java index 909e7fdb7f3d..7a82d08f4adb 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/namequeues/TestNamedQueueRecorder.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/namequeues/TestNamedQueueRecorder.java @@ -694,6 +694,11 @@ public RPCProtos.RequestHeader getHeader() { return null; } + @Override + public RPCProtos.ConnectionHeader getConnectionHeader() { + return null; + } + @Override public int getRemotePort() { return 0; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/TestRegionProcedureStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/TestRegionProcedureStore.java index d26870b77dfd..9bbbdbeebad0 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/TestRegionProcedureStore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/TestRegionProcedureStore.java @@ -220,6 +220,11 @@ public RPCProtos.RequestHeader getHeader() { return null; } + @Override + public RPCProtos.ConnectionHeader getConnectionHeader() { + return null; + } + @Override public int getRemotePort() { return 0; diff --git a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/client/ThriftConnection.java b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/client/ThriftConnection.java index 250b8a74f030..bdb54c2bddd5 100644 --- a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/client/ThriftConnection.java +++ b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/client/ThriftConnection.java @@ -89,8 +89,8 @@ public class ThriftConnection implements Connection { private int operationTimeout; private int connectTimeout; - public ThriftConnection(Configuration conf, ExecutorService pool, final User user) - throws IOException { + public ThriftConnection(Configuration conf, ExecutorService pool, final User user, + Map connectionAttributes) throws IOException { this.conf = conf; this.user = user; this.host = conf.get(Constants.HBASE_THRIFT_SERVER_NAME); From 441ae90a80fecb5a8193f4ec123c5b8522e0200e Mon Sep 17 00:00:00 2001 From: Ray Mattingly Date: Mon, 26 Jun 2023 18:26:59 -0400 Subject: [PATCH 02/12] PR feedback: prefer emptyMap, no wildcard imports --- .../org/apache/hadoop/hbase/client/AsyncConnectionImpl.java | 3 ++- .../org/apache/hadoop/hbase/client/ConnectionFactory.java | 3 ++- .../java/org/apache/hadoop/hbase/ipc/BlockingRpcClient.java | 3 ++- .../hadoop/hbase/ipc/DelegatingHBaseRpcController.java | 3 ++- .../org/apache/hadoop/hbase/ipc/HBaseRpcControllerImpl.java | 3 ++- .../java/org/apache/hadoop/hbase/ipc/NettyRpcClient.java | 5 +++-- .../java/org/apache/hadoop/hbase/ipc/RpcClientFactory.java | 3 ++- .../hadoop/hbase/client/AsyncClusterConnectionImpl.java | 3 ++- .../main/java/org/apache/hadoop/hbase/ipc/ServerCall.java | 5 ++--- 9 files changed, 19 insertions(+), 12 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java index 8c7223d7be20..4900581c69ad 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java @@ -31,6 +31,7 @@ import io.opentelemetry.api.trace.Span; import java.io.IOException; import java.net.SocketAddress; +import java.util.Collections; import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; @@ -128,7 +129,7 @@ public class AsyncConnectionImpl implements AsyncConnection { public AsyncConnectionImpl(Configuration conf, ConnectionRegistry registry, String clusterId, SocketAddress localAddress, User user) { - this(conf, registry, clusterId, localAddress, user, null); + this(conf, registry, clusterId, localAddress, user, Collections.emptyMap()); } public AsyncConnectionImpl(Configuration conf, ConnectionRegistry registry, String clusterId, diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionFactory.java index 531b7911f0dd..ac70091dcf65 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionFactory.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionFactory.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.lang.reflect.Constructor; import java.security.PrivilegedExceptionAction; +import java.util.Collections; import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; @@ -217,7 +218,7 @@ public static Connection createConnection(Configuration conf, User user) throws */ public static Connection createConnection(Configuration conf, ExecutorService pool, final User user) throws IOException { - return createConnection(conf, pool, user, null); + return createConnection(conf, pool, user, Collections.emptyMap()); } /** diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcClient.java index 0b48089658c2..3da00c5395d3 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcClient.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcClient.java @@ -19,6 +19,7 @@ import java.io.IOException; import java.net.SocketAddress; +import java.util.Collections; import java.util.Map; import javax.net.SocketFactory; import org.apache.hadoop.conf.Configuration; @@ -42,7 +43,7 @@ public class BlockingRpcClient extends AbstractRpcClient * SocketFactory */ BlockingRpcClient(Configuration conf) { - this(conf, HConstants.CLUSTER_ID_DEFAULT, null, null, null); + this(conf, HConstants.CLUSTER_ID_DEFAULT, null, null, Collections.emptyMap()); } /** diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/DelegatingHBaseRpcController.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/DelegatingHBaseRpcController.java index b185ff3b7495..caf0dae03ce5 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/DelegatingHBaseRpcController.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/DelegatingHBaseRpcController.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hbase.ipc; import java.io.IOException; +import java.util.Collections; import java.util.Map; import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.TableName; @@ -115,7 +116,7 @@ public boolean hasCallTimeout() { @Override public Map getAttributes() { - return null; + return Collections.emptyMap(); } @Override diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcControllerImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcControllerImpl.java index f3e710b171ef..6b862acac9a0 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcControllerImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcControllerImpl.java @@ -19,6 +19,7 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Map; import org.apache.hadoop.hbase.CellScannable; @@ -169,7 +170,7 @@ public boolean hasCallTimeout() { @Override public Map getAttributes() { - return null; + return Collections.emptyMap(); } @Override diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcClient.java index 08af86a3e0c5..ed0c4fffc724 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcClient.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcClient.java @@ -19,6 +19,7 @@ import java.io.IOException; import java.net.SocketAddress; +import java.util.Collections; import java.util.Map; import java.util.concurrent.atomic.AtomicReference; import org.apache.hadoop.conf.Configuration; @@ -56,7 +57,7 @@ public class NettyRpcClient extends AbstractRpcClient { public NettyRpcClient(Configuration configuration, String clusterId, SocketAddress localAddress, MetricsConnection metrics) { - this(configuration, clusterId, localAddress, metrics, null); + this(configuration, clusterId, localAddress, metrics, Collections.emptyMap()); } public NettyRpcClient(Configuration configuration, String clusterId, SocketAddress localAddress, @@ -81,7 +82,7 @@ public NettyRpcClient(Configuration configuration, String clusterId, SocketAddre /** Used in test only. */ public NettyRpcClient(Configuration configuration) { - this(configuration, HConstants.CLUSTER_ID_DEFAULT, null, null, null); + this(configuration, HConstants.CLUSTER_ID_DEFAULT, null, null, Collections.emptyMap()); } @Override diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientFactory.java index ca66cc86f33d..f1df572675c7 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientFactory.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientFactory.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hbase.ipc; import java.net.SocketAddress; +import java.util.Collections; import java.util.Map; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.client.MetricsConnection; @@ -60,7 +61,7 @@ public static RpcClient createClient(Configuration conf, String clusterId) { */ public static RpcClient createClient(Configuration conf, String clusterId, MetricsConnection metrics) { - return createClient(conf, clusterId, null, metrics, null); + return createClient(conf, clusterId, null, metrics, Collections.emptyMap()); } private static String getRpcClientClass(Configuration conf) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnectionImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnectionImpl.java index dc4ff954bd61..e2c11ab1d5e2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnectionImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnectionImpl.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hbase.client; import java.net.SocketAddress; +import java.util.Collections; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.stream.Collectors; @@ -59,7 +60,7 @@ class AsyncClusterConnectionImpl extends AsyncConnectionImpl implements AsyncClu public AsyncClusterConnectionImpl(Configuration conf, ConnectionRegistry registry, String clusterId, SocketAddress localAddress, User user) { - super(conf, registry, clusterId, localAddress, user, null); + super(conf, registry, clusterId, localAddress, user, Collections.emptyMap()); } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java index 9d8446dcfd49..f3568a36f144 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java @@ -17,8 +17,6 @@ */ package org.apache.hadoop.hbase.ipc; -import static org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.*; - import io.opentelemetry.api.trace.Span; import io.opentelemetry.api.trace.StatusCode; import io.opentelemetry.context.Scope; @@ -51,6 +49,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.VersionInfo; +import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.CellBlockMeta; import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ExceptionResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader; @@ -210,7 +209,7 @@ public RequestHeader getHeader() { } @Override - public ConnectionHeader getConnectionHeader() { + public RPCProtos.ConnectionHeader getConnectionHeader() { return this.connection.connectionHeader; } From 33481e19f2f6cf10fd72bff5477e148acaef00fa Mon Sep 17 00:00:00 2001 From: Ray Mattingly Date: Mon, 26 Jun 2023 21:33:30 -0400 Subject: [PATCH 03/12] default attributes in Call constructor --- .../src/main/java/org/apache/hadoop/hbase/ipc/Call.java | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/Call.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/Call.java index 669fc73a3bfa..0298942e29fa 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/Call.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/Call.java @@ -19,6 +19,7 @@ import io.opentelemetry.api.trace.Span; import java.io.IOException; +import java.util.Collections; import java.util.Map; import java.util.Optional; import org.apache.commons.lang3.builder.ToStringBuilder; @@ -63,6 +64,13 @@ class Call { final Span span; Timeout timeoutTask; + Call(int id, final Descriptors.MethodDescriptor md, Message param, final CellScanner cells, + final Message responseDefaultType, int timeout, int priority, RpcCallback callback, + MetricsConnection.CallStats callStats) { + this(id, md, param, cells, responseDefaultType, timeout, priority, Collections.emptyMap(), + callback, callStats); + } + Call(int id, final Descriptors.MethodDescriptor md, Message param, final CellScanner cells, final Message responseDefaultType, int timeout, int priority, Map attributes, RpcCallback callback, MetricsConnection.CallStats callStats) { From beef1fbed8caab3d8fe91dfc35c223900c8d4a88 Mon Sep 17 00:00:00 2001 From: Ray Mattingly Date: Tue, 27 Jun 2023 09:27:43 -0400 Subject: [PATCH 04/12] fix RpcClient in TestRpcBasedRegistryHedgedReads --- .../hadoop/hbase/client/TestRpcBasedRegistryHedgedReads.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestRpcBasedRegistryHedgedReads.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestRpcBasedRegistryHedgedReads.java index 6c97c19f96cc..54b351f00a3b 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestRpcBasedRegistryHedgedReads.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestRpcBasedRegistryHedgedReads.java @@ -23,6 +23,7 @@ import java.io.IOException; import java.net.SocketAddress; import java.util.Collections; +import java.util.Map; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; @@ -95,7 +96,7 @@ public class TestRpcBasedRegistryHedgedReads { public static final class RpcClientImpl implements RpcClient { public RpcClientImpl(Configuration configuration, String clusterId, SocketAddress localAddress, - MetricsConnection metrics) { + MetricsConnection metrics, Map attributes) { } @Override From c3dde33ef7aab2c1bdbce92ed7dd939f29eefe32 Mon Sep 17 00:00:00 2001 From: Ray Mattingly Date: Tue, 27 Jun 2023 16:17:02 -0400 Subject: [PATCH 05/12] more test fixes --- .../org/apache/hadoop/hbase/client/TestClientTimeouts.java | 5 +++-- .../java/org/apache/hadoop/hbase/ipc/TestRpcClientLeaks.java | 5 +++-- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientTimeouts.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientTimeouts.java index 756bf682ab6a..d358695c5f9b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientTimeouts.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientTimeouts.java @@ -22,6 +22,7 @@ import java.net.SocketAddress; import java.net.SocketTimeoutException; +import java.util.Map; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicInteger; import org.apache.hadoop.conf.Configuration; @@ -129,8 +130,8 @@ public void testAdminTimeout() throws Exception { */ public static class RandomTimeoutRpcClient extends BlockingRpcClient { public RandomTimeoutRpcClient(Configuration conf, String clusterId, SocketAddress localAddr, - MetricsConnection metrics) { - super(conf, clusterId, localAddr, metrics, null); + MetricsConnection metrics, Map connectionAttributes) { + super(conf, clusterId, localAddr, metrics, connectionAttributes); } // Return my own instance, one that does random timeouts diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcClientLeaks.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcClientLeaks.java index b6d8d160791b..feaf44e0b84e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcClientLeaks.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcClientLeaks.java @@ -24,6 +24,7 @@ import java.io.IOException; import java.net.Socket; import java.net.SocketAddress; +import java.util.Map; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import org.apache.hadoop.conf.Configuration; @@ -71,8 +72,8 @@ public MyRpcClientImpl(Configuration conf) { } public MyRpcClientImpl(Configuration conf, String clusterId, SocketAddress address, - MetricsConnection metrics) { - super(conf, clusterId, address, metrics, null); + MetricsConnection metrics, Map connectionAttributes) { + super(conf, clusterId, address, metrics, connectionAttributes); } @Override From d5e092db37b28d9d22f97084570aeb5c08fd0852 Mon Sep 17 00:00:00 2001 From: Ray Mattingly Date: Fri, 14 Jul 2023 11:06:32 -0400 Subject: [PATCH 06/12] request attributes support in tablebuilder --- .../AsyncAdminRequestRetryingCaller.java | 3 +- .../client/AsyncBatchRpcRetryingCaller.java | 7 +- .../hbase/client/AsyncClientScanner.java | 31 ++- .../AsyncMasterRequestRpcRetryingCaller.java | 3 +- .../hbase/client/AsyncRpcRetryingCaller.java | 4 +- .../client/AsyncRpcRetryingCallerFactory.java | 31 ++- ...syncScanSingleRegionRpcRetryingCaller.java | 4 +- .../AsyncServerRequestRpcRetryingCaller.java | 3 +- .../AsyncSingleRequestRpcRetryingCaller.java | 6 +- .../hadoop/hbase/client/AsyncTable.java | 3 + .../hbase/client/AsyncTableBuilder.java | 6 + .../hbase/client/AsyncTableBuilderBase.java | 10 + .../hadoop/hbase/client/AsyncTableImpl.java | 6 + .../client/ConnectionOverAsyncConnection.java | 3 +- .../hbase/client/RawAsyncTableImpl.java | 17 +- .../org/apache/hadoop/hbase/client/Table.java | 8 + .../hadoop/hbase/client/TableBuilder.java | 5 + .../hadoop/hbase/client/TableBuilderBase.java | 15 ++ .../hbase/client/TableOverAsyncTable.java | 5 + .../hadoop/hbase/ipc/AbstractRpcClient.java | 31 +-- .../ipc/DelegatingHBaseRpcController.java | 10 +- .../hadoop/hbase/ipc/HBaseRpcController.java | 4 +- .../hbase/ipc/HBaseRpcControllerImpl.java | 11 +- .../AsyncRegionReplicationRetryingCaller.java | 4 +- .../hadoop/hbase/client/DummyAsyncTable.java | 6 + .../TestRequestAndConnectionAttributes.java | 231 ++++++++++++------ 26 files changed, 347 insertions(+), 120 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdminRequestRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdminRequestRetryingCaller.java index d3bec8b3cfbf..f7fa7e9f03fa 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdminRequestRetryingCaller.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdminRequestRetryingCaller.java @@ -20,6 +20,7 @@ import static org.apache.hadoop.hbase.util.FutureUtils.addListener; import java.io.IOException; +import java.util.Collections; import java.util.concurrent.CompletableFuture; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.ipc.HBaseRpcController; @@ -44,7 +45,7 @@ public AsyncAdminRequestRetryingCaller(Timer retryTimer, AsyncConnectionImpl con long pauseNs, long pauseNsForServerOverloaded, int maxAttempts, long operationTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt, ServerName serverName, Callable callable) { super(retryTimer, conn, priority, pauseNs, pauseNsForServerOverloaded, maxAttempts, - operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt); + operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt, Collections.emptyMap()); this.serverName = serverName; this.callable = callable; } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java index 7a8bbeb9420b..c485a0a2c05c 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java @@ -114,6 +114,8 @@ class AsyncBatchRpcRetryingCaller { private final HBaseServerExceptionPauseManager pauseManager; + private final Map requestAttributes; + // we can not use HRegionLocation as the map key because the hashCode and equals method of // HRegionLocation only consider serverName. private static final class RegionRequest { @@ -149,7 +151,8 @@ public int getPriority() { public AsyncBatchRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn, TableName tableName, List actions, long pauseNs, long pauseNsForServerOverloaded, - int maxAttempts, long operationTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) { + int maxAttempts, long operationTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt, + Map requestAttributes) { this.retryTimer = retryTimer; this.conn = conn; this.tableName = tableName; @@ -180,6 +183,7 @@ public AsyncBatchRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn, this.startNs = System.nanoTime(); this.pauseManager = new HBaseServerExceptionPauseManager(pauseNs, pauseNsForServerOverloaded, operationTimeoutNs); + this.requestAttributes = requestAttributes; } private static boolean hasIncrementOrAppend(Row action) { @@ -392,6 +396,7 @@ private void sendToServer(ServerName serverName, ServerRequest serverReq, int tr HBaseRpcController controller = conn.rpcControllerFactory.newController(); resetController(controller, Math.min(rpcTimeoutNs, remainingNs), calcPriority(serverReq.getPriority(), tableName)); + controller.setRequestAttributes(requestAttributes); if (!cells.isEmpty()) { controller.setCellScanner(createCellScanner(cells)); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java index ed381df7e0da..b61f5b80c9e7 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java @@ -32,6 +32,7 @@ import io.opentelemetry.api.trace.StatusCode; import io.opentelemetry.context.Scope; import java.io.IOException; +import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -92,9 +93,12 @@ class AsyncClientScanner { private final Span span; + private final Map requestAttributes; + public AsyncClientScanner(Scan scan, AdvancedScanResultConsumer consumer, TableName tableName, AsyncConnectionImpl conn, Timer retryTimer, long pauseNs, long pauseNsForServerOverloaded, - int maxAttempts, long scanTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) { + int maxAttempts, long scanTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt, + Map requestAttributes) { if (scan.getStartRow() == null) { scan.withStartRow(EMPTY_START_ROW, scan.includeStartRow()); } @@ -113,6 +117,7 @@ public AsyncClientScanner(Scan scan, AdvancedScanResultConsumer consumer, TableN this.rpcTimeoutNs = rpcTimeoutNs; this.startLogErrorsCnt = startLogErrorsCnt; this.resultCache = createScanResultCache(scan); + this.requestAttributes = requestAttributes; if (scan.isScanMetricsEnabled()) { this.scanMetrics = new ScanMetrics(); consumer.onScanMetricsCreated(scanMetrics); @@ -191,15 +196,17 @@ private CompletableFuture callOpenScanner(HBaseRpcControlle } private void startScan(OpenScannerResponse resp) { - addListener(conn.callerFactory.scanSingleRegion().id(resp.resp.getScannerId()) - .location(resp.loc).remote(resp.isRegionServerRemote) - .scannerLeaseTimeoutPeriod(resp.resp.getTtl(), TimeUnit.MILLISECONDS).stub(resp.stub) - .setScan(scan).metrics(scanMetrics).consumer(consumer).resultCache(resultCache) - .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS) - .scanTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS) - .pauseForServerOverloaded(pauseNsForServerOverloaded, TimeUnit.NANOSECONDS) - .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt) - .start(resp.controller, resp.resp), (hasMore, error) -> { + addListener( + conn.callerFactory.scanSingleRegion().id(resp.resp.getScannerId()).location(resp.loc) + .remote(resp.isRegionServerRemote) + .scannerLeaseTimeoutPeriod(resp.resp.getTtl(), TimeUnit.MILLISECONDS).stub(resp.stub) + .setScan(scan).metrics(scanMetrics).consumer(consumer).resultCache(resultCache) + .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS) + .scanTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS) + .pauseForServerOverloaded(pauseNsForServerOverloaded, TimeUnit.NANOSECONDS) + .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt) + .setRequestAttributes(requestAttributes).start(resp.controller, resp.resp), + (hasMore, error) -> { try (Scope ignored = span.makeCurrent()) { if (error != null) { try { @@ -231,8 +238,8 @@ private CompletableFuture openScanner(int replicaId) { .priority(scan.getPriority()).rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS) .operationTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS) .pauseForServerOverloaded(pauseNsForServerOverloaded, TimeUnit.NANOSECONDS) - .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt).action(this::callOpenScanner) - .call(); + .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt) + .setRequestAttributes(requestAttributes).action(this::callOpenScanner).call(); } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMasterRequestRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMasterRequestRpcRetryingCaller.java index c02b80c666ae..42585ea1c919 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMasterRequestRpcRetryingCaller.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMasterRequestRpcRetryingCaller.java @@ -19,6 +19,7 @@ import static org.apache.hadoop.hbase.util.FutureUtils.addListener; +import java.util.Collections; import java.util.concurrent.CompletableFuture; import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil; import org.apache.hadoop.hbase.ipc.HBaseRpcController; @@ -47,7 +48,7 @@ public AsyncMasterRequestRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl Callable callable, int priority, long pauseNs, long pauseNsForServerOverloaded, int maxRetries, long operationTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) { super(retryTimer, conn, priority, pauseNs, pauseNsForServerOverloaded, maxRetries, - operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt); + operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt, Collections.emptyMap()); this.callable = callable; } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java index 8b317bfec2c2..c3dd8740854e 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java @@ -22,6 +22,7 @@ import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.OptionalLong; import java.util.concurrent.CompletableFuture; @@ -78,7 +79,7 @@ public abstract class AsyncRpcRetryingCaller { public AsyncRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn, int priority, long pauseNs, long pauseNsForServerOverloaded, int maxAttempts, long operationTimeoutNs, - long rpcTimeoutNs, int startLogErrorsCnt) { + long rpcTimeoutNs, int startLogErrorsCnt, Map requestAttributes) { this.retryTimer = retryTimer; this.conn = conn; this.priority = priority; @@ -89,6 +90,7 @@ public AsyncRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn, int pr this.future = new CompletableFuture<>(); this.controller = conn.rpcControllerFactory.newController(); this.controller.setPriority(priority); + this.controller.setRequestAttributes(requestAttributes); this.exceptions = new ArrayList<>(); this.startNs = System.nanoTime(); this.pauseManager = diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java index 2d8e7b7aabe9..1ea2a1ad7dd4 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java @@ -23,7 +23,9 @@ import static org.apache.hbase.thirdparty.com.google.common.base.Preconditions.checkArgument; import static org.apache.hbase.thirdparty.com.google.common.base.Preconditions.checkNotNull; +import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import org.apache.hadoop.hbase.HRegionLocation; @@ -83,6 +85,8 @@ public class SingleRequestCallerBuilder extends BuilderBase { private int priority = PRIORITY_UNSET; + private Map requestAttributes = Collections.emptyMap(); + public SingleRequestCallerBuilder table(TableName tableName) { this.tableName = tableName; return this; @@ -144,6 +148,12 @@ public SingleRequestCallerBuilder priority(int priority) { return this; } + public SingleRequestCallerBuilder + setRequestAttributes(Map requestAttributes) { + this.requestAttributes = requestAttributes; + return this; + } + private void preCheck() { checkArgument(replicaId >= 0, "invalid replica id %s", replicaId); checkNotNull(tableName, "tableName is null"); @@ -157,7 +167,7 @@ public AsyncSingleRequestRpcRetryingCaller build() { preCheck(); return new AsyncSingleRequestRpcRetryingCaller<>(retryTimer, conn, tableName, row, replicaId, locateType, callable, priority, pauseNs, pauseNsForServerOverloaded, maxAttempts, - operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt); + operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt, requestAttributes); } /** @@ -201,6 +211,8 @@ public class ScanSingleRegionCallerBuilder extends BuilderBase { private int priority = PRIORITY_UNSET; + private Map requestAttributes = Collections.emptyMap(); + public ScanSingleRegionCallerBuilder id(long scannerId) { this.scannerId = scannerId; return this; @@ -278,6 +290,12 @@ public ScanSingleRegionCallerBuilder startLogErrorsCnt(int startLogErrorsCnt) { return this; } + public ScanSingleRegionCallerBuilder + setRequestAttributes(Map requestAttributes) { + this.requestAttributes = requestAttributes; + return this; + } + private void preCheck() { checkArgument(scannerId != null, "invalid scannerId %d", scannerId); checkNotNull(scan, "scan is null"); @@ -293,7 +311,7 @@ public AsyncScanSingleRegionRpcRetryingCaller build() { return new AsyncScanSingleRegionRpcRetryingCaller(retryTimer, conn, scan, scanMetrics, scannerId, resultCache, consumer, stub, loc, isRegionServerRemote, priority, scannerLeaseTimeoutPeriodNs, pauseNs, pauseNsForServerOverloaded, maxAttempts, - scanTimeoutNs, rpcTimeoutNs, startLogErrorsCnt); + scanTimeoutNs, rpcTimeoutNs, startLogErrorsCnt, requestAttributes); } /** @@ -322,6 +340,8 @@ public class BatchCallerBuilder extends BuilderBase { private long rpcTimeoutNs = -1L; + private Map requestAttributes = Collections.emptyMap(); + public BatchCallerBuilder table(TableName tableName) { this.tableName = tableName; return this; @@ -362,10 +382,15 @@ public BatchCallerBuilder startLogErrorsCnt(int startLogErrorsCnt) { return this; } + public BatchCallerBuilder setRequestAttributes(Map requestAttributes) { + this.requestAttributes = requestAttributes; + return this; + } + public AsyncBatchRpcRetryingCaller build() { return new AsyncBatchRpcRetryingCaller<>(retryTimer, conn, tableName, actions, pauseNs, pauseNsForServerOverloaded, maxAttempts, operationTimeoutNs, rpcTimeoutNs, - startLogErrorsCnt); + startLogErrorsCnt, requestAttributes); } public List> call() { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java index ca39051de84d..a5d4ef6407e1 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java @@ -31,6 +31,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.OptionalLong; import java.util.concurrent.CompletableFuture; @@ -316,7 +317,7 @@ public AsyncScanSingleRegionRpcRetryingCaller(Timer retryTimer, AsyncConnectionI AdvancedScanResultConsumer consumer, Interface stub, HRegionLocation loc, boolean isRegionServerRemote, int priority, long scannerLeaseTimeoutPeriodNs, long pauseNs, long pauseNsForServerOverloaded, int maxAttempts, long scanTimeoutNs, long rpcTimeoutNs, - int startLogErrorsCnt) { + int startLogErrorsCnt, Map requestAttributes) { this.retryTimer = retryTimer; this.conn = conn; this.scan = scan; @@ -341,6 +342,7 @@ public AsyncScanSingleRegionRpcRetryingCaller(Timer retryTimer, AsyncConnectionI this.priority = priority; this.controller = conn.rpcControllerFactory.newController(); this.controller.setPriority(priority); + this.controller.setRequestAttributes(requestAttributes); this.exceptions = new ArrayList<>(); this.pauseManager = new HBaseServerExceptionPauseManager(pauseNs, pauseNsForServerOverloaded, scanTimeoutNs); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncServerRequestRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncServerRequestRpcRetryingCaller.java index 40cd3b87e928..d4484ba87bf1 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncServerRequestRpcRetryingCaller.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncServerRequestRpcRetryingCaller.java @@ -20,6 +20,7 @@ import static org.apache.hadoop.hbase.util.FutureUtils.addListener; import java.io.IOException; +import java.util.Collections; import java.util.concurrent.CompletableFuture; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.ServerName; @@ -49,7 +50,7 @@ public AsyncServerRequestRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl long pauseNs, long pauseNsForServerOverloaded, int maxAttempts, long operationTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt, ServerName serverName, Callable callable) { super(retryTimer, conn, HConstants.NORMAL_QOS, pauseNs, pauseNsForServerOverloaded, maxAttempts, - operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt); + operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt, Collections.emptyMap()); this.serverName = serverName; this.callable = callable; } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java index 9c115af97b5b..a0d536aef5f7 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java @@ -20,6 +20,7 @@ import static org.apache.hadoop.hbase.util.FutureUtils.addListener; import java.io.IOException; +import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; import org.apache.hadoop.hbase.HRegionLocation; @@ -57,9 +58,10 @@ CompletableFuture call(HBaseRpcController controller, HRegionLocation loc, public AsyncSingleRequestRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn, TableName tableName, byte[] row, int replicaId, RegionLocateType locateType, Callable callable, int priority, long pauseNs, long pauseNsForServerOverloaded, - int maxAttempts, long operationTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) { + int maxAttempts, long operationTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt, + Map requestAttributes) { super(retryTimer, conn, priority, pauseNs, pauseNsForServerOverloaded, maxAttempts, - operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt); + operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt, requestAttributes); this.tableName = tableName; this.row = row; this.replicaId = replicaId; diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java index 3c03444cfbbc..1a42d4558aba 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java @@ -22,6 +22,7 @@ import static org.apache.hadoop.hbase.util.FutureUtils.allOf; import java.util.List; +import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.function.Function; @@ -110,6 +111,8 @@ public interface AsyncTable { */ long getScanTimeout(TimeUnit unit); + Map getRequestAttributes(); + /** * Test for the existence of columns in the table, as specified by the Get. *

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilder.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilder.java index f6db89f82bf5..9cb6b8fddfe6 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilder.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilder.java @@ -19,6 +19,7 @@ import static org.apache.hadoop.hbase.client.ConnectionUtils.retries2Attempts; +import java.util.Map; import java.util.concurrent.TimeUnit; import org.apache.hadoop.hbase.HBaseServerException; import org.apache.yetus.audience.InterfaceAudience; @@ -137,6 +138,11 @@ default AsyncTableBuilder setMaxRetries(int maxRetries) { */ AsyncTableBuilder setStartLogErrorsCnt(int startLogErrorsCnt); + /** + * Sets a request attribute + */ + AsyncTableBuilder setRequestAttributes(Map requestAttributes); + /** * Create the {@link AsyncTable} instance. */ diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilderBase.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilderBase.java index 624d6e1dbb0a..f4cacce900db 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilderBase.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilderBase.java @@ -19,6 +19,8 @@ import static org.apache.hadoop.hbase.client.ConnectionUtils.retries2Attempts; +import java.util.HashMap; +import java.util.Map; import java.util.concurrent.TimeUnit; import org.apache.hadoop.hbase.TableName; import org.apache.yetus.audience.InterfaceAudience; @@ -50,6 +52,8 @@ abstract class AsyncTableBuilderBase protected int startLogErrorsCnt; + protected Map requestAttributes = new HashMap<>(); + AsyncTableBuilderBase(TableName tableName, AsyncConnectionConfiguration connConf) { this.tableName = tableName; this.operationTimeoutNs = tableName.isSystemTable() @@ -121,4 +125,10 @@ public AsyncTableBuilderBase setStartLogErrorsCnt(int startLogErrorsCnt) { this.startLogErrorsCnt = startLogErrorsCnt; return this; } + + @Override + public AsyncTableBuilder setRequestAttributes(Map requestAttributes) { + this.requestAttributes = requestAttributes; + return this; + } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java index e785e587ab36..590ee9bc47a3 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java @@ -24,6 +24,7 @@ import io.opentelemetry.context.Scope; import java.io.IOException; import java.util.List; +import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; @@ -101,6 +102,11 @@ public long getScanTimeout(TimeUnit unit) { return rawTable.getScanTimeout(unit); } + @Override + public Map getRequestAttributes() { + return rawTable.getRequestAttributes(); + } + private CompletableFuture wrap(CompletableFuture future) { return FutureUtils.wrapFuture(future, pool); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionOverAsyncConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionOverAsyncConnection.java index 7a7b38a4df6a..55d255b88f54 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionOverAsyncConnection.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionOverAsyncConnection.java @@ -193,7 +193,8 @@ public Table build() { conn.getTableBuilder(tableName).setRpcTimeout(rpcTimeout, TimeUnit.MILLISECONDS) .setReadRpcTimeout(readRpcTimeout, TimeUnit.MILLISECONDS) .setWriteRpcTimeout(writeRpcTimeout, TimeUnit.MILLISECONDS) - .setOperationTimeout(operationTimeout, TimeUnit.MILLISECONDS).build(), + .setOperationTimeout(operationTimeout, TimeUnit.MILLISECONDS) + .setRequestAttributes(requestAttributes).build(), poolSupplier); } }; diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java index ff75c0725ce5..342cf89acf1a 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java @@ -34,6 +34,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -119,6 +120,8 @@ class RawAsyncTableImpl implements AsyncTable { private final int startLogErrorsCnt; + private final Map requestAttributes; + RawAsyncTableImpl(AsyncConnectionImpl conn, Timer retryTimer, AsyncTableBuilderBase builder) { this.conn = conn; this.retryTimer = retryTimer; @@ -145,6 +148,7 @@ class RawAsyncTableImpl implements AsyncTable { ? conn.connConf.getMetaScannerCaching() : conn.connConf.getScannerCaching(); this.defaultScannerMaxResultSize = conn.connConf.getScannerMaxResultSize(); + this.requestAttributes = builder.requestAttributes; } @Override @@ -210,7 +214,8 @@ private SingleRequestCallerBuilder newCaller(byte[] row, int priority, lo .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS) .pause(pauseNs, TimeUnit.NANOSECONDS) .pauseForServerOverloaded(pauseNsForServerOverloaded, TimeUnit.NANOSECONDS) - .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt); + .maxAttempts(maxAttempts).setRequestAttributes(requestAttributes) + .startLogErrorsCnt(startLogErrorsCnt).setRequestAttributes(requestAttributes); } private SingleRequestCallerBuilder @@ -608,7 +613,7 @@ private Scan setDefaultScanConfig(Scan scan) { public void scan(Scan scan, AdvancedScanResultConsumer consumer) { new AsyncClientScanner(setDefaultScanConfig(scan), consumer, tableName, conn, retryTimer, pauseNs, pauseNsForServerOverloaded, maxAttempts, scanTimeoutNs, readRpcTimeoutNs, - startLogErrorsCnt).start(); + startLogErrorsCnt, requestAttributes).start(); } private long resultSize2CacheSize(long maxResultSize) { @@ -704,7 +709,8 @@ private List> batch(List actions, long r .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS) .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS) .pauseForServerOverloaded(pauseNsForServerOverloaded, TimeUnit.NANOSECONDS) - .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt).call(); + .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt) + .setRequestAttributes(requestAttributes).call(); } @Override @@ -732,6 +738,11 @@ public long getScanTimeout(TimeUnit unit) { return unit.convert(scanTimeoutNs, TimeUnit.NANOSECONDS); } + @Override + public Map getRequestAttributes() { + return requestAttributes; + } + private CompletableFuture coprocessorService(Function stubMaker, ServiceCaller callable, RegionInfo region, byte[] row) { RegionCoprocessorRpcChannelImpl channel = new RegionCoprocessorRpcChannelImpl(conn, tableName, diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java index 7feefc831ca0..907e3d1a7040 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java @@ -751,4 +751,12 @@ default long getWriteRpcTimeout(TimeUnit unit) { default long getOperationTimeout(TimeUnit unit) { throw new NotImplementedException("Add an implementation!"); } + + /** + * Get the attributes to be submitted with requests + * @return map of request attributes + */ + default Map getRequestAttributes() { + return Collections.emptyMap(); + } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableBuilder.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableBuilder.java index 75e16e89a5de..bda55280ee83 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableBuilder.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableBuilder.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hbase.client; +import java.util.Map; import org.apache.yetus.audience.InterfaceAudience; /** @@ -55,6 +56,10 @@ public interface TableBuilder { */ TableBuilder setWriteRpcTimeout(int timeout); + TableBuilder setRequestAttributes(Map requestAttributes); + + Map getRequestAttributes(); + /** * Create the {@link Table} instance. */ diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableBuilderBase.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableBuilderBase.java index c74340259f3f..f43f28fc9f86 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableBuilderBase.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableBuilderBase.java @@ -17,6 +17,8 @@ */ package org.apache.hadoop.hbase.client; +import java.util.HashMap; +import java.util.Map; import org.apache.hadoop.hbase.TableName; import org.apache.yetus.audience.InterfaceAudience; @@ -36,6 +38,8 @@ abstract class TableBuilderBase implements TableBuilder { protected int writeRpcTimeout; + protected Map requestAttributes = new HashMap<>(); + TableBuilderBase(TableName tableName, ConnectionConfiguration connConf) { if (tableName == null) { throw new IllegalArgumentException("Given table name is null"); @@ -73,4 +77,15 @@ public TableBuilderBase setWriteRpcTimeout(int timeout) { this.writeRpcTimeout = timeout; return this; } + + @Override + public TableBuilderBase setRequestAttributes(Map requestAttributes) { + this.requestAttributes = requestAttributes; + return this; + } + + @Override + public Map getRequestAttributes() { + return requestAttributes; + } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableOverAsyncTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableOverAsyncTable.java index e1565f18159a..0a7dabd476ce 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableOverAsyncTable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableOverAsyncTable.java @@ -560,6 +560,11 @@ public long getOperationTimeout(TimeUnit unit) { return table.getOperationTimeout(unit); } + @Override + public Map getRequestAttributes() { + return table.getRequestAttributes(); + } + @Override public RegionLocator getRegionLocator() throws IOException { return conn.toConnection().getRegionLocator(getName()); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java index 9e7f1a5b502e..5e42558671b7 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java @@ -419,23 +419,24 @@ private Call callMethod(final Descriptors.MethodDescriptor md, final HBaseRpcCon } final AtomicInteger counter = concurrentCounterCache.getUnchecked(addr); - Call call = new Call(nextCallId(), md, param, hrc.cellScanner(), returnType, - hrc.getCallTimeout(), hrc.getPriority(), hrc.getAttributes(), new RpcCallback() { - @Override - public void run(Call call) { - try (Scope scope = call.span.makeCurrent()) { - counter.decrementAndGet(); - onCallFinished(call, hrc, addr, callback); - } finally { - if (hrc.failed()) { - TraceUtil.setError(span, hrc.getFailed()); - } else { - span.setStatus(StatusCode.OK); + Call call = + new Call(nextCallId(), md, param, hrc.cellScanner(), returnType, hrc.getCallTimeout(), + hrc.getPriority(), hrc.getRequestAttributes(), new RpcCallback() { + @Override + public void run(Call call) { + try (Scope scope = call.span.makeCurrent()) { + counter.decrementAndGet(); + onCallFinished(call, hrc, addr, callback); + } finally { + if (hrc.failed()) { + TraceUtil.setError(span, hrc.getFailed()); + } else { + span.setStatus(StatusCode.OK); + } + span.end(); } - span.end(); } - } - }, cs); + }, cs); ConnectionId remoteId = new ConnectionId(ticket, md.getService().getName(), addr); int count = counter.incrementAndGet(); try { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/DelegatingHBaseRpcController.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/DelegatingHBaseRpcController.java index caf0dae03ce5..c752f4c18355 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/DelegatingHBaseRpcController.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/DelegatingHBaseRpcController.java @@ -18,7 +18,6 @@ package org.apache.hadoop.hbase.ipc; import java.io.IOException; -import java.util.Collections; import java.util.Map; import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.TableName; @@ -115,8 +114,13 @@ public boolean hasCallTimeout() { } @Override - public Map getAttributes() { - return Collections.emptyMap(); + public Map getRequestAttributes() { + return delegate.getRequestAttributes(); + } + + @Override + public void setRequestAttributes(Map requestAttributes) { + delegate.setRequestAttributes(requestAttributes); } @Override diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcController.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcController.java index 2653387733fd..e630c23762f5 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcController.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcController.java @@ -72,7 +72,9 @@ public interface HBaseRpcController extends RpcController, CellScannable { boolean hasCallTimeout(); - Map getAttributes(); + Map getRequestAttributes(); + + void setRequestAttributes(Map requestAttributes); /** * Set failed with an exception to pass on. For use in async rpc clients diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcControllerImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcControllerImpl.java index 6b862acac9a0..425c5e77afcd 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcControllerImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcControllerImpl.java @@ -72,6 +72,8 @@ public class HBaseRpcControllerImpl implements HBaseRpcController { */ private CellScanner cellScanner; + private Map requestAttributes = Collections.emptyMap(); + public HBaseRpcControllerImpl() { this(null, (CellScanner) null); } @@ -169,8 +171,13 @@ public boolean hasCallTimeout() { } @Override - public Map getAttributes() { - return Collections.emptyMap(); + public Map getRequestAttributes() { + return requestAttributes; + } + + @Override + public void setRequestAttributes(Map requestAttributes) { + this.requestAttributes = requestAttributes; } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionReplicationRetryingCaller.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionReplicationRetryingCaller.java index 02718145c9b7..e2b45fe30c3c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionReplicationRetryingCaller.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionReplicationRetryingCaller.java @@ -20,6 +20,7 @@ import static org.apache.hadoop.hbase.util.FutureUtils.addListener; import java.io.IOException; +import java.util.Collections; import java.util.List; import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.DoNotRetryIOException; @@ -54,7 +55,8 @@ public AsyncRegionReplicationRetryingCaller(HashedWheelTimer retryTimer, RegionInfo replica, List entries) { super(retryTimer, conn, ConnectionUtils.getPriority(replica.getTable()), conn.connConf.getPauseNs(), conn.connConf.getPauseNsForServerOverloaded(), maxAttempts, - operationTimeoutNs, rpcTimeoutNs, conn.connConf.getStartLogErrorsCnt()); + operationTimeoutNs, rpcTimeoutNs, conn.connConf.getStartLogErrorsCnt(), + Collections.emptyMap()); this.replica = replica; this.entries = entries.toArray(new Entry[0]); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/DummyAsyncTable.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/DummyAsyncTable.java index a87babad0d27..45e59def7216 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/DummyAsyncTable.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/DummyAsyncTable.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hbase.client; import java.util.List; +import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.function.Function; @@ -77,6 +78,11 @@ public long getScanTimeout(TimeUnit unit) { return 0; } + @Override + public Map getRequestAttributes() { + return null; + } + @Override public CompletableFuture get(Get get) { return null; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRequestAndConnectionAttributes.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRequestAndConnectionAttributes.java index 198e4766a276..23838b19aeb4 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRequestAndConnectionAttributes.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRequestAndConnectionAttributes.java @@ -18,18 +18,24 @@ package org.apache.hadoop.hbase.client; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import com.google.common.collect.ImmutableList; import java.io.IOException; +import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.UUID; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.AuthUtil; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellComparator; -import org.apache.hadoop.hbase.CellScannable; -import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseTestingUtil; import org.apache.hadoop.hbase.HConstants; @@ -38,15 +44,15 @@ import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.coprocessor.RegionObserver; -import org.apache.hadoop.hbase.ipc.DelegatingHBaseRpcController; -import org.apache.hadoop.hbase.ipc.HBaseRpcController; import org.apache.hadoop.hbase.ipc.RpcCall; -import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.ipc.RpcServer; +import org.apache.hadoop.hbase.regionserver.InternalScanner; import org.apache.hadoop.hbase.testclassification.ClientTests; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.wal.WALEdit; import org.junit.AfterClass; +import org.junit.Before; import org.junit.BeforeClass; import org.junit.ClassRule; import org.junit.Test; @@ -66,15 +72,21 @@ public class TestRequestAndConnectionAttributes { CONNECTION_ATTRIBUTES.put("clientId", Bytes.toBytes("foo")); } private static final Map REQUEST_ATTRIBUTES = new HashMap<>(); + private static final ExecutorService EXECUTOR_SERVICE = Executors.newFixedThreadPool(100); + private static final AtomicBoolean REQUEST_ATTRIBUTES_VALIDATED = new AtomicBoolean(false); + private static final byte[] REQUEST_ATTRIBUTES_TEST_TABLE_CF = Bytes.toBytes("0"); + private static final TableName REQUEST_ATTRIBUTES_TEST_TABLE = + TableName.valueOf("testRequestAttributes"); private static HBaseTestingUtil TEST_UTIL = null; @BeforeClass public static void setUp() throws Exception { TEST_UTIL = new HBaseTestingUtil(); - TEST_UTIL.getConfiguration().set(RpcControllerFactory.CUSTOM_CONTROLLER_CONF_KEY, - AttributesRpcControllerFactory.class.getName()); TEST_UTIL.startMiniCluster(1); + TEST_UTIL.createTable(REQUEST_ATTRIBUTES_TEST_TABLE, + new byte[][] { REQUEST_ATTRIBUTES_TEST_TABLE_CF }, 1, HConstants.DEFAULT_BLOCKSIZE, + AttributesCoprocessor.class.getName()); } @AfterClass @@ -82,6 +94,11 @@ public static void afterClass() throws Exception { TEST_UTIL.shutdownMiniCluster(); } + @Before + public void setup() { + REQUEST_ATTRIBUTES_VALIDATED.getAndSet(false); + } + @Test public void testConnectionAttributes() throws IOException { TableName tableName = TableName.valueOf("testConnectionAttributes"); @@ -92,7 +109,7 @@ public void testConnectionAttributes() throws IOException { try (Connection conn = ConnectionFactory.createConnection(conf, null, AuthUtil.loginClient(conf), CONNECTION_ATTRIBUTES); Table table = conn.getTable(tableName)) { Result result = table.get(new Get(Bytes.toBytes(0))); - assertEquals(REQUEST_ATTRIBUTES.size() + CONNECTION_ATTRIBUTES.size(), result.size()); + assertEquals(CONNECTION_ATTRIBUTES.size(), result.size()); for (Map.Entry attr : CONNECTION_ATTRIBUTES.entrySet()) { byte[] val = result.getValue(Bytes.toBytes("c"), Bytes.toBytes(attr.getKey())); assertEquals(Bytes.toStringBinary(attr.getValue()), Bytes.toStringBinary(val)); @@ -101,96 +118,135 @@ public void testConnectionAttributes() throws IOException { } @Test - public void testRequestAttributes() throws IOException { - TableName tableName = TableName.valueOf("testRequestAttributes"); - TEST_UTIL.createTable(tableName, new byte[][] { Bytes.toBytes("0") }, 1, - HConstants.DEFAULT_BLOCKSIZE, AttributesCoprocessor.class.getName()); - - REQUEST_ATTRIBUTES.clear(); - REQUEST_ATTRIBUTES.put("test1", Bytes.toBytes("a")); - REQUEST_ATTRIBUTES.put("test2", Bytes.toBytes("b")); + public void testRequestAttributesGet() throws IOException { + addRandomRequestAttributes(); Configuration conf = TEST_UTIL.getConfiguration(); - try (Connection conn = ConnectionFactory.createConnection(conf, null, - AuthUtil.loginClient(conf), CONNECTION_ATTRIBUTES); Table table = conn.getTable(tableName)) { - - Result result = table.get(new Get(Bytes.toBytes(0))); - assertEquals(REQUEST_ATTRIBUTES.size() + CONNECTION_ATTRIBUTES.size(), result.size()); - for (Map.Entry attr : REQUEST_ATTRIBUTES.entrySet()) { - byte[] val = result.getValue(Bytes.toBytes("r"), Bytes.toBytes(attr.getKey())); - assertEquals(Bytes.toStringBinary(attr.getValue()), Bytes.toStringBinary(val)); - } + try ( + Connection conn = ConnectionFactory.createConnection(conf, null, AuthUtil.loginClient(conf), + CONNECTION_ATTRIBUTES); + Table table = conn.getTableBuilder(REQUEST_ATTRIBUTES_TEST_TABLE, EXECUTOR_SERVICE) + .setRequestAttributes(REQUEST_ATTRIBUTES).build()) { - REQUEST_ATTRIBUTES.put("test3", Bytes.toBytes("c")); - result = table.get(new Get(Bytes.toBytes(0))); - assertEquals(REQUEST_ATTRIBUTES.size() + CONNECTION_ATTRIBUTES.size(), result.size()); - for (Map.Entry attr : REQUEST_ATTRIBUTES.entrySet()) { - byte[] val = result.getValue(Bytes.toBytes("r"), Bytes.toBytes(attr.getKey())); - assertEquals(Bytes.toStringBinary(attr.getValue()), Bytes.toStringBinary(val)); - } + table.get(new Get(Bytes.toBytes(0))); } + + assertTrue(REQUEST_ATTRIBUTES_VALIDATED.get()); } @Test - public void testNoRequestAttributes() throws IOException { - TableName tableName = TableName.valueOf("testNoRequestAttributes"); - TEST_UTIL.createTable(tableName, new byte[][] { Bytes.toBytes("0") }, 1, - HConstants.DEFAULT_BLOCKSIZE, AttributesCoprocessor.class.getName()); + public void testRequestAttributesMultiGet() throws IOException { + assertFalse(REQUEST_ATTRIBUTES_VALIDATED.get()); + addRandomRequestAttributes(); - REQUEST_ATTRIBUTES.clear(); Configuration conf = TEST_UTIL.getConfiguration(); - try (Connection conn = ConnectionFactory.createConnection(conf, null, - AuthUtil.loginClient(conf), CONNECTION_ATTRIBUTES)) { - TableBuilder tableBuilder = conn.getTableBuilder(tableName, null); - try (Table table = tableBuilder.build()) { - Result result = table.get(new Get(Bytes.toBytes(0))); - assertEquals(CONNECTION_ATTRIBUTES.size(), result.size()); - } + try ( + Connection conn = ConnectionFactory.createConnection(conf, null, AuthUtil.loginClient(conf), + CONNECTION_ATTRIBUTES); + Table table = conn.getTableBuilder(REQUEST_ATTRIBUTES_TEST_TABLE, EXECUTOR_SERVICE) + .setRequestAttributes(REQUEST_ATTRIBUTES).build()) { + List gets = ImmutableList.of(new Get(Bytes.toBytes(0)), new Get(Bytes.toBytes(1))); + table.get(gets); } + + assertTrue(REQUEST_ATTRIBUTES_VALIDATED.get()); } - public static class AttributesRpcControllerFactory extends RpcControllerFactory { + @Test + public void testRequestAttributesExists() throws IOException { + assertFalse(REQUEST_ATTRIBUTES_VALIDATED.get()); + addRandomRequestAttributes(); - public AttributesRpcControllerFactory(Configuration conf) { - super(conf); - } + Configuration conf = TEST_UTIL.getConfiguration(); + try ( + Connection conn = ConnectionFactory.createConnection(conf, null, AuthUtil.loginClient(conf), + CONNECTION_ATTRIBUTES); + Table table = conn.getTableBuilder(REQUEST_ATTRIBUTES_TEST_TABLE, EXECUTOR_SERVICE) + .setRequestAttributes(REQUEST_ATTRIBUTES).build()) { - @Override - public HBaseRpcController newController() { - return new AttributesRpcController(super.newController()); + table.exists(new Get(Bytes.toBytes(0))); } - @Override - public HBaseRpcController newController(CellScanner cellScanner) { - return new AttributesRpcController(super.newController(cellScanner)); - } + assertTrue(REQUEST_ATTRIBUTES_VALIDATED.get()); + } - @Override - public HBaseRpcController newController(RegionInfo regionInfo, CellScanner cellScanner) { - return new AttributesRpcController(super.newController(regionInfo, cellScanner)); + @Test + public void testRequestAttributesScan() throws IOException { + assertFalse(REQUEST_ATTRIBUTES_VALIDATED.get()); + addRandomRequestAttributes(); + + Configuration conf = TEST_UTIL.getConfiguration(); + try ( + Connection conn = ConnectionFactory.createConnection(conf, null, AuthUtil.loginClient(conf), + CONNECTION_ATTRIBUTES); + Table table = conn.getTableBuilder(REQUEST_ATTRIBUTES_TEST_TABLE, EXECUTOR_SERVICE) + .setRequestAttributes(REQUEST_ATTRIBUTES).build()) { + ResultScanner scanner = table.getScanner(new Scan()); + scanner.next(); } + assertTrue(REQUEST_ATTRIBUTES_VALIDATED.get()); + } - @Override - public HBaseRpcController newController(List cellIterables) { - return new AttributesRpcController(super.newController(cellIterables)); + @Test + public void testRequestAttributesPut() throws IOException { + assertFalse(REQUEST_ATTRIBUTES_VALIDATED.get()); + addRandomRequestAttributes(); + + Configuration conf = TEST_UTIL.getConfiguration(); + try ( + Connection conn = ConnectionFactory.createConnection(conf, null, AuthUtil.loginClient(conf), + CONNECTION_ATTRIBUTES); + Table table = conn.getTableBuilder(REQUEST_ATTRIBUTES_TEST_TABLE, EXECUTOR_SERVICE) + .setRequestAttributes(REQUEST_ATTRIBUTES).build()) { + Put put = new Put(Bytes.toBytes("a")); + put.addColumn(REQUEST_ATTRIBUTES_TEST_TABLE_CF, Bytes.toBytes("c"), Bytes.toBytes("v")); + table.put(put); } + assertTrue(REQUEST_ATTRIBUTES_VALIDATED.get()); + } - @Override - public HBaseRpcController newController(RegionInfo regionInfo, - List cellIterables) { - return new AttributesRpcController(super.newController(regionInfo, cellIterables)); + @Test + public void testRequestAttributesMultiPut() throws IOException { + assertFalse(REQUEST_ATTRIBUTES_VALIDATED.get()); + addRandomRequestAttributes(); + + Configuration conf = TEST_UTIL.getConfiguration(); + try ( + Connection conn = ConnectionFactory.createConnection(conf, null, AuthUtil.loginClient(conf), + CONNECTION_ATTRIBUTES); + Table table = conn.getTableBuilder(REQUEST_ATTRIBUTES_TEST_TABLE, EXECUTOR_SERVICE) + .setRequestAttributes(REQUEST_ATTRIBUTES).build()) { + Put put = new Put(Bytes.toBytes("a")); + put.addColumn(REQUEST_ATTRIBUTES_TEST_TABLE_CF, Bytes.toBytes("c"), Bytes.toBytes("v")); + table.put(put); } + assertTrue(REQUEST_ATTRIBUTES_VALIDATED.get()); } - public static class AttributesRpcController extends DelegatingHBaseRpcController { + @Test + public void testNoRequestAttributes() throws IOException { + assertFalse(REQUEST_ATTRIBUTES_VALIDATED.get()); + TableName tableName = TableName.valueOf("testNoRequestAttributesScan"); + TEST_UTIL.createTable(tableName, new byte[][] { Bytes.toBytes("0") }, 1, + HConstants.DEFAULT_BLOCKSIZE, AttributesCoprocessor.class.getName()); - public AttributesRpcController(HBaseRpcController delegate) { - super(delegate); + REQUEST_ATTRIBUTES.clear(); + Configuration conf = TEST_UTIL.getConfiguration(); + try (Connection conn = ConnectionFactory.createConnection(conf, null, + AuthUtil.loginClient(conf), CONNECTION_ATTRIBUTES)) { + TableBuilder tableBuilder = conn.getTableBuilder(tableName, null); + try (Table table = tableBuilder.build()) { + table.get(new Get(Bytes.toBytes(0))); + assertTrue(REQUEST_ATTRIBUTES_VALIDATED.get()); + } } + } - @Override - public Map getAttributes() { - return REQUEST_ATTRIBUTES; + private void addRandomRequestAttributes() { + REQUEST_ATTRIBUTES.clear(); + int j = Math.max(2, (int) (10 * Math.random())); + for (int i = 0; i < j; i++) { + REQUEST_ATTRIBUTES.put(String.valueOf(i), Bytes.toBytes(UUID.randomUUID().toString())); } } @@ -204,6 +260,9 @@ public Optional getRegionObserver() { @Override public void preGetOp(ObserverContext c, Get get, List result) throws IOException { + validateRequestAttributes(); + + // for connection attrs test RpcCall rpcCall = RpcServer.getCurrentCall().get(); for (HBaseProtos.NameBytesPair attr : rpcCall.getHeader().getAttributeList()) { result.add(c.getEnvironment().getCellBuilder().clear().setRow(get.getRow()) @@ -218,5 +277,35 @@ public void preGetOp(ObserverContext c, Get get, result.sort(CellComparator.getInstance()); c.bypass(); } + + @Override + public boolean preScannerNext(ObserverContext c, + InternalScanner s, List result, int limit, boolean hasNext) throws IOException { + validateRequestAttributes(); + return hasNext; + } + + @Override + public void prePut(ObserverContext c, Put put, WALEdit edit) + throws IOException { + validateRequestAttributes(); + } + + private void validateRequestAttributes() { + RpcCall rpcCall = RpcServer.getCurrentCall().get(); + List attrs = rpcCall.getHeader().getAttributeList(); + if (attrs.size() != REQUEST_ATTRIBUTES.size()) { + return; + } + for (HBaseProtos.NameBytesPair attr : attrs) { + if (!REQUEST_ATTRIBUTES.containsKey(attr.getName())) { + return; + } + if (!Arrays.equals(REQUEST_ATTRIBUTES.get(attr.getName()), attr.getValue().toByteArray())) { + return; + } + } + REQUEST_ATTRIBUTES_VALIDATED.getAndSet(true); + } } } From ee3eef89f262d1d362a770b1f6f496dab49e05c2 Mon Sep 17 00:00:00 2001 From: Ray Mattingly Date: Mon, 17 Jul 2023 09:36:55 -0400 Subject: [PATCH 07/12] cleanup --- .../java/org/apache/hadoop/hbase/client/AsyncTable.java | 4 ++++ .../org/apache/hadoop/hbase/client/AsyncTableBuilder.java | 2 +- .../java/org/apache/hadoop/hbase/client/TableBuilder.java | 5 +++-- .../org/apache/hadoop/hbase/client/TableBuilderBase.java | 5 ----- .../org/apache/hadoop/hbase/ipc/HBaseRpcController.java | 6 ++++++ 5 files changed, 14 insertions(+), 8 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java index 1a42d4558aba..d648dfb30e0e 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java @@ -111,6 +111,10 @@ public interface AsyncTable { */ long getScanTimeout(TimeUnit unit); + /** + * Get the map of request attributes + * @return a map of request attributes supplied by the client + */ Map getRequestAttributes(); /** diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilder.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilder.java index 9cb6b8fddfe6..4bbb5fb8d897 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilder.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilder.java @@ -139,7 +139,7 @@ default AsyncTableBuilder setMaxRetries(int maxRetries) { AsyncTableBuilder setStartLogErrorsCnt(int startLogErrorsCnt); /** - * Sets a request attribute + * Sets the map of request attributes */ AsyncTableBuilder setRequestAttributes(Map requestAttributes); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableBuilder.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableBuilder.java index bda55280ee83..2f7c78b83f35 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableBuilder.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableBuilder.java @@ -56,10 +56,11 @@ public interface TableBuilder { */ TableBuilder setWriteRpcTimeout(int timeout); + /** + * Sets the map of request attributes + */ TableBuilder setRequestAttributes(Map requestAttributes); - Map getRequestAttributes(); - /** * Create the {@link Table} instance. */ diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableBuilderBase.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableBuilderBase.java index f43f28fc9f86..69186aabc423 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableBuilderBase.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableBuilderBase.java @@ -83,9 +83,4 @@ public TableBuilderBase setRequestAttributes(Map requestAttribut this.requestAttributes = requestAttributes; return this; } - - @Override - public Map getRequestAttributes() { - return requestAttributes; - } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcController.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcController.java index e630c23762f5..cd303a5eda77 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcController.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcController.java @@ -72,8 +72,14 @@ public interface HBaseRpcController extends RpcController, CellScannable { boolean hasCallTimeout(); + /** + * Get the map of request attributes + */ Map getRequestAttributes(); + /** + * Set the map of request attributes + */ void setRequestAttributes(Map requestAttributes); /** From 0b23de46f7ce866aa8bc746433b7b5af73c1681c Mon Sep 17 00:00:00 2001 From: Ray Mattingly Date: Tue, 18 Jul 2023 11:51:41 -0400 Subject: [PATCH 08/12] checkstyle, banned imports --- .../hbase/client/TestRequestAndConnectionAttributes.java | 3 ++- .../hadoop/hbase/namequeues/TestNamedQueueRecorder.java | 2 +- .../procedure2/store/region/TestRegionProcedureStore.java | 1 + .../hadoop/hbase/thrift2/client/ThriftConnection.java | 6 ++++++ 4 files changed, 10 insertions(+), 2 deletions(-) diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRequestAndConnectionAttributes.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRequestAndConnectionAttributes.java index 23838b19aeb4..b0876501974b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRequestAndConnectionAttributes.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRequestAndConnectionAttributes.java @@ -21,7 +21,6 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; -import com.google.common.collect.ImmutableList; import java.io.IOException; import java.util.Arrays; import java.util.HashMap; @@ -58,6 +57,8 @@ import org.junit.Test; import org.junit.experimental.categories.Category; +import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList; + import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; @Category({ ClientTests.class, MediumTests.class }) diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/namequeues/TestNamedQueueRecorder.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/namequeues/TestNamedQueueRecorder.java index 7a82d08f4adb..7a3ca0b7cf9f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/namequeues/TestNamedQueueRecorder.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/namequeues/TestNamedQueueRecorder.java @@ -632,6 +632,7 @@ private static RpcCall getRpcCall(String userName, int forcedParamIndex) { return getRpcCall(userName, Optional.of(forcedParamIndex)); } + @SuppressWarnings("checkstyle:methodlength") private static RpcCall getRpcCall(String userName, Optional forcedParamIndex) { RpcCall rpcCall = new RpcCall() { @Override @@ -666,7 +667,6 @@ public long getStartTime() { @Override public void setStartTime(long startTime) { - } @Override diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/TestRegionProcedureStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/TestRegionProcedureStore.java index 9bbbdbeebad0..dd49d00ac3a1 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/TestRegionProcedureStore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/TestRegionProcedureStore.java @@ -158,6 +158,7 @@ public void testInsertWithRpcCall() throws Exception { RpcServer.setCurrentCall(null); } + @SuppressWarnings("checkstyle:methodlength") private RpcCall newRpcCallWithDeadline() { return new RpcCall() { @Override diff --git a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/client/ThriftConnection.java b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/client/ThriftConnection.java index bdb54c2bddd5..b14cd5f2c062 100644 --- a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/client/ThriftConnection.java +++ b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/client/ThriftConnection.java @@ -322,6 +322,12 @@ public TableBuilder setWriteRpcTimeout(int timeout) { return this; } + @Override + public TableBuilder setRequestAttributes(Map requestAttributes) { + this.setRequestAttributes(requestAttributes); + return this; + } + @Override public Table build() { try { From 99169a22094b24898f6c20fbcec697382c80adf0 Mon Sep 17 00:00:00 2001 From: Ray Mattingly Date: Tue, 18 Jul 2023 16:06:42 -0400 Subject: [PATCH 09/12] PR feedback --- .../java/org/apache/hadoop/hbase/client/AsyncTable.java | 5 ++++- .../main/java/org/apache/hadoop/hbase/client/Table.java | 2 +- .../src/main/java/org/apache/hadoop/hbase/ipc/Call.java | 8 -------- .../apache/hadoop/hbase/ipc/TestTLSHandshadeFailure.java | 4 +++- .../hadoop/hbase/thrift2/client/ThriftConnection.java | 1 - 5 files changed, 8 insertions(+), 12 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java index d648dfb30e0e..2979c6689884 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java @@ -26,6 +26,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.function.Function; +import org.apache.commons.lang3.NotImplementedException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.CompareOperator; import org.apache.hadoop.hbase.TableName; @@ -115,7 +116,9 @@ public interface AsyncTable { * Get the map of request attributes * @return a map of request attributes supplied by the client */ - Map getRequestAttributes(); + default Map getRequestAttributes() { + throw new NotImplementedException("Add an implementation!"); + } /** * Test for the existence of columns in the table, as specified by the Get. diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java index 907e3d1a7040..3941c0d18540 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java @@ -757,6 +757,6 @@ default long getOperationTimeout(TimeUnit unit) { * @return map of request attributes */ default Map getRequestAttributes() { - return Collections.emptyMap(); + throw new NotImplementedException("Add an implementation!"); } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/Call.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/Call.java index 0298942e29fa..669fc73a3bfa 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/Call.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/Call.java @@ -19,7 +19,6 @@ import io.opentelemetry.api.trace.Span; import java.io.IOException; -import java.util.Collections; import java.util.Map; import java.util.Optional; import org.apache.commons.lang3.builder.ToStringBuilder; @@ -64,13 +63,6 @@ class Call { final Span span; Timeout timeoutTask; - Call(int id, final Descriptors.MethodDescriptor md, Message param, final CellScanner cells, - final Message responseDefaultType, int timeout, int priority, RpcCallback callback, - MetricsConnection.CallStats callStats) { - this(id, md, param, cells, responseDefaultType, timeout, priority, Collections.emptyMap(), - callback, callStats); - } - Call(int id, final Descriptors.MethodDescriptor md, Message param, final CellScanner cells, final Message responseDefaultType, int timeout, int priority, Map attributes, RpcCallback callback, MetricsConnection.CallStats callStats) { diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestTLSHandshadeFailure.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestTLSHandshadeFailure.java index 7375388e4a04..10948358ff92 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestTLSHandshadeFailure.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestTLSHandshadeFailure.java @@ -30,6 +30,7 @@ import java.io.IOException; import java.net.ServerSocket; import java.net.Socket; +import java.util.Collections; import java.util.Random; import java.util.concurrent.atomic.AtomicReference; import org.apache.hadoop.conf.Configuration; @@ -148,7 +149,8 @@ public Void answer(InvocationOnMock invocation) throws Throwable { Address.fromParts("127.0.0.1", server.getLocalPort())); NettyRpcConnection conn = client.createConnection(id); BlockingRpcCallback done = new BlockingRpcCallback<>(); - Call call = new Call(1, null, null, null, null, 0, 0, done, new CallStats()); + Call call = + new Call(1, null, null, null, null, 0, 0, Collections.emptyMap(), done, new CallStats()); HBaseRpcController hrc = new HBaseRpcControllerImpl(); conn.sendRequest(call, hrc); done.get(); diff --git a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/client/ThriftConnection.java b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/client/ThriftConnection.java index b14cd5f2c062..5d0175618cf7 100644 --- a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/client/ThriftConnection.java +++ b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/client/ThriftConnection.java @@ -324,7 +324,6 @@ public TableBuilder setWriteRpcTimeout(int timeout) { @Override public TableBuilder setRequestAttributes(Map requestAttributes) { - this.setRequestAttributes(requestAttributes); return this; } From 2be8a9f454d8e2ca85addd980b14f3a844e7c2dc Mon Sep 17 00:00:00 2001 From: Ray Mattingly Date: Wed, 19 Jul 2023 18:05:57 -0400 Subject: [PATCH 10/12] support setting a single req attribute --- .../apache/hadoop/hbase/client/AsyncTableBuilder.java | 5 +++++ .../hadoop/hbase/client/AsyncTableBuilderBase.java | 9 ++++++++- .../org/apache/hadoop/hbase/client/TableBuilder.java | 5 +++++ .../org/apache/hadoop/hbase/client/TableBuilderBase.java | 9 ++++++++- 4 files changed, 26 insertions(+), 2 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilder.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilder.java index 4bbb5fb8d897..8aae4e6fba0c 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilder.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilder.java @@ -143,6 +143,11 @@ default AsyncTableBuilder setMaxRetries(int maxRetries) { */ AsyncTableBuilder setRequestAttributes(Map requestAttributes); + /** + * Set a request attribute + */ + AsyncTableBuilder setRequestAttribute(String key, byte[] value); + /** * Create the {@link AsyncTable} instance. */ diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilderBase.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilderBase.java index f4cacce900db..12952dd0a564 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilderBase.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilderBase.java @@ -128,7 +128,14 @@ public AsyncTableBuilderBase setStartLogErrorsCnt(int startLogErrorsCnt) { @Override public AsyncTableBuilder setRequestAttributes(Map requestAttributes) { - this.requestAttributes = requestAttributes; + this.requestAttributes = new HashMap<>(requestAttributes.size()); + this.requestAttributes.putAll(requestAttributes); + return this; + } + + @Override + public AsyncTableBuilder setRequestAttribute(String key, byte[] value) { + this.requestAttributes.put(key, value); return this; } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableBuilder.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableBuilder.java index 2f7c78b83f35..99f182c15e05 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableBuilder.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableBuilder.java @@ -61,6 +61,11 @@ public interface TableBuilder { */ TableBuilder setRequestAttributes(Map requestAttributes); + /** + * Set a request attribute + */ + TableBuilder setRequestAttribute(String key, byte[] value); + /** * Create the {@link Table} instance. */ diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableBuilderBase.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableBuilderBase.java index 69186aabc423..3ba72a127b16 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableBuilderBase.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableBuilderBase.java @@ -80,7 +80,14 @@ public TableBuilderBase setWriteRpcTimeout(int timeout) { @Override public TableBuilderBase setRequestAttributes(Map requestAttributes) { - this.requestAttributes = requestAttributes; + this.requestAttributes = new HashMap<>(requestAttributes.size()); + this.requestAttributes.putAll(requestAttributes); + return this; + } + + @Override + public TableBuilderBase setRequestAttribute(String key, byte[] value) { + this.requestAttributes.put(key, value); return this; } } From e68afb6573fd27c6357968324df07bdfa4c3234a Mon Sep 17 00:00:00 2001 From: Ray Mattingly Date: Thu, 20 Jul 2023 10:43:11 -0400 Subject: [PATCH 11/12] remove plural attribute setter --- .../hadoop/hbase/client/AsyncTableBuilder.java | 6 ------ .../hadoop/hbase/client/AsyncTableBuilderBase.java | 13 +++++-------- .../hbase/client/ConnectionOverAsyncConnection.java | 8 ++++---- .../apache/hadoop/hbase/client/TableBuilder.java | 6 ------ .../hadoop/hbase/client/TableBuilderBase.java | 13 +++++-------- .../hbase/thrift2/client/ThriftConnection.java | 2 +- 6 files changed, 15 insertions(+), 33 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilder.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilder.java index 8aae4e6fba0c..007f7ad48685 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilder.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilder.java @@ -19,7 +19,6 @@ import static org.apache.hadoop.hbase.client.ConnectionUtils.retries2Attempts; -import java.util.Map; import java.util.concurrent.TimeUnit; import org.apache.hadoop.hbase.HBaseServerException; import org.apache.yetus.audience.InterfaceAudience; @@ -138,11 +137,6 @@ default AsyncTableBuilder setMaxRetries(int maxRetries) { */ AsyncTableBuilder setStartLogErrorsCnt(int startLogErrorsCnt); - /** - * Sets the map of request attributes - */ - AsyncTableBuilder setRequestAttributes(Map requestAttributes); - /** * Set a request attribute */ diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilderBase.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilderBase.java index 12952dd0a564..02e9da0770b4 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilderBase.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilderBase.java @@ -19,6 +19,7 @@ import static org.apache.hadoop.hbase.client.ConnectionUtils.retries2Attempts; +import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.concurrent.TimeUnit; @@ -52,7 +53,7 @@ abstract class AsyncTableBuilderBase protected int startLogErrorsCnt; - protected Map requestAttributes = new HashMap<>(); + protected Map requestAttributes = Collections.emptyMap(); AsyncTableBuilderBase(TableName tableName, AsyncConnectionConfiguration connConf) { this.tableName = tableName; @@ -126,15 +127,11 @@ public AsyncTableBuilderBase setStartLogErrorsCnt(int startLogErrorsCnt) { return this; } - @Override - public AsyncTableBuilder setRequestAttributes(Map requestAttributes) { - this.requestAttributes = new HashMap<>(requestAttributes.size()); - this.requestAttributes.putAll(requestAttributes); - return this; - } - @Override public AsyncTableBuilder setRequestAttribute(String key, byte[] value) { + if (this.requestAttributes.isEmpty()) { + this.requestAttributes = new HashMap<>(); + } this.requestAttributes.put(key, value); return this; } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionOverAsyncConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionOverAsyncConnection.java index 55d255b88f54..51368fc23c15 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionOverAsyncConnection.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionOverAsyncConnection.java @@ -189,13 +189,13 @@ public TableBuilder getTableBuilder(TableName tableName, ExecutorService pool) { public Table build() { IOExceptionSupplier poolSupplier = pool != null ? () -> pool : ConnectionOverAsyncConnection.this::getBatchPool; - return new TableOverAsyncTable(conn, + AsyncTableBuilder tableBuilder = conn.getTableBuilder(tableName).setRpcTimeout(rpcTimeout, TimeUnit.MILLISECONDS) .setReadRpcTimeout(readRpcTimeout, TimeUnit.MILLISECONDS) .setWriteRpcTimeout(writeRpcTimeout, TimeUnit.MILLISECONDS) - .setOperationTimeout(operationTimeout, TimeUnit.MILLISECONDS) - .setRequestAttributes(requestAttributes).build(), - poolSupplier); + .setOperationTimeout(operationTimeout, TimeUnit.MILLISECONDS); + requestAttributes.forEach(tableBuilder::setRequestAttribute); + return new TableOverAsyncTable(conn, tableBuilder.build(), poolSupplier); } }; } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableBuilder.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableBuilder.java index 99f182c15e05..eee985555b34 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableBuilder.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableBuilder.java @@ -17,7 +17,6 @@ */ package org.apache.hadoop.hbase.client; -import java.util.Map; import org.apache.yetus.audience.InterfaceAudience; /** @@ -56,11 +55,6 @@ public interface TableBuilder { */ TableBuilder setWriteRpcTimeout(int timeout); - /** - * Sets the map of request attributes - */ - TableBuilder setRequestAttributes(Map requestAttributes); - /** * Set a request attribute */ diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableBuilderBase.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableBuilderBase.java index 3ba72a127b16..dc3111b0c79d 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableBuilderBase.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableBuilderBase.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hbase.client; +import java.util.Collections; import java.util.HashMap; import java.util.Map; import org.apache.hadoop.hbase.TableName; @@ -38,7 +39,7 @@ abstract class TableBuilderBase implements TableBuilder { protected int writeRpcTimeout; - protected Map requestAttributes = new HashMap<>(); + protected Map requestAttributes = Collections.emptyMap(); TableBuilderBase(TableName tableName, ConnectionConfiguration connConf) { if (tableName == null) { @@ -78,15 +79,11 @@ public TableBuilderBase setWriteRpcTimeout(int timeout) { return this; } - @Override - public TableBuilderBase setRequestAttributes(Map requestAttributes) { - this.requestAttributes = new HashMap<>(requestAttributes.size()); - this.requestAttributes.putAll(requestAttributes); - return this; - } - @Override public TableBuilderBase setRequestAttribute(String key, byte[] value) { + if (this.requestAttributes.isEmpty()) { + this.requestAttributes = new HashMap<>(); + } this.requestAttributes.put(key, value); return this; } diff --git a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/client/ThriftConnection.java b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/client/ThriftConnection.java index 5d0175618cf7..db1b1e1c9870 100644 --- a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/client/ThriftConnection.java +++ b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/client/ThriftConnection.java @@ -323,7 +323,7 @@ public TableBuilder setWriteRpcTimeout(int timeout) { } @Override - public TableBuilder setRequestAttributes(Map requestAttributes) { + public TableBuilder setRequestAttribute(String key, byte[] value) { return this; } From fe17ca383cec654271f83b218ec68dcedd356f1d Mon Sep 17 00:00:00 2001 From: Ray Mattingly Date: Thu, 20 Jul 2023 11:49:04 -0400 Subject: [PATCH 12/12] fix tests --- .../TestRequestAndConnectionAttributes.java | 29 +++++++++++-------- 1 file changed, 17 insertions(+), 12 deletions(-) diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRequestAndConnectionAttributes.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRequestAndConnectionAttributes.java index b0876501974b..b376bfc18557 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRequestAndConnectionAttributes.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRequestAndConnectionAttributes.java @@ -126,8 +126,8 @@ public void testRequestAttributesGet() throws IOException { try ( Connection conn = ConnectionFactory.createConnection(conf, null, AuthUtil.loginClient(conf), CONNECTION_ATTRIBUTES); - Table table = conn.getTableBuilder(REQUEST_ATTRIBUTES_TEST_TABLE, EXECUTOR_SERVICE) - .setRequestAttributes(REQUEST_ATTRIBUTES).build()) { + Table table = configureRequestAttributes( + conn.getTableBuilder(REQUEST_ATTRIBUTES_TEST_TABLE, EXECUTOR_SERVICE)).build()) { table.get(new Get(Bytes.toBytes(0))); } @@ -144,8 +144,8 @@ public void testRequestAttributesMultiGet() throws IOException { try ( Connection conn = ConnectionFactory.createConnection(conf, null, AuthUtil.loginClient(conf), CONNECTION_ATTRIBUTES); - Table table = conn.getTableBuilder(REQUEST_ATTRIBUTES_TEST_TABLE, EXECUTOR_SERVICE) - .setRequestAttributes(REQUEST_ATTRIBUTES).build()) { + Table table = configureRequestAttributes( + conn.getTableBuilder(REQUEST_ATTRIBUTES_TEST_TABLE, EXECUTOR_SERVICE)).build()) { List gets = ImmutableList.of(new Get(Bytes.toBytes(0)), new Get(Bytes.toBytes(1))); table.get(gets); } @@ -162,8 +162,8 @@ public void testRequestAttributesExists() throws IOException { try ( Connection conn = ConnectionFactory.createConnection(conf, null, AuthUtil.loginClient(conf), CONNECTION_ATTRIBUTES); - Table table = conn.getTableBuilder(REQUEST_ATTRIBUTES_TEST_TABLE, EXECUTOR_SERVICE) - .setRequestAttributes(REQUEST_ATTRIBUTES).build()) { + Table table = configureRequestAttributes( + conn.getTableBuilder(REQUEST_ATTRIBUTES_TEST_TABLE, EXECUTOR_SERVICE)).build()) { table.exists(new Get(Bytes.toBytes(0))); } @@ -180,8 +180,8 @@ public void testRequestAttributesScan() throws IOException { try ( Connection conn = ConnectionFactory.createConnection(conf, null, AuthUtil.loginClient(conf), CONNECTION_ATTRIBUTES); - Table table = conn.getTableBuilder(REQUEST_ATTRIBUTES_TEST_TABLE, EXECUTOR_SERVICE) - .setRequestAttributes(REQUEST_ATTRIBUTES).build()) { + Table table = configureRequestAttributes( + conn.getTableBuilder(REQUEST_ATTRIBUTES_TEST_TABLE, EXECUTOR_SERVICE)).build()) { ResultScanner scanner = table.getScanner(new Scan()); scanner.next(); } @@ -197,8 +197,8 @@ public void testRequestAttributesPut() throws IOException { try ( Connection conn = ConnectionFactory.createConnection(conf, null, AuthUtil.loginClient(conf), CONNECTION_ATTRIBUTES); - Table table = conn.getTableBuilder(REQUEST_ATTRIBUTES_TEST_TABLE, EXECUTOR_SERVICE) - .setRequestAttributes(REQUEST_ATTRIBUTES).build()) { + Table table = configureRequestAttributes( + conn.getTableBuilder(REQUEST_ATTRIBUTES_TEST_TABLE, EXECUTOR_SERVICE)).build()) { Put put = new Put(Bytes.toBytes("a")); put.addColumn(REQUEST_ATTRIBUTES_TEST_TABLE_CF, Bytes.toBytes("c"), Bytes.toBytes("v")); table.put(put); @@ -215,8 +215,8 @@ public void testRequestAttributesMultiPut() throws IOException { try ( Connection conn = ConnectionFactory.createConnection(conf, null, AuthUtil.loginClient(conf), CONNECTION_ATTRIBUTES); - Table table = conn.getTableBuilder(REQUEST_ATTRIBUTES_TEST_TABLE, EXECUTOR_SERVICE) - .setRequestAttributes(REQUEST_ATTRIBUTES).build()) { + Table table = configureRequestAttributes( + conn.getTableBuilder(REQUEST_ATTRIBUTES_TEST_TABLE, EXECUTOR_SERVICE)).build()) { Put put = new Put(Bytes.toBytes("a")); put.addColumn(REQUEST_ATTRIBUTES_TEST_TABLE_CF, Bytes.toBytes("c"), Bytes.toBytes("v")); table.put(put); @@ -251,6 +251,11 @@ private void addRandomRequestAttributes() { } } + private static TableBuilder configureRequestAttributes(TableBuilder tableBuilder) { + REQUEST_ATTRIBUTES.forEach(tableBuilder::setRequestAttribute); + return tableBuilder; + } + public static class AttributesCoprocessor implements RegionObserver, RegionCoprocessor { @Override