
Commit eceef19

rmdmattingly (Ray Mattingly) authored and committed

Connection and Request Attributes

1 parent 3df01ea · commit eceef19

File tree

28 files changed (+398, −38 lines)

hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java

Lines changed: 8 additions & 2 deletions

@@ -31,6 +31,7 @@
 import io.opentelemetry.api.trace.Span;
 import java.io.IOException;
 import java.net.SocketAddress;
+import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
@@ -127,6 +128,11 @@ public class AsyncConnectionImpl implements AsyncConnection {
 
   public AsyncConnectionImpl(Configuration conf, ConnectionRegistry registry, String clusterId,
     SocketAddress localAddress, User user) {
+    this(conf, registry, clusterId, localAddress, user, null);
+  }
+
+  public AsyncConnectionImpl(Configuration conf, ConnectionRegistry registry, String clusterId,
+    SocketAddress localAddress, User user, Map<String, byte[]> connectionAttributes) {
     this.conf = conf;
     this.user = user;
     this.metricsScope = MetricsConnection.getScope(conf, clusterId, this);
@@ -142,8 +148,8 @@ public AsyncConnectionImpl(Configuration conf, ConnectionRegistry registry, Stri
     } else {
       this.metrics = Optional.empty();
     }
-    this.rpcClient =
-      RpcClientFactory.createClient(conf, clusterId, localAddress, metrics.orElse(null));
+    this.rpcClient = RpcClientFactory.createClient(conf, clusterId, localAddress,
+      metrics.orElse(null), connectionAttributes);
     this.rpcControllerFactory = RpcControllerFactory.instantiate(conf);
     this.rpcTimeout =
       (int) Math.min(Integer.MAX_VALUE, TimeUnit.NANOSECONDS.toMillis(connConf.getRpcTimeoutNs()));

hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionFactory.java

Lines changed: 60 additions & 6 deletions

@@ -22,6 +22,7 @@
 import java.io.IOException;
 import java.lang.reflect.Constructor;
 import java.security.PrivilegedExceptionAction;
+import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import org.apache.hadoop.conf.Configuration;
@@ -216,21 +217,53 @@ public static Connection createConnection(Configuration conf, User user) throws
    */
   public static Connection createConnection(Configuration conf, ExecutorService pool,
     final User user) throws IOException {
+    return createConnection(conf, pool, user, null);
+  }
+
+  /**
+   * Create a new Connection instance using the passed <code>conf</code> instance. Connection
+   * encapsulates all housekeeping for a connection to the cluster. All tables and interfaces
+   * created from returned connection share zookeeper connection, meta cache, and connections to
+   * region servers and masters. <br>
+   * The caller is responsible for calling {@link Connection#close()} on the returned connection
+   * instance. Typical usage:
+   *
+   * <pre>
+   * Connection connection = ConnectionFactory.createConnection(conf);
+   * Table table = connection.getTable(TableName.valueOf("table1"));
+   * try {
+   *   table.get(...);
+   *   ...
+   * } finally {
+   *   table.close();
+   *   connection.close();
+   * }
+   * </pre>
+   *
+   * @param conf configuration
+   * @param user the user the connection is for
+   * @param pool the thread pool to use for batch operations
+   * @param connectionAttributes attributes to be sent along to server during connection establish
+   * @return Connection object for <code>conf</code>
+   */
+  public static Connection createConnection(Configuration conf, ExecutorService pool,
+    final User user, Map<String, byte[]> connectionAttributes) throws IOException {
     Class<?> clazz = conf.getClass(ConnectionUtils.HBASE_CLIENT_CONNECTION_IMPL,
       ConnectionOverAsyncConnection.class, Connection.class);
     if (clazz != ConnectionOverAsyncConnection.class) {
       try {
         // Default HCM#HCI is not accessible; make it so before invoking.
-        Constructor<?> constructor =
-          clazz.getDeclaredConstructor(Configuration.class, ExecutorService.class, User.class);
+        Constructor<?> constructor = clazz.getDeclaredConstructor(Configuration.class,
+          ExecutorService.class, User.class, Map.class);
         constructor.setAccessible(true);
-        return user.runAs((PrivilegedExceptionAction<
-          Connection>) () -> (Connection) constructor.newInstance(conf, pool, user));
+        return user.runAs((PrivilegedExceptionAction<Connection>) () -> (Connection) constructor
+          .newInstance(conf, pool, user, connectionAttributes));
       } catch (Exception e) {
         throw new IOException(e);
       }
     } else {
-      return FutureUtils.get(createAsyncConnection(conf, user)).toConnection();
+      return FutureUtils.get(createAsyncConnection(conf, user, connectionAttributes))
+        .toConnection();
     }
   }
 
@@ -281,6 +314,27 @@ public static CompletableFuture<AsyncConnection> createAsyncConnection(Configura
    */
   public static CompletableFuture<AsyncConnection> createAsyncConnection(Configuration conf,
     final User user) {
+    return createAsyncConnection(conf, user, null);
+  }
+
+  /**
+   * Create a new AsyncConnection instance using the passed {@code conf} and {@code user}.
+   * AsyncConnection encapsulates all housekeeping for a connection to the cluster. All tables and
+   * interfaces created from returned connection share zookeeper connection, meta cache, and
+   * connections to region servers and masters.
+   * <p>
+   * The caller is responsible for calling {@link AsyncConnection#close()} on the returned
+   * connection instance.
+   * <p>
+   * Usually you should only create one AsyncConnection instance in your code and use it everywhere
+   * as it is thread safe.
+   * @param conf configuration
+   * @param user the user the asynchronous connection is for
+   * @param connectionAttributes attributes to be sent along to server during connection establish
+   * @return AsyncConnection object wrapped by CompletableFuture
+   */
+  public static CompletableFuture<AsyncConnection> createAsyncConnection(Configuration conf,
+    final User user, Map<String, byte[]> connectionAttributes) {
     return TraceUtil.tracedFuture(() -> {
       CompletableFuture<AsyncConnection> future = new CompletableFuture<>();
       ConnectionRegistry registry = ConnectionRegistryFactory.getRegistry(conf);
@@ -300,7 +354,7 @@ public static CompletableFuture<AsyncConnection> createAsyncConnection(Configura
       try {
         future.complete(
           user.runAs((PrivilegedExceptionAction<? extends AsyncConnection>) () -> ReflectionUtils
-            .newInstance(clazz, conf, registry, clusterId, null, user)));
+            .newInstance(clazz, conf, registry, clusterId, null, user, connectionAttributes)));
       } catch (Exception e) {
         registry.close();
         future.completeExceptionally(e);
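
The two new overloads above are the public entry points for connection attributes. Below is a minimal caller-side sketch (not part of this commit) of how they might be used; the class name, attribute keys, and values are invented for illustration.

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.AsyncConnection;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes;

public class ConnectionAttributesExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    ExecutorService pool = Executors.newFixedThreadPool(4);
    // Attributes are byte[] values keyed by name; these keys are made up.
    Map<String, byte[]> connectionAttributes = new HashMap<>();
    connectionAttributes.put("clientApp", Bytes.toBytes("billing-service"));
    connectionAttributes.put("tenant", Bytes.toBytes("tenant-42"));

    // Blocking API: the new 4-argument overload carries the attributes.
    try (Connection connection = ConnectionFactory.createConnection(conf, pool,
      User.getCurrent(), connectionAttributes)) {
      // use connection.getTable(...) as usual
    }

    // Async API: same attributes, returned wrapped in a CompletableFuture.
    CompletableFuture<AsyncConnection> future =
      ConnectionFactory.createAsyncConnection(conf, User.getCurrent(), connectionAttributes);
    try (AsyncConnection asyncConnection = future.get()) {
      // use asyncConnection.getTable(...) as usual
    }
    pool.shutdown();
  }
}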

hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java

Lines changed: 5 additions & 2 deletions

@@ -26,6 +26,7 @@
 import java.io.IOException;
 import java.net.SocketAddress;
 import java.util.Collection;
+import java.util.Map;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
@@ -106,6 +107,7 @@ public abstract class AbstractRpcClient<T extends RpcConnection> implements RpcC
   private boolean running = true; // if client runs
 
   protected final Configuration conf;
+  protected final Map<String, byte[]> connectionAttributes;
   protected final String clusterId;
   protected final SocketAddress localAddr;
   protected final MetricsConnection metrics;
@@ -154,7 +156,7 @@ public AtomicInteger load(Address key) throws Exception {
    * @param metrics the connection metrics
    */
   public AbstractRpcClient(Configuration conf, String clusterId, SocketAddress localAddr,
-    MetricsConnection metrics) {
+    MetricsConnection metrics, Map<String, byte[]> connectionAttributes) {
     this.userProvider = UserProvider.instantiate(conf);
     this.localAddr = localAddr;
     this.tcpKeepAlive = conf.getBoolean("hbase.ipc.client.tcpkeepalive", true);
@@ -167,6 +169,7 @@ public AbstractRpcClient(Configuration conf, String clusterId, SocketAddress loc
 
     this.minIdleTimeBeforeClose = conf.getInt(IDLE_TIME, 120000); // 2 minutes
     this.conf = conf;
+    this.connectionAttributes = connectionAttributes;
     this.codec = getCodec();
     this.compressor = getCompressor(conf);
     this.fallbackAllowed = conf.getBoolean(IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY,
@@ -417,7 +420,7 @@ private Call callMethod(final Descriptors.MethodDescriptor md, final HBaseRpcCon
 
     final AtomicInteger counter = concurrentCounterCache.getUnchecked(addr);
     Call call = new Call(nextCallId(), md, param, hrc.cellScanner(), returnType,
-      hrc.getCallTimeout(), hrc.getPriority(), new RpcCallback<Call>() {
+      hrc.getCallTimeout(), hrc.getPriority(), hrc.getAttributes(), new RpcCallback<Call>() {
         @Override
         public void run(Call call) {
          try (Scope scope = call.span.makeCurrent()) {

hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcClient.java

Lines changed: 4 additions & 3 deletions

@@ -19,6 +19,7 @@
 
 import java.io.IOException;
 import java.net.SocketAddress;
+import java.util.Map;
 import javax.net.SocketFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
@@ -41,7 +42,7 @@ public class BlockingRpcClient extends AbstractRpcClient<BlockingRpcConnection>
    * SocketFactory
    */
   BlockingRpcClient(Configuration conf) {
-    this(conf, HConstants.CLUSTER_ID_DEFAULT, null, null);
+    this(conf, HConstants.CLUSTER_ID_DEFAULT, null, null, null);
   }
 
   /**
@@ -53,8 +54,8 @@ public class BlockingRpcClient extends AbstractRpcClient<BlockingRpcConnection>
    * @param metrics the connection metrics
    */
   public BlockingRpcClient(Configuration conf, String clusterId, SocketAddress localAddr,
-    MetricsConnection metrics) {
-    super(conf, clusterId, localAddr, metrics);
+    MetricsConnection metrics, Map<String, byte[]> connectionAttributes) {
+    super(conf, clusterId, localAddr, metrics, connectionAttributes);
     this.socketFactory = NetUtils.getDefaultSocketFactory(conf);
   }

hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java

Lines changed: 1 addition & 1 deletion

@@ -219,7 +219,7 @@ public void cleanup(IOException e) {
   BlockingRpcConnection(BlockingRpcClient rpcClient, ConnectionId remoteId) throws IOException {
     super(rpcClient.conf, AbstractRpcClient.WHEEL_TIMER, remoteId, rpcClient.clusterId,
       rpcClient.userProvider.isHBaseSecurityEnabled(), rpcClient.codec, rpcClient.compressor,
-      rpcClient.metrics);
+      rpcClient.metrics, rpcClient.connectionAttributes);
     this.rpcClient = rpcClient;
     this.connectionHeaderPreamble = getConnectionHeaderPreamble();
     ConnectionHeader header = getConnectionHeader();

hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/Call.java

Lines changed: 5 additions & 2 deletions

@@ -19,6 +19,7 @@
 
 import io.opentelemetry.api.trace.Span;
 import java.io.IOException;
+import java.util.Map;
 import java.util.Optional;
 import org.apache.commons.lang3.builder.ToStringBuilder;
 import org.apache.commons.lang3.builder.ToStringStyle;
@@ -56,14 +57,15 @@ class Call {
   final Descriptors.MethodDescriptor md;
   final int timeout; // timeout in millisecond for this call; 0 means infinite.
   final int priority;
+  final Map<String, byte[]> attributes;
   final MetricsConnection.CallStats callStats;
   private final RpcCallback<Call> callback;
   final Span span;
   Timeout timeoutTask;
 
   Call(int id, final Descriptors.MethodDescriptor md, Message param, final CellScanner cells,
-    final Message responseDefaultType, int timeout, int priority, RpcCallback<Call> callback,
-    MetricsConnection.CallStats callStats) {
+    final Message responseDefaultType, int timeout, int priority, Map<String, byte[]> attributes,
+    RpcCallback<Call> callback, MetricsConnection.CallStats callStats) {
     this.param = param;
     this.md = md;
     this.cells = cells;
@@ -73,6 +75,7 @@ class Call {
     this.id = id;
     this.timeout = timeout;
     this.priority = priority;
+    this.attributes = attributes;
     this.callback = callback;
     this.span = Span.current();
   }

hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/DelegatingHBaseRpcController.java

Lines changed: 6 additions & 0 deletions

@@ -18,6 +18,7 @@
 package org.apache.hadoop.hbase.ipc;
 
 import java.io.IOException;
+import java.util.Map;
 import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.yetus.audience.InterfaceAudience;
@@ -112,6 +113,11 @@ public boolean hasCallTimeout() {
     return delegate.hasCallTimeout();
   }
 
+  @Override
+  public Map<String, byte[]> getAttributes() {
+    return null;
+  }
+
   @Override
   public void setFailed(IOException e) {
     delegate.setFailed(e);

hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcController.java

Lines changed: 3 additions & 0 deletions

@@ -18,6 +18,7 @@
 package org.apache.hadoop.hbase.ipc;
 
 import java.io.IOException;
+import java.util.Map;
 import org.apache.hadoop.hbase.CellScannable;
 import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
@@ -71,6 +72,8 @@ public interface HBaseRpcController extends RpcController, CellScannable {
 
   boolean hasCallTimeout();
 
+  Map<String, byte[]> getAttributes();
+
   /**
    * Set failed with an exception to pass on. For use in async rpc clients
    * @param e exception to set with

hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcControllerImpl.java

Lines changed: 6 additions & 0 deletions

@@ -20,6 +20,7 @@
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 import org.apache.hadoop.hbase.CellScannable;
 import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.CellUtil;
@@ -166,6 +167,11 @@ public boolean hasCallTimeout() {
     return callTimeout != null;
   }
 
+  @Override
+  public Map<String, byte[]> getAttributes() {
+    return null;
+  }
+
   @Override
   public synchronized String errorText() {
     if (!done || exception == null) {
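
Both controller implementations above return null from getAttributes(), so per-request attributes only flow when a caller supplies a controller that overrides the new hook. Below is a hypothetical sketch (not part of this commit) of such a controller; the class name and wiring are assumptions, and it relies on DelegatingHBaseRpcController exposing a constructor that takes the delegate.

import java.util.Map;
import org.apache.hadoop.hbase.ipc.DelegatingHBaseRpcController;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;

/** Hypothetical controller that attaches per-request attributes to outgoing calls. */
public class AttributeCarryingRpcController extends DelegatingHBaseRpcController {
  private final Map<String, byte[]> attributes;

  public AttributeCarryingRpcController(HBaseRpcController delegate,
    Map<String, byte[]> attributes) {
    super(delegate);
    this.attributes = attributes;
  }

  @Override
  public Map<String, byte[]> getAttributes() {
    // AbstractRpcClient#callMethod forwards this map into the Call, and
    // IPCUtil#buildRequestHeader serializes it into the RequestHeader (next file).
    return attributes;
  }
}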

hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java

Lines changed: 11 additions & 0 deletions

@@ -25,6 +25,7 @@
 import java.net.ConnectException;
 import java.net.SocketTimeoutException;
 import java.nio.channels.ClosedChannelException;
+import java.util.Map;
 import java.util.concurrent.TimeoutException;
 import org.apache.commons.lang3.mutable.MutableInt;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
@@ -44,10 +45,12 @@
 import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
 import org.apache.hbase.thirdparty.com.google.protobuf.CodedOutputStream;
 import org.apache.hbase.thirdparty.com.google.protobuf.Message;
+import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
 import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
 import org.apache.hbase.thirdparty.io.netty.channel.EventLoop;
 import org.apache.hbase.thirdparty.io.netty.util.concurrent.FastThreadLocal;
 
+import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.CellBlockMeta;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ExceptionResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
@@ -126,6 +129,14 @@ static RequestHeader buildRequestHeader(Call call, CellBlockMeta cellBlockMeta)
     if (call.priority != HConstants.PRIORITY_UNSET) {
       builder.setPriority(call.priority);
     }
+    if (call.attributes != null && !call.attributes.isEmpty()) {
+      HBaseProtos.NameBytesPair.Builder attributeBuilder = HBaseProtos.NameBytesPair.newBuilder();
+      for (Map.Entry<String, byte[]> attribute : call.attributes.entrySet()) {
+        attributeBuilder.setName(attribute.getKey());
+        attributeBuilder.setValue(UnsafeByteOperations.unsafeWrap(attribute.getValue()));
+        builder.addAttribute(attributeBuilder.build());
+      }
+    }
     builder.setTimeout(call.timeout);
 
     return builder.build();
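
On the wire, each request attribute becomes a NameBytesPair on the RequestHeader, as built above. Below is a minimal sketch (not part of this commit) of reading those attributes back into a Map, assuming the standard protobuf-generated accessors for the repeated attribute field written by builder.addAttribute(...).

import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameBytesPair;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;

public final class RequestAttributeReader {
  private RequestAttributeReader() {
  }

  /** Turns the repeated NameBytesPair attributes on a RequestHeader back into a Map. */
  public static Map<String, byte[]> toMap(RequestHeader header) {
    Map<String, byte[]> attributes = new HashMap<>();
    // getAttributeList() is assumed to be the generated accessor for the field
    // populated via builder.addAttribute(...) in IPCUtil#buildRequestHeader above.
    for (NameBytesPair pair : header.getAttributeList()) {
      attributes.put(pair.getName(), pair.getValue().toByteArray());
    }
    return attributes;
  }
}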
