Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
import io.opentelemetry.api.trace.Span;
import java.io.IOException;
import java.net.SocketAddress;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
Expand Down Expand Up @@ -127,6 +129,11 @@ public class AsyncConnectionImpl implements AsyncConnection {

public AsyncConnectionImpl(Configuration conf, ConnectionRegistry registry, String clusterId,
SocketAddress localAddress, User user) {
this(conf, registry, clusterId, localAddress, user, Collections.emptyMap());
}

public AsyncConnectionImpl(Configuration conf, ConnectionRegistry registry, String clusterId,
SocketAddress localAddress, User user, Map<String, byte[]> connectionAttributes) {
this.conf = conf;
this.user = user;
this.metricsScope = MetricsConnection.getScope(conf, clusterId, this);
Expand All @@ -142,8 +149,8 @@ public AsyncConnectionImpl(Configuration conf, ConnectionRegistry registry, Stri
} else {
this.metrics = Optional.empty();
}
this.rpcClient =
RpcClientFactory.createClient(conf, clusterId, localAddress, metrics.orElse(null));
this.rpcClient = RpcClientFactory.createClient(conf, clusterId, localAddress,
metrics.orElse(null), connectionAttributes);
this.rpcControllerFactory = RpcControllerFactory.instantiate(conf);
this.rpcTimeout =
(int) Math.min(Integer.MAX_VALUE, TimeUnit.NANOSECONDS.toMillis(connConf.getRpcTimeoutNs()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.security.PrivilegedExceptionAction;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import org.apache.hadoop.conf.Configuration;
Expand Down Expand Up @@ -216,21 +218,53 @@ public static Connection createConnection(Configuration conf, User user) throws
*/
public static Connection createConnection(Configuration conf, ExecutorService pool,
final User user) throws IOException {
return createConnection(conf, pool, user, Collections.emptyMap());
}

/**
* Create a new Connection instance using the passed <code>conf</code> instance. Connection
* encapsulates all housekeeping for a connection to the cluster. All tables and interfaces
* created from returned connection share zookeeper connection, meta cache, and connections to
* region servers and masters. <br>
* The caller is responsible for calling {@link Connection#close()} on the returned connection
* instance. Typical usage:
*
* <pre>
* Connection connection = ConnectionFactory.createConnection(conf);
* Table table = connection.getTable(TableName.valueOf("table1"));
* try {
* table.get(...);
* ...
* } finally {
* table.close();
* connection.close();
* }
* </pre>
*
* @param conf configuration
* @param user the user the connection is for
* @param pool the thread pool to use for batch operations
* @param connectionAttributes attributes to be sent along to server during connection establish
* @return Connection object for <code>conf</code>
*/
public static Connection createConnection(Configuration conf, ExecutorService pool,
final User user, Map<String, byte[]> connectionAttributes) throws IOException {
Class<?> clazz = conf.getClass(ConnectionUtils.HBASE_CLIENT_CONNECTION_IMPL,
ConnectionOverAsyncConnection.class, Connection.class);
if (clazz != ConnectionOverAsyncConnection.class) {
try {
// Default HCM#HCI is not accessible; make it so before invoking.
Constructor<?> constructor =
clazz.getDeclaredConstructor(Configuration.class, ExecutorService.class, User.class);
Constructor<?> constructor = clazz.getDeclaredConstructor(Configuration.class,
ExecutorService.class, User.class, Map.class);
constructor.setAccessible(true);
return user.runAs((PrivilegedExceptionAction<
Connection>) () -> (Connection) constructor.newInstance(conf, pool, user));
return user.runAs((PrivilegedExceptionAction<Connection>) () -> (Connection) constructor
.newInstance(conf, pool, user, connectionAttributes));
} catch (Exception e) {
throw new IOException(e);
}
} else {
return FutureUtils.get(createAsyncConnection(conf, user)).toConnection();
return FutureUtils.get(createAsyncConnection(conf, user, connectionAttributes))
.toConnection();
}
}

Expand Down Expand Up @@ -281,6 +315,27 @@ public static CompletableFuture<AsyncConnection> createAsyncConnection(Configura
*/
public static CompletableFuture<AsyncConnection> createAsyncConnection(Configuration conf,
final User user) {
return createAsyncConnection(conf, user, null);
}

/**
* Create a new AsyncConnection instance using the passed {@code conf} and {@code user}.
* AsyncConnection encapsulates all housekeeping for a connection to the cluster. All tables and
* interfaces created from returned connection share zookeeper connection, meta cache, and
* connections to region servers and masters.
* <p>
* The caller is responsible for calling {@link AsyncConnection#close()} on the returned
* connection instance.
* <p>
* Usually you should only create one AsyncConnection instance in your code and use it everywhere
* as it is thread safe.
* @param conf configuration
* @param user the user the asynchronous connection is for
* @param connectionAttributes attributes to be sent along to server during connection establish
* @return AsyncConnection object wrapped by CompletableFuture
*/
public static CompletableFuture<AsyncConnection> createAsyncConnection(Configuration conf,
final User user, Map<String, byte[]> connectionAttributes) {
return TraceUtil.tracedFuture(() -> {
CompletableFuture<AsyncConnection> future = new CompletableFuture<>();
ConnectionRegistry registry = ConnectionRegistryFactory.getRegistry(conf);
Expand All @@ -300,7 +355,7 @@ public static CompletableFuture<AsyncConnection> createAsyncConnection(Configura
try {
future.complete(
user.runAs((PrivilegedExceptionAction<? extends AsyncConnection>) () -> ReflectionUtils
.newInstance(clazz, conf, registry, clusterId, null, user)));
.newInstance(clazz, conf, registry, clusterId, null, user, connectionAttributes)));
} catch (Exception e) {
registry.close();
future.completeExceptionally(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.io.IOException;
import java.net.SocketAddress;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
Expand Down Expand Up @@ -106,6 +107,7 @@ public abstract class AbstractRpcClient<T extends RpcConnection> implements RpcC
private boolean running = true; // if client runs

protected final Configuration conf;
protected final Map<String, byte[]> connectionAttributes;
protected final String clusterId;
protected final SocketAddress localAddr;
protected final MetricsConnection metrics;
Expand Down Expand Up @@ -154,7 +156,7 @@ public AtomicInteger load(Address key) throws Exception {
* @param metrics the connection metrics
*/
public AbstractRpcClient(Configuration conf, String clusterId, SocketAddress localAddr,
MetricsConnection metrics) {
MetricsConnection metrics, Map<String, byte[]> connectionAttributes) {
this.userProvider = UserProvider.instantiate(conf);
this.localAddr = localAddr;
this.tcpKeepAlive = conf.getBoolean("hbase.ipc.client.tcpkeepalive", true);
Expand All @@ -167,6 +169,7 @@ public AbstractRpcClient(Configuration conf, String clusterId, SocketAddress loc

this.minIdleTimeBeforeClose = conf.getInt(IDLE_TIME, 120000); // 2 minutes
this.conf = conf;
this.connectionAttributes = connectionAttributes;
this.codec = getCodec();
this.compressor = getCompressor(conf);
this.fallbackAllowed = conf.getBoolean(IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY,
Expand Down Expand Up @@ -417,7 +420,7 @@ private Call callMethod(final Descriptors.MethodDescriptor md, final HBaseRpcCon

final AtomicInteger counter = concurrentCounterCache.getUnchecked(addr);
Call call = new Call(nextCallId(), md, param, hrc.cellScanner(), returnType,
hrc.getCallTimeout(), hrc.getPriority(), new RpcCallback<Call>() {
hrc.getCallTimeout(), hrc.getPriority(), hrc.getAttributes(), new RpcCallback<Call>() {
@Override
public void run(Call call) {
try (Scope scope = call.span.makeCurrent()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

import java.io.IOException;
import java.net.SocketAddress;
import java.util.Collections;
import java.util.Map;
import javax.net.SocketFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
Expand All @@ -41,7 +43,7 @@ public class BlockingRpcClient extends AbstractRpcClient<BlockingRpcConnection>
* SocketFactory
*/
BlockingRpcClient(Configuration conf) {
this(conf, HConstants.CLUSTER_ID_DEFAULT, null, null);
this(conf, HConstants.CLUSTER_ID_DEFAULT, null, null, Collections.emptyMap());
}

/**
Expand All @@ -53,8 +55,8 @@ public class BlockingRpcClient extends AbstractRpcClient<BlockingRpcConnection>
* @param metrics the connection metrics
*/
public BlockingRpcClient(Configuration conf, String clusterId, SocketAddress localAddr,
MetricsConnection metrics) {
super(conf, clusterId, localAddr, metrics);
MetricsConnection metrics, Map<String, byte[]> connectionAttributes) {
super(conf, clusterId, localAddr, metrics, connectionAttributes);
this.socketFactory = NetUtils.getDefaultSocketFactory(conf);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ public void cleanup(IOException e) {
BlockingRpcConnection(BlockingRpcClient rpcClient, ConnectionId remoteId) throws IOException {
super(rpcClient.conf, AbstractRpcClient.WHEEL_TIMER, remoteId, rpcClient.clusterId,
rpcClient.userProvider.isHBaseSecurityEnabled(), rpcClient.codec, rpcClient.compressor,
rpcClient.metrics);
rpcClient.metrics, rpcClient.connectionAttributes);
this.rpcClient = rpcClient;
this.connectionHeaderPreamble = getConnectionHeaderPreamble();
ConnectionHeader header = getConnectionHeader();
Expand Down
11 changes: 11 additions & 0 deletions hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/Call.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

import io.opentelemetry.api.trace.Span;
import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.commons.lang3.builder.ToStringStyle;
Expand Down Expand Up @@ -56,6 +58,7 @@ class Call {
final Descriptors.MethodDescriptor md;
final int timeout; // timeout in millisecond for this call; 0 means infinite.
final int priority;
final Map<String, byte[]> attributes;
final MetricsConnection.CallStats callStats;
private final RpcCallback<Call> callback;
final Span span;
Expand All @@ -64,6 +67,13 @@ class Call {
Call(int id, final Descriptors.MethodDescriptor md, Message param, final CellScanner cells,
final Message responseDefaultType, int timeout, int priority, RpcCallback<Call> callback,
MetricsConnection.CallStats callStats) {
this(id, md, param, cells, responseDefaultType, timeout, priority, Collections.emptyMap(),
callback, callStats);
}

Call(int id, final Descriptors.MethodDescriptor md, Message param, final CellScanner cells,
final Message responseDefaultType, int timeout, int priority, Map<String, byte[]> attributes,
RpcCallback<Call> callback, MetricsConnection.CallStats callStats) {
this.param = param;
this.md = md;
this.cells = cells;
Expand All @@ -73,6 +83,7 @@ class Call {
this.id = id;
this.timeout = timeout;
this.priority = priority;
this.attributes = attributes;
this.callback = callback;
this.span = Span.current();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
package org.apache.hadoop.hbase.ipc;

import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.TableName;
import org.apache.yetus.audience.InterfaceAudience;
Expand Down Expand Up @@ -112,6 +114,11 @@ public boolean hasCallTimeout() {
return delegate.hasCallTimeout();
}

@Override
public Map<String, byte[]> getAttributes() {
return Collections.emptyMap();
}

@Override
public void setFailed(IOException e) {
delegate.setFailed(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.hadoop.hbase.ipc;

import java.io.IOException;
import java.util.Map;
import org.apache.hadoop.hbase.CellScannable;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
Expand Down Expand Up @@ -71,6 +72,8 @@ public interface HBaseRpcController extends RpcController, CellScannable {

boolean hasCallTimeout();

Map<String, byte[]> getAttributes();

/**
* Set failed with an exception to pass on. For use in async rpc clients
* @param e exception to set with
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.hbase.CellScannable;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CellUtil;
Expand Down Expand Up @@ -166,6 +168,11 @@ public boolean hasCallTimeout() {
return callTimeout != null;
}

@Override
public Map<String, byte[]> getAttributes() {
return Collections.emptyMap();
}

@Override
public synchronized String errorText() {
if (!done || exception == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.net.ConnectException;
import java.net.SocketTimeoutException;
import java.nio.channels.ClosedChannelException;
import java.util.Map;
import java.util.concurrent.TimeoutException;
import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.hadoop.hbase.DoNotRetryIOException;
Expand All @@ -44,10 +45,12 @@
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.protobuf.CodedOutputStream;
import org.apache.hbase.thirdparty.com.google.protobuf.Message;
import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoop;
import org.apache.hbase.thirdparty.io.netty.util.concurrent.FastThreadLocal;

import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.CellBlockMeta;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ExceptionResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
Expand Down Expand Up @@ -126,6 +129,14 @@ static RequestHeader buildRequestHeader(Call call, CellBlockMeta cellBlockMeta)
if (call.priority != HConstants.PRIORITY_UNSET) {
builder.setPriority(call.priority);
}
if (call.attributes != null && !call.attributes.isEmpty()) {
HBaseProtos.NameBytesPair.Builder attributeBuilder = HBaseProtos.NameBytesPair.newBuilder();
for (Map.Entry<String, byte[]> attribute : call.attributes.entrySet()) {
attributeBuilder.setName(attribute.getKey());
attributeBuilder.setValue(UnsafeByteOperations.unsafeWrap(attribute.getValue()));
builder.addAttribute(attributeBuilder.build());
}
}
builder.setTimeout(call.timeout);

return builder.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

import java.io.IOException;
import java.net.SocketAddress;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
Expand Down Expand Up @@ -55,7 +57,12 @@ public class NettyRpcClient extends AbstractRpcClient<NettyRpcConnection> {

public NettyRpcClient(Configuration configuration, String clusterId, SocketAddress localAddress,
MetricsConnection metrics) {
super(configuration, clusterId, localAddress, metrics);
this(configuration, clusterId, localAddress, metrics, Collections.emptyMap());
}

public NettyRpcClient(Configuration configuration, String clusterId, SocketAddress localAddress,
MetricsConnection metrics, Map<String, byte[]> connectionAttributes) {
super(configuration, clusterId, localAddress, metrics, connectionAttributes);
Pair<EventLoopGroup, Class<? extends Channel>> groupAndChannelClass =
NettyRpcClientConfigHelper.getEventLoopConfig(conf);
if (groupAndChannelClass == null) {
Expand All @@ -75,7 +82,7 @@ public NettyRpcClient(Configuration configuration, String clusterId, SocketAddre

/** Used in test only. */
public NettyRpcClient(Configuration configuration) {
this(configuration, HConstants.CLUSTER_ID_DEFAULT, null, null);
this(configuration, HConstants.CLUSTER_ID_DEFAULT, null, null, Collections.emptyMap());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ class NettyRpcConnection extends RpcConnection {
NettyRpcConnection(NettyRpcClient rpcClient, ConnectionId remoteId) throws IOException {
super(rpcClient.conf, AbstractRpcClient.WHEEL_TIMER, remoteId, rpcClient.clusterId,
rpcClient.userProvider.isHBaseSecurityEnabled(), rpcClient.codec, rpcClient.compressor,
rpcClient.metrics);
rpcClient.metrics, rpcClient.connectionAttributes);
this.rpcClient = rpcClient;
this.eventLoop = rpcClient.group.next();
byte[] connectionHeaderPreamble = getConnectionHeaderPreamble();
Expand Down
Loading