Skip to content

Commit d34b85b

Browse files
eab148Evie Boland
andcommitted
HubSpot Backport HBASE-28001: Add request attribute support to BufferedMutator (apache#6076)
Co-authored-by: Evie Boland <[email protected]> Signed-off-by: Nick Dimiduk <[email protected]>
1 parent 0dec664 commit d34b85b

File tree

12 files changed

+587
-349
lines changed

12 files changed

+587
-349
lines changed

hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutator.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import java.io.Closeable;
2121
import java.util.Collections;
2222
import java.util.List;
23+
import java.util.Map;
2324
import java.util.concurrent.CompletableFuture;
2425
import java.util.concurrent.TimeUnit;
2526
import org.apache.hadoop.conf.Configuration;
@@ -93,4 +94,11 @@ default CompletableFuture<Void> mutate(Mutation mutation) {
9394
default long getPeriodicalFlushTimeout(TimeUnit unit) {
9495
throw new UnsupportedOperationException("Not implemented");
9596
}
97+
98+
/**
99+
* Returns the rpc request attributes.
100+
*/
101+
default Map<String, byte[]> getRequestAttributes() {
102+
return Collections.emptyMap();
103+
}
96104
}

hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorBuilder.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import static org.apache.hadoop.hbase.client.ConnectionUtils.retries2Attempts;
2121

22+
import java.util.Map;
2223
import java.util.concurrent.TimeUnit;
2324
import org.apache.yetus.audience.InterfaceAudience;
2425

@@ -103,6 +104,16 @@ default AsyncBufferedMutatorBuilder setMaxRetries(int maxRetries) {
103104
*/
104105
AsyncBufferedMutatorBuilder setMaxKeyValueSize(int maxKeyValueSize);
105106

107+
/**
108+
* Set a rpc request attribute.
109+
*/
110+
AsyncBufferedMutatorBuilder setRequestAttribute(String key, byte[] value);
111+
112+
/**
113+
* Set multiple rpc request attributes.
114+
*/
115+
AsyncBufferedMutatorBuilder setRequestAttributes(Map<String, byte[]> requestAttributes);
116+
106117
/**
107118
* Create the {@link AsyncBufferedMutator} instance.
108119
*/

hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorBuilderImpl.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
*/
1818
package org.apache.hadoop.hbase.client;
1919

20+
import java.util.Map;
2021
import java.util.concurrent.TimeUnit;
2122
import org.apache.yetus.audience.InterfaceAudience;
2223

@@ -78,6 +79,20 @@ public AsyncBufferedMutatorBuilder setStartLogErrorsCnt(int startLogErrorsCnt) {
7879
return this;
7980
}
8081

82+
@Override
83+
public AsyncBufferedMutatorBuilder setRequestAttribute(String key, byte[] value) {
84+
tableBuilder.setRequestAttribute(key, value);
85+
return this;
86+
}
87+
88+
@Override
89+
public AsyncBufferedMutatorBuilder setRequestAttributes(Map<String, byte[]> requestAttributes) {
90+
for (Map.Entry<String, byte[]> requestAttribute : requestAttributes.entrySet()) {
91+
tableBuilder.setRequestAttribute(requestAttribute.getKey(), requestAttribute.getValue());
92+
}
93+
return this;
94+
}
95+
8196
@Override
8297
public AsyncBufferedMutatorBuilder setWriteBufferSize(long writeBufferSize) {
8398
Preconditions.checkArgument(writeBufferSize > 0, "writeBufferSize %d must be > 0",

hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorImpl.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import java.util.ArrayList;
2525
import java.util.Iterator;
2626
import java.util.List;
27+
import java.util.Map;
2728
import java.util.concurrent.CompletableFuture;
2829
import java.util.concurrent.TimeUnit;
2930
import java.util.stream.Collectors;
@@ -170,4 +171,9 @@ public long getWriteBufferSize() {
170171
public long getPeriodicalFlushTimeout(TimeUnit unit) {
171172
return unit.convert(periodicFlushTimeoutNs, TimeUnit.NANOSECONDS);
172173
}
174+
175+
@Override
176+
public Map<String, byte[]> getRequestAttributes() {
177+
return table.getRequestAttributes();
178+
}
173179
}

hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,12 @@
2222
import static org.apache.hadoop.hbase.util.FutureUtils.allOf;
2323

2424
import com.google.protobuf.RpcChannel;
25+
import java.util.Collections;
2526
import java.util.List;
2627
import java.util.Map;
2728
import java.util.concurrent.CompletableFuture;
2829
import java.util.concurrent.TimeUnit;
2930
import java.util.function.Function;
30-
import org.apache.commons.lang3.NotImplementedException;
3131
import org.apache.hadoop.conf.Configuration;
3232
import org.apache.hadoop.hbase.CompareOperator;
3333
import org.apache.hadoop.hbase.TableName;
@@ -117,7 +117,7 @@ public interface AsyncTable<C extends ScanResultConsumerBase> {
117117
* @return a map of request attributes supplied by the client
118118
*/
119119
default Map<String, byte[]> getRequestAttributes() {
120-
throw new NotImplementedException("Add an implementation!");
120+
return Collections.emptyMap();
121121
}
122122

123123
/**

hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutator.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,9 @@
1919

2020
import java.io.Closeable;
2121
import java.io.IOException;
22+
import java.util.Collections;
2223
import java.util.List;
24+
import java.util.Map;
2325
import org.apache.hadoop.conf.Configuration;
2426
import org.apache.hadoop.hbase.TableName;
2527
import org.apache.yetus.audience.InterfaceAudience;
@@ -194,6 +196,13 @@ default void setOperationTimeout(int timeout) {
194196
"The BufferedMutator::setOperationTimeout has not been implemented");
195197
}
196198

199+
/**
200+
* Returns the rpc request attributes.
201+
*/
202+
default Map<String, byte[]> getRequestAttributes() {
203+
return Collections.emptyMap();
204+
}
205+
197206
/**
198207
* Listens for asynchronous exceptions on a {@link BufferedMutator}.
199208
*/

hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import java.util.Collections;
2727
import java.util.Iterator;
2828
import java.util.List;
29+
import java.util.Map;
2930
import java.util.NoSuchElementException;
3031
import java.util.Timer;
3132
import java.util.TimerTask;
@@ -89,6 +90,7 @@ public class BufferedMutatorImpl implements BufferedMutator {
8990
private final ExecutorService pool;
9091
private final AtomicInteger rpcTimeout;
9192
private final AtomicInteger operationTimeout;
93+
private final Map<String, byte[]> requestAttributes;
9294
private final boolean cleanupPoolOnClose;
9395
private volatile boolean closed = false;
9496
private final AsyncProcess ap;
@@ -135,6 +137,9 @@ public class BufferedMutatorImpl implements BufferedMutator {
135137
this.operationTimeout = new AtomicInteger(params.getOperationTimeout() != UNSET
136138
? params.getOperationTimeout()
137139
: conn.getConnectionConfiguration().getOperationTimeout());
140+
141+
this.requestAttributes = params.getRequestAttributes();
142+
138143
this.ap = ap;
139144
}
140145

@@ -252,7 +257,8 @@ public synchronized void close() throws IOException {
252257

253258
private AsyncProcessTask createTask(QueueRowAccess access) {
254259
return new AsyncProcessTask(AsyncProcessTask.newBuilder().setPool(pool).setTableName(tableName)
255-
.setRowAccess(access).setSubmittedRows(AsyncProcessTask.SubmittedRows.AT_LEAST_ONE).build()) {
260+
.setRowAccess(access).setSubmittedRows(AsyncProcessTask.SubmittedRows.AT_LEAST_ONE)
261+
.setRequestAttributes(requestAttributes).build()) {
256262
@Override
257263
public int getRpcTimeout() {
258264
return rpcTimeout.get();
@@ -391,6 +397,11 @@ public void setOperationTimeout(int operationTimeout) {
391397
this.operationTimeout.set(operationTimeout);
392398
}
393399

400+
@Override
401+
public Map<String, byte[]> getRequestAttributes() {
402+
return requestAttributes;
403+
}
404+
394405
long getCurrentWriteBufferSize() {
395406
return currentWriteBufferSize.get();
396407
}

hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorParams.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,9 @@
1717
*/
1818
package org.apache.hadoop.hbase.client;
1919

20+
import java.util.Collections;
21+
import java.util.HashMap;
22+
import java.util.Map;
2023
import java.util.concurrent.ExecutorService;
2124
import org.apache.hadoop.hbase.TableName;
2225
import org.apache.yetus.audience.InterfaceAudience;
@@ -38,6 +41,7 @@ public class BufferedMutatorParams implements Cloneable {
3841
private String implementationClassName = null;
3942
private int rpcTimeout = UNSET;
4043
private int operationTimeout = UNSET;
44+
protected Map<String, byte[]> requestAttributes = Collections.emptyMap();
4145
private BufferedMutator.ExceptionListener listener = new BufferedMutator.ExceptionListener() {
4246
@Override
4347
public void onException(RetriesExhaustedWithDetailsException exception,
@@ -85,6 +89,18 @@ public int getOperationTimeout() {
8589
return operationTimeout;
8690
}
8791

92+
public BufferedMutatorParams setRequestAttribute(String key, byte[] value) {
93+
if (requestAttributes.isEmpty()) {
94+
requestAttributes = new HashMap<>();
95+
}
96+
requestAttributes.put(key, value);
97+
return this;
98+
}
99+
100+
public Map<String, byte[]> getRequestAttributes() {
101+
return requestAttributes;
102+
}
103+
88104
/**
89105
* Override the write buffer size specified by the provided {@link Connection}'s
90106
* {@link org.apache.hadoop.conf.Configuration} instance, via the configuration key

hbase-examples/src/main/java/org/apache/hadoop/hbase/client/example/BufferedMutatorExample.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,15 +19,19 @@
1919

2020
import java.io.IOException;
2121
import java.util.ArrayList;
22+
import java.util.HashMap;
2223
import java.util.List;
24+
import java.util.Map;
2325
import java.util.concurrent.Callable;
2426
import java.util.concurrent.ExecutionException;
2527
import java.util.concurrent.ExecutorService;
2628
import java.util.concurrent.Executors;
2729
import java.util.concurrent.Future;
2830
import java.util.concurrent.TimeUnit;
2931
import java.util.concurrent.TimeoutException;
32+
import org.apache.hadoop.conf.Configuration;
3033
import org.apache.hadoop.conf.Configured;
34+
import org.apache.hadoop.hbase.AuthUtil;
3135
import org.apache.hadoop.hbase.TableName;
3236
import org.apache.hadoop.hbase.client.BufferedMutator;
3337
import org.apache.hadoop.hbase.client.BufferedMutatorParams;
@@ -67,12 +71,18 @@ public void onException(RetriesExhaustedWithDetailsException e, BufferedMutator
6771
}
6872
}
6973
};
70-
BufferedMutatorParams params = new BufferedMutatorParams(TABLE).listener(listener);
74+
BufferedMutatorParams params = new BufferedMutatorParams(TABLE).listener(listener)
75+
.setRequestAttribute("requestInfo", Bytes.toBytes("bar"));
7176

7277
//
7378
// step 1: create a single Connection and a BufferedMutator, shared by all worker threads.
7479
//
75-
try (final Connection conn = ConnectionFactory.createConnection(getConf());
80+
Map<String, byte[]> connectionAttributes = new HashMap<>();
81+
connectionAttributes.put("clientId", Bytes.toBytes("foo"));
82+
Configuration conf = getConf();
83+
try (
84+
final Connection conn = ConnectionFactory.createConnection(conf, null,
85+
AuthUtil.loginClient(conf), connectionAttributes);
7686
final BufferedMutator mutator = conn.getBufferedMutator(params)) {
7787

7888
/** worker pool that operates on BufferedTable instances */
Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.hbase.client;
19+
20+
import static org.junit.Assert.assertEquals;
21+
22+
import java.io.IOException;
23+
import java.util.HashMap;
24+
import java.util.List;
25+
import java.util.Map;
26+
import java.util.Optional;
27+
import java.util.Random;
28+
import org.apache.hadoop.conf.Configuration;
29+
import org.apache.hadoop.hbase.AuthUtil;
30+
import org.apache.hadoop.hbase.Cell;
31+
import org.apache.hadoop.hbase.CellComparator;
32+
import org.apache.hadoop.hbase.HBaseClassTestRule;
33+
import org.apache.hadoop.hbase.HBaseTestingUtility;
34+
import org.apache.hadoop.hbase.HConstants;
35+
import org.apache.hadoop.hbase.MiniHBaseCluster;
36+
import org.apache.hadoop.hbase.TableName;
37+
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
38+
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
39+
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
40+
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
41+
import org.apache.hadoop.hbase.ipc.RpcCall;
42+
import org.apache.hadoop.hbase.ipc.RpcServer;
43+
import org.apache.hadoop.hbase.testclassification.ClientTests;
44+
import org.apache.hadoop.hbase.testclassification.MediumTests;
45+
import org.apache.hadoop.hbase.util.Bytes;
46+
import org.junit.AfterClass;
47+
import org.junit.BeforeClass;
48+
import org.junit.ClassRule;
49+
import org.junit.Test;
50+
import org.junit.experimental.categories.Category;
51+
52+
@Category({ ClientTests.class, MediumTests.class })
53+
public class TestConnectionAttributes {
54+
55+
@ClassRule
56+
public static final HBaseClassTestRule CLASS_RULE =
57+
HBaseClassTestRule.forClass(TestConnectionAttributes.class);
58+
59+
private static final Map<String, byte[]> CONNECTION_ATTRIBUTES = new HashMap<>();
60+
static {
61+
CONNECTION_ATTRIBUTES.put("clientId", Bytes.toBytes("foo"));
62+
}
63+
private static final byte[] FAMILY = Bytes.toBytes("0");
64+
private static final TableName TABLE_NAME = TableName.valueOf("testConnectionAttributes");
65+
66+
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
67+
private static MiniHBaseCluster cluster;
68+
69+
@BeforeClass
70+
public static void setUp() throws Exception {
71+
cluster = TEST_UTIL.startMiniCluster(1);
72+
Table table = TEST_UTIL.createTable(TABLE_NAME, new byte[][] { FAMILY }, 1,
73+
HConstants.DEFAULT_BLOCKSIZE, TestConnectionAttributes.AttributesCoprocessor.class.getName());
74+
table.close();
75+
}
76+
77+
@AfterClass
78+
public static void afterClass() throws Exception {
79+
cluster.close();
80+
TEST_UTIL.shutdownMiniCluster();
81+
}
82+
83+
@Test
84+
public void testConnectionHeaderOverwrittenAttributesRemain() throws IOException {
85+
Configuration conf = TEST_UTIL.getConfiguration();
86+
try (Connection conn = ConnectionFactory.createConnection(conf, null,
87+
AuthUtil.loginClient(conf), CONNECTION_ATTRIBUTES); Table table = conn.getTable(TABLE_NAME)) {
88+
89+
// submit a 300 byte rowkey here to encourage netty's allocator to overwrite the connection
90+
// header
91+
byte[] bytes = new byte[300];
92+
new Random().nextBytes(bytes);
93+
Result result = table.get(new Get(bytes));
94+
95+
assertEquals(CONNECTION_ATTRIBUTES.size(), result.size());
96+
for (Map.Entry<String, byte[]> attr : CONNECTION_ATTRIBUTES.entrySet()) {
97+
byte[] val = result.getValue(FAMILY, Bytes.toBytes(attr.getKey()));
98+
assertEquals(Bytes.toStringBinary(attr.getValue()), Bytes.toStringBinary(val));
99+
}
100+
}
101+
}
102+
103+
public static class AttributesCoprocessor implements RegionObserver, RegionCoprocessor {
104+
105+
@Override
106+
public Optional<RegionObserver> getRegionObserver() {
107+
return Optional.of(this);
108+
}
109+
110+
@Override
111+
public void preGetOp(ObserverContext<RegionCoprocessorEnvironment> c, Get get,
112+
List<Cell> result) throws IOException {
113+
RpcCall rpcCall = RpcServer.getCurrentCall().get();
114+
for (Map.Entry<String, byte[]> attr : rpcCall.getConnectionAttributes().entrySet()) {
115+
result.add(c.getEnvironment().getCellBuilder().clear().setRow(get.getRow())
116+
.setFamily(FAMILY).setQualifier(Bytes.toBytes(attr.getKey())).setValue(attr.getValue())
117+
.setType(Cell.Type.Put).setTimestamp(1).build());
118+
}
119+
result.sort(CellComparator.getInstance());
120+
c.bypass();
121+
}
122+
}
123+
}

0 commit comments

Comments
 (0)