Skip to content

Commit 9444ac0

Browse files
author
Ray Mattingly
committed
branch-2 connection and request attributes impl
1 parent 37dec7e commit 9444ac0

File tree

12 files changed

+113
-22
lines changed

12 files changed

+113
-22
lines changed

hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -153,12 +153,13 @@ public void waitUntilDone() throws InterruptedIOException {
153153
private final int periodToLog;
154154

155155
AsyncProcess(ClusterConnection hc, Configuration conf, RpcRetryingCallerFactory rpcCaller,
156-
RpcControllerFactory rpcFactory) {
157-
this(hc, conf, rpcCaller, rpcFactory, hc.getConnectionConfiguration().getRetriesNumber());
156+
RpcControllerFactory rpcFactory, Map<String, byte[]> requestAttributes) {
157+
this(hc, conf, rpcCaller, rpcFactory, hc.getConnectionConfiguration().getRetriesNumber(),
158+
requestAttributes);
158159
}
159160

160161
AsyncProcess(ClusterConnection hc, Configuration conf, RpcRetryingCallerFactory rpcCaller,
161-
RpcControllerFactory rpcFactory, int retriesNumber) {
162+
RpcControllerFactory rpcFactory, int retriesNumber, Map<String, byte[]> requestAttributes) {
162163
if (hc == null) {
163164
throw new IllegalArgumentException("ClusterConnection cannot be null.");
164165
}
@@ -189,6 +190,7 @@ public void waitUntilDone() throws InterruptedIOException {
189190

190191
this.rpcCallerFactory = rpcCaller;
191192
this.rpcFactory = rpcFactory;
193+
this.rpcFactory.setRequestAttributes(requestAttributes);
192194
this.logBatchErrorDetails = conf.getBoolean(LOG_DETAILS_FOR_BATCH_ERROR, false);
193195

194196
this.requestController = RequestControllerFactory.create(conf);
@@ -218,6 +220,10 @@ public <CResult> AsyncRequestFuture submit(AsyncProcessTask<CResult> task)
218220
}
219221
}
220222

223+
public void setRequestAttributes(Map<String, byte[]> requestAttributes) {
224+
this.rpcFactory.setRequestAttributes(requestAttributes);
225+
}
226+
221227
/**
222228
* Extract from the rows list what we can submit. The rows we can not submit are kept in the list.
223229
* Does not send requests to replicas (not currently used for anything other than streaming puts

hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,9 @@ public class BufferedMutatorImpl implements BufferedMutator {
142142
RpcControllerFactory rpcFactory, BufferedMutatorParams params) {
143143
this(conn, params,
144144
// puts need to track errors globally due to how the APIs currently work.
145-
new AsyncProcess(conn, conn.getConfiguration(), rpcCallerFactory, rpcFactory));
145+
// todo rmattingly support buffered mutator request attributes
146+
new AsyncProcess(conn, conn.getConfiguration(), rpcCallerFactory, rpcFactory,
147+
Collections.emptyMap()));
146148
}
147149

148150
private void checkClose() {

hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionFactory.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -230,10 +230,10 @@ public static Connection createConnection(Configuration conf, ExecutorService po
230230
try {
231231
// Default HCM#HCI is not accessible; make it so before invoking.
232232
Constructor<?> constructor = clazz.getDeclaredConstructor(Configuration.class,
233-
ExecutorService.class, User.class, Map.class);
233+
ExecutorService.class, User.class, ConnectionRegistry.class, Map.class);
234234
constructor.setAccessible(true);
235235
return user.runAs((PrivilegedExceptionAction<Connection>) () -> (Connection) constructor
236-
.newInstance(conf, pool, user, connectionAttributes));
236+
.newInstance(conf, pool, user, null, connectionAttributes));
237237
} catch (Exception e) {
238238
throw new IOException(e);
239239
}

hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import java.util.Collections;
4040
import java.util.Date;
4141
import java.util.List;
42+
import java.util.Map;
4243
import java.util.concurrent.BlockingQueue;
4344
import java.util.concurrent.CompletableFuture;
4445
import java.util.concurrent.ConcurrentHashMap;
@@ -292,6 +293,14 @@ public class ConnectionImplementation implements ClusterConnection, Closeable {
292293
*/
293294
ConnectionImplementation(Configuration conf, ExecutorService pool, User user,
294295
ConnectionRegistry registry) throws IOException {
296+
this(conf, pool, user, registry, Collections.emptyMap());
297+
}
298+
299+
/**
300+
* Constructor, for creating cluster connection with provided ConnectionRegistry.
301+
*/
302+
ConnectionImplementation(Configuration conf, ExecutorService pool, User user,
303+
ConnectionRegistry registry, Map<String, byte[]> connectionAttributes) throws IOException {
295304
this.conf = conf;
296305
this.user = user;
297306
if (user != null && user.isLoginFromKeytab()) {
@@ -348,11 +357,13 @@ public class ConnectionImplementation implements ClusterConnection, Closeable {
348357
}
349358
this.metaCache = new MetaCache(this.metrics);
350359

351-
this.rpcClient = RpcClientFactory.createClient(this.conf, this.clusterId, this.metrics);
360+
this.rpcClient = RpcClientFactory.createClient(this.conf, this.clusterId, this.metrics,
361+
connectionAttributes);
352362
this.rpcControllerFactory = RpcControllerFactory.instantiate(conf);
353363
this.rpcCallerFactory = RpcRetryingCallerFactory.instantiate(conf, connectionConfig,
354364
interceptor, this.stats, this.metrics);
355-
this.asyncProcess = new AsyncProcess(this, conf, rpcCallerFactory, rpcControllerFactory);
365+
this.asyncProcess = new AsyncProcess(this, conf, rpcCallerFactory, rpcControllerFactory,
366+
Collections.emptyMap());
356367

357368
// Do we publish the status?
358369
if (shouldListen) {
@@ -476,7 +487,7 @@ public TableBuilder getTableBuilder(TableName tableName, ExecutorService pool) {
476487
@Override
477488
public Table build() {
478489
return new HTable(ConnectionImplementation.this, this, rpcCallerFactory,
479-
rpcControllerFactory, pool);
490+
rpcControllerFactory, pool, requestAttributes);
480491
}
481492
};
482493
}

hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@
5959
import org.apache.hadoop.hbase.filter.Filter;
6060
import org.apache.hadoop.hbase.io.TimeRange;
6161
import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
62+
import org.apache.hadoop.hbase.ipc.HBaseRpcControllerImpl;
6263
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
6364
import org.apache.hadoop.hbase.trace.HBaseSemanticAttributes;
6465
import org.apache.hadoop.hbase.trace.TraceUtil;
@@ -130,6 +131,8 @@ public class HTable implements Table {
130131
private final RpcRetryingCallerFactory rpcCallerFactory;
131132
private final RpcControllerFactory rpcControllerFactory;
132133

134+
private final Map<String, byte[]> requestAttributes;
135+
133136
// Marked Private @since 1.0
134137
@InterfaceAudience.Private
135138
public static ThreadPoolExecutor getDefaultExecutor(Configuration conf) {
@@ -165,7 +168,8 @@ public static ThreadPoolExecutor getDefaultExecutor(Configuration conf) {
165168
@InterfaceAudience.Private
166169
protected HTable(final ConnectionImplementation connection, final TableBuilderBase builder,
167170
final RpcRetryingCallerFactory rpcCallerFactory,
168-
final RpcControllerFactory rpcControllerFactory, final ExecutorService pool) {
171+
final RpcControllerFactory rpcControllerFactory, final ExecutorService pool,
172+
final Map<String, byte[]> requestAttributes) {
169173
this.connection = Preconditions.checkNotNull(connection, "connection is null");
170174
this.configuration = connection.getConfiguration();
171175
this.connConfiguration = connection.getConnectionConfiguration();
@@ -197,9 +201,11 @@ protected HTable(final ConnectionImplementation connection, final TableBuilderBa
197201
this.scanTimeout = builder.scanTimeout;
198202
this.scannerCaching = connConfiguration.getScannerCaching();
199203
this.scannerMaxResultSize = connConfiguration.getScannerMaxResultSize();
204+
this.requestAttributes = requestAttributes;
200205

201206
// puts need to track errors globally due to how the APIs currently work.
202207
multiAp = this.connection.getAsyncProcess();
208+
multiAp.setRequestAttributes(requestAttributes);
203209
this.locator = new HRegionLocator(tableName, connection);
204210
}
205211

@@ -1308,7 +1314,7 @@ public <R extends Message> void batchCoprocessorService(
13081314
AsyncProcess asyncProcess = new AsyncProcess(connection, configuration,
13091315
RpcRetryingCallerFactory.instantiate(configuration, connConfiguration,
13101316
connection.getStatisticsTracker(), connection.getConnectionMetrics()),
1311-
RpcControllerFactory.instantiate(configuration));
1317+
RpcControllerFactory.instantiate(configuration), requestAttributes);
13121318

13131319
Batch.Callback<ClientProtos.CoprocessorServiceResult> resultsCallback =
13141320
(byte[] region, byte[] row, ClientProtos.CoprocessorServiceResult serviceResult) -> {
@@ -1433,6 +1439,11 @@ public boolean thenMutate(RowMutations mutation) throws IOException {
14331439
}
14341440
}
14351441

1442+
@Override
1443+
public Map<String, byte[]> getRequestAttributes() {
1444+
return requestAttributes;
1445+
}
1446+
14361447
private class CheckAndMutateWithFilterBuilderImpl implements CheckAndMutateWithFilterBuilder {
14371448

14381449
private final byte[] row;
@@ -1478,4 +1489,9 @@ public boolean thenMutate(RowMutations mutation) throws IOException {
14781489
.isSuccess(), supplier);
14791490
}
14801491
}
1492+
1493+
private void maybePopulateRequestAttributes(ClientServiceCallable<?> callable) {
1494+
HBaseRpcControllerImpl.configureRequestAttributes(callable.getRpcController(),
1495+
requestAttributes);
1496+
}
14811497
}

hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -434,7 +434,8 @@ public FlushWorker(Configuration conf, ClusterConnection conn, HRegionLocation a
434434
HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
435435
// Specify 0 retries in AsyncProcess because we need to reassign puts to different queues
436436
// if regions are moved.
437-
this.ap = new AsyncProcess(conn, conf, rpcCallerFactory, rpcControllerFactory, 0);
437+
this.ap = new AsyncProcess(conn, conf, rpcCallerFactory, rpcControllerFactory, 0,
438+
Collections.emptyMap());
438439
this.executor = executor;
439440
this.maxRetryInQueue = conf.getInt(TABLE_MULTIPLEXER_MAX_RETRIES_IN_QUEUE, 10000);
440441
this.pool = pool;

hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcControllerImpl.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import org.apache.yetus.audience.InterfaceAudience;
3232

3333
import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
34+
import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
3435

3536
/**
3637
* Get instances via {@link RpcControllerFactory} on client-side.
@@ -281,4 +282,15 @@ public String toString() {
281282
+ exception + ", regionInfo=" + regionInfo + ", priority=" + priority + ", cellScanner="
282283
+ cellScanner + '}';
283284
}
285+
286+
public static void configureRequestAttributes(RpcController rpcController,
287+
Map<String, byte[]> requestAttributes) {
288+
if (
289+
!requestAttributes.isEmpty() && rpcController != null
290+
&& rpcController instanceof HBaseRpcController
291+
) {
292+
HBaseRpcController controller = (HBaseRpcController) rpcController;
293+
controller.setRequestAttributes(requestAttributes);
294+
}
295+
}
284296
}

hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcControllerFactory.java

Lines changed: 26 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,11 @@
1717
*/
1818
package org.apache.hadoop.hbase.ipc;
1919

20+
import static org.apache.hadoop.hbase.ipc.HBaseRpcControllerImpl.configureRequestAttributes;
21+
22+
import java.util.Collections;
2023
import java.util.List;
24+
import java.util.Map;
2125
import org.apache.hadoop.conf.Configuration;
2226
import org.apache.hadoop.hbase.CellScannable;
2327
import org.apache.hadoop.hbase.CellScanner;
@@ -45,30 +49,42 @@ public class RpcControllerFactory {
4549
public static final String CUSTOM_CONTROLLER_CONF_KEY = "hbase.rpc.controllerfactory.class";
4650
protected final Configuration conf;
4751

52+
private Map<String, byte[]> requestAttributes = Collections.emptyMap();
53+
4854
public RpcControllerFactory(Configuration conf) {
4955
this.conf = conf;
5056
}
5157

5258
public HBaseRpcController newController() {
5359
// TODO: Set HConstants default rpc timeout here rather than nothing?
54-
return new HBaseRpcControllerImpl();
60+
HBaseRpcController controller = new HBaseRpcControllerImpl();
61+
configureRequestAttributes(controller, requestAttributes);
62+
return controller;
5563
}
5664

5765
public HBaseRpcController newController(CellScanner cellScanner) {
58-
return new HBaseRpcControllerImpl(null, cellScanner);
66+
HBaseRpcController controller = new HBaseRpcControllerImpl(null, cellScanner);
67+
configureRequestAttributes(controller, requestAttributes);
68+
return controller;
5969
}
6070

6171
public HBaseRpcController newController(RegionInfo regionInfo, CellScanner cellScanner) {
62-
return new HBaseRpcControllerImpl(regionInfo, cellScanner);
72+
HBaseRpcController controller = new HBaseRpcControllerImpl(regionInfo, cellScanner);
73+
configureRequestAttributes(controller, requestAttributes);
74+
return controller;
6375
}
6476

6577
public HBaseRpcController newController(final List<CellScannable> cellIterables) {
66-
return new HBaseRpcControllerImpl(null, cellIterables);
78+
HBaseRpcController controller = new HBaseRpcControllerImpl(null, cellIterables);
79+
configureRequestAttributes(controller, requestAttributes);
80+
return controller;
6781
}
6882

6983
public HBaseRpcController newController(RegionInfo regionInfo,
7084
final List<CellScannable> cellIterables) {
71-
return new HBaseRpcControllerImpl(regionInfo, cellIterables);
85+
HBaseRpcController controller = new HBaseRpcControllerImpl(regionInfo, cellIterables);
86+
configureRequestAttributes(controller, requestAttributes);
87+
return controller;
7288
}
7389

7490
public static RpcControllerFactory instantiate(Configuration configuration) {
@@ -91,4 +107,9 @@ public static RpcControllerFactory instantiate(Configuration configuration) {
91107
return new RpcControllerFactory(configuration);
92108
}
93109
}
110+
111+
public RpcControllerFactory setRequestAttributes(Map<String, byte[]> requestAttributes) {
112+
this.requestAttributes = requestAttributes;
113+
return this;
114+
}
94115
}

hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -185,14 +185,14 @@ public TableName getTableName() {
185185

186186
public MyAsyncProcess(ClusterConnection hc, Configuration conf) {
187187
super(hc, conf, new RpcRetryingCallerFactory(conf, hc.getConnectionConfiguration()),
188-
new RpcControllerFactory(conf));
188+
new RpcControllerFactory(conf), Collections.emptyMap());
189189
service = Executors.newFixedThreadPool(5);
190190
this.conf = conf;
191191
}
192192

193193
public MyAsyncProcess(ClusterConnection hc, Configuration conf, AtomicInteger nbThreads) {
194194
super(hc, conf, new RpcRetryingCallerFactory(conf, hc.getConnectionConfiguration()),
195-
new RpcControllerFactory(conf));
195+
new RpcControllerFactory(conf), Collections.emptyMap());
196196
service = new ThreadPoolExecutor(1, 20, 60, TimeUnit.SECONDS, new SynchronousQueue<>(),
197197
new CountingThreadFactory(nbThreads));
198198
}
@@ -1705,7 +1705,7 @@ public Future submit(Runnable runnable) {
17051705
static class AsyncProcessForThrowableCheck extends AsyncProcess {
17061706
public AsyncProcessForThrowableCheck(ClusterConnection hc, Configuration conf) {
17071707
super(hc, conf, new RpcRetryingCallerFactory(conf, hc.getConnectionConfiguration()),
1708-
new RpcControllerFactory(conf));
1708+
new RpcControllerFactory(conf), Collections.emptyMap());
17091709
}
17101710
}
17111711

hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcessWithRegionException.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -199,7 +199,7 @@ private static class MyAsyncProcess extends AsyncProcess {
199199

200200
MyAsyncProcess(ClusterConnection hc, Configuration conf) {
201201
super(hc, conf, new RpcRetryingCallerFactory(conf, hc.getConnectionConfiguration()),
202-
new RpcControllerFactory(conf));
202+
new RpcControllerFactory(conf), Collections.emptyMap());
203203
}
204204

205205
public AsyncRequestFuture submit(TableName tableName, List<? extends Row> rows)

0 commit comments

Comments
 (0)