Skip to content

Commit b19ee00

Browse files
eab148Evie Boland
andauthored
HBASE-28001: Add request attribute support to BufferedMutator (#6076)
Co-authored-by: Evie Boland <[email protected]> Signed-off-by: Nick Dimiduk <[email protected]>
1 parent 3caaf2d commit b19ee00

File tree

16 files changed

+581
-334
lines changed

16 files changed

+581
-334
lines changed

hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutator.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import java.io.Closeable;
2121
import java.util.Collections;
2222
import java.util.List;
23+
import java.util.Map;
2324
import java.util.concurrent.CompletableFuture;
2425
import java.util.concurrent.TimeUnit;
2526
import org.apache.hadoop.conf.Configuration;
@@ -93,4 +94,11 @@ default CompletableFuture<Void> mutate(Mutation mutation) {
9394
default long getPeriodicalFlushTimeout(TimeUnit unit) {
9495
throw new UnsupportedOperationException("Not implemented");
9596
}
97+
98+
/**
99+
* Returns the rpc request attributes.
100+
*/
101+
default Map<String, byte[]> getRequestAttributes() {
102+
return Collections.emptyMap();
103+
}
96104
}

hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorBuilder.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import static org.apache.hadoop.hbase.client.ConnectionUtils.retries2Attempts;
2121

22+
import java.util.Map;
2223
import java.util.concurrent.TimeUnit;
2324
import org.apache.yetus.audience.InterfaceAudience;
2425

@@ -38,6 +39,16 @@ public interface AsyncBufferedMutatorBuilder {
3839
*/
3940
AsyncBufferedMutatorBuilder setRpcTimeout(long timeout, TimeUnit unit);
4041

42+
/**
43+
* Set a rpc request attribute.
44+
*/
45+
AsyncBufferedMutatorBuilder setRequestAttribute(String key, byte[] value);
46+
47+
/**
48+
* Set multiple rpc request attributes.
49+
*/
50+
AsyncBufferedMutatorBuilder setRequestAttributes(Map<String, byte[]> requestAttributes);
51+
4152
/**
4253
* Set the base pause time for retrying. We use an exponential policy to generate sleep time when
4354
* retrying.

hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorBuilderImpl.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
*/
1818
package org.apache.hadoop.hbase.client;
1919

20+
import java.util.Map;
2021
import java.util.concurrent.TimeUnit;
2122
import org.apache.yetus.audience.InterfaceAudience;
2223

@@ -78,6 +79,20 @@ public AsyncBufferedMutatorBuilder setStartLogErrorsCnt(int startLogErrorsCnt) {
7879
return this;
7980
}
8081

82+
@Override
83+
public AsyncBufferedMutatorBuilder setRequestAttribute(String key, byte[] value) {
84+
tableBuilder.setRequestAttribute(key, value);
85+
return this;
86+
}
87+
88+
@Override
89+
public AsyncBufferedMutatorBuilder setRequestAttributes(Map<String, byte[]> requestAttributes) {
90+
for (Map.Entry<String, byte[]> requestAttribute : requestAttributes.entrySet()) {
91+
tableBuilder.setRequestAttribute(requestAttribute.getKey(), requestAttribute.getValue());
92+
}
93+
return this;
94+
}
95+
8196
@Override
8297
public AsyncBufferedMutatorBuilder setWriteBufferSize(long writeBufferSize) {
8398
Preconditions.checkArgument(writeBufferSize > 0, "writeBufferSize %d must be > 0",

hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorImpl.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import java.util.ArrayList;
2525
import java.util.Iterator;
2626
import java.util.List;
27+
import java.util.Map;
2728
import java.util.concurrent.CompletableFuture;
2829
import java.util.concurrent.TimeUnit;
2930
import java.util.stream.Collectors;
@@ -130,7 +131,7 @@ Stream.<CompletableFuture<Void>> generate(CompletableFuture::new).limit(mutation
130131
periodicFlushTask = periodicalFlushTimer.newTimeout(timeout -> {
131132
synchronized (AsyncBufferedMutatorImpl.this) {
132133
// confirm that we are still valid, if there is already an internalFlush call before us,
133-
// then we should not execute any more. And in internalFlush we will set periodicFlush
134+
// then we should not execute anymore. And in internalFlush we will set periodicFlush
134135
// to null, and since we may schedule a new one, so here we check whether the references
135136
// are equal.
136137
if (timeout == periodicFlushTask) {
@@ -170,4 +171,9 @@ public long getWriteBufferSize() {
170171
public long getPeriodicalFlushTimeout(TimeUnit unit) {
171172
return unit.convert(periodicFlushTimeoutNs, TimeUnit.NANOSECONDS);
172173
}
174+
175+
@Override
176+
public Map<String, byte[]> getRequestAttributes() {
177+
return table.getRequestAttributes();
178+
}
173179
}

hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,12 @@
2121
import static org.apache.hadoop.hbase.client.ConnectionUtils.toCheckExistenceOnly;
2222
import static org.apache.hadoop.hbase.util.FutureUtils.allOf;
2323

24+
import java.util.Collections;
2425
import java.util.List;
2526
import java.util.Map;
2627
import java.util.concurrent.CompletableFuture;
2728
import java.util.concurrent.TimeUnit;
2829
import java.util.function.Function;
29-
import org.apache.commons.lang3.NotImplementedException;
3030
import org.apache.hadoop.conf.Configuration;
3131
import org.apache.hadoop.hbase.CompareOperator;
3232
import org.apache.hadoop.hbase.TableName;
@@ -117,7 +117,7 @@ public interface AsyncTable<C extends ScanResultConsumerBase> {
117117
* @return a map of request attributes supplied by the client
118118
*/
119119
default Map<String, byte[]> getRequestAttributes() {
120-
throw new NotImplementedException("Add an implementation!");
120+
return Collections.emptyMap();
121121
}
122122

123123
/**

hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilderBase.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -129,10 +129,10 @@ public AsyncTableBuilderBase<C> setStartLogErrorsCnt(int startLogErrorsCnt) {
129129

130130
@Override
131131
public AsyncTableBuilder<C> setRequestAttribute(String key, byte[] value) {
132-
if (this.requestAttributes.isEmpty()) {
133-
this.requestAttributes = new HashMap<>();
132+
if (requestAttributes.isEmpty()) {
133+
requestAttributes = new HashMap<>();
134134
}
135-
this.requestAttributes.put(key, value);
135+
requestAttributes.put(key, value);
136136
return this;
137137
}
138138
}

hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutator.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,9 @@
1919

2020
import java.io.Closeable;
2121
import java.io.IOException;
22+
import java.util.Collections;
2223
import java.util.List;
24+
import java.util.Map;
2325
import org.apache.hadoop.conf.Configuration;
2426
import org.apache.hadoop.hbase.TableName;
2527
import org.apache.yetus.audience.InterfaceAudience;
@@ -204,6 +206,13 @@ default void setOperationTimeout(int timeout) {
204206
"The BufferedMutator::setOperationTimeout has not been implemented");
205207
}
206208

209+
/**
210+
* Returns the rpc request attributes.
211+
*/
212+
default Map<String, byte[]> getRequestAttributes() {
213+
return Collections.emptyMap();
214+
}
215+
207216
/**
208217
* Listens for asynchronous exceptions on a {@link BufferedMutator}.
209218
*/

hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorOverAsyncBufferedMutator.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import java.util.ArrayList;
2424
import java.util.Collections;
2525
import java.util.List;
26+
import java.util.Map;
2627
import java.util.Set;
2728
import java.util.concurrent.CompletableFuture;
2829
import java.util.concurrent.CompletionException;
@@ -186,4 +187,9 @@ public void setRpcTimeout(int timeout) {
186187
public void setOperationTimeout(int timeout) {
187188
// no effect
188189
}
190+
191+
@Override
192+
public Map<String, byte[]> getRequestAttributes() {
193+
return mutator.getRequestAttributes();
194+
}
189195
}

hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorParams.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,9 @@
1717
*/
1818
package org.apache.hadoop.hbase.client;
1919

20+
import java.util.Collections;
21+
import java.util.HashMap;
22+
import java.util.Map;
2023
import java.util.concurrent.ExecutorService;
2124
import org.apache.hadoop.hbase.TableName;
2225
import org.apache.yetus.audience.InterfaceAudience;
@@ -38,6 +41,7 @@ public class BufferedMutatorParams implements Cloneable {
3841
private String implementationClassName = null;
3942
private int rpcTimeout = UNSET;
4043
private int operationTimeout = UNSET;
44+
protected Map<String, byte[]> requestAttributes = Collections.emptyMap();
4145
private BufferedMutator.ExceptionListener listener = new BufferedMutator.ExceptionListener() {
4246
@Override
4347
public void onException(RetriesExhaustedWithDetailsException exception,
@@ -85,6 +89,18 @@ public int getOperationTimeout() {
8589
return operationTimeout;
8690
}
8791

92+
public BufferedMutatorParams setRequestAttribute(String key, byte[] value) {
93+
if (requestAttributes.isEmpty()) {
94+
requestAttributes = new HashMap<>();
95+
}
96+
requestAttributes.put(key, value);
97+
return this;
98+
}
99+
100+
public Map<String, byte[]> getRequestAttributes() {
101+
return requestAttributes;
102+
}
103+
88104
/**
89105
* Override the write buffer size specified by the provided {@link Connection}'s
90106
* {@link org.apache.hadoop.conf.Configuration} instance, via the configuration key

hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionOverAsyncConnection.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,10 @@ public BufferedMutator getBufferedMutator(BufferedMutatorParams params) throws I
107107
if (params.getMaxKeyValueSize() != BufferedMutatorParams.UNSET) {
108108
builder.setMaxKeyValueSize(params.getMaxKeyValueSize());
109109
}
110+
if (!params.getRequestAttributes().isEmpty()) {
111+
112+
builder.setRequestAttributes(params.getRequestAttributes());
113+
}
110114
return new BufferedMutatorOverAsyncBufferedMutator(builder.build(), params.getListener());
111115
}
112116

0 commit comments

Comments
 (0)