Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.io.Closeable;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
Expand Down Expand Up @@ -93,4 +94,11 @@ default CompletableFuture<Void> mutate(Mutation mutation) {
default long getPeriodicalFlushTimeout(TimeUnit unit) {
throw new UnsupportedOperationException("Not implemented");
}

/**
* Returns the rpc request attributes.
*/
default Map<String, byte[]> getRequestAttributes() {
return Collections.emptyMap();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import static org.apache.hadoop.hbase.client.ConnectionUtils.retries2Attempts;

import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.yetus.audience.InterfaceAudience;

Expand All @@ -38,6 +39,16 @@ public interface AsyncBufferedMutatorBuilder {
*/
AsyncBufferedMutatorBuilder setRpcTimeout(long timeout, TimeUnit unit);

/**
* Set a rpc request attribute.
*/
AsyncBufferedMutatorBuilder setRequestAttribute(String key, byte[] value);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we expect this to override all existing requestAttributes and replace the collection with this value? Or, do we expect this method to add an additional requestAttribute to an existing set?

Take a look at the breadth of API exposed on Immutable.Builder implementations around collection objects -- https://immutables.github.io/immutable.html#array-collection-and-map-attributes I'm not saying that we need all of these, but at least consider which semantics we want to support and why.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, or maybe you're hamstrung by the apis that TableBuilder exposes? Maybe we should look at expanding the scope of these methods in a separate JIRA.

Copy link
Contributor Author

@eab148 eab148 Jul 17, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, or maybe you're hamstrung by the apis that TableBuilder exposes?

Yeah, currently the AsyncBufferedMutatorBuilderImpl wraps an AsyncTableBuilder (code).

Maybe we should look at expanding the scope of these methods in a separate JIRA.

I think this is a good idea. When I drafted this PR, I had a difficult time understanding what semantics we support and why we chose them.


/**
* Set multiple rpc request attributes.
*/
AsyncBufferedMutatorBuilder setRequestAttributes(Map<String, byte[]> requestAttributes);

/**
* Set the base pause time for retrying. We use an exponential policy to generate sleep time when
* retrying.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase.client;

import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.yetus.audience.InterfaceAudience;

Expand Down Expand Up @@ -78,6 +79,20 @@ public AsyncBufferedMutatorBuilder setStartLogErrorsCnt(int startLogErrorsCnt) {
return this;
}

@Override
public AsyncBufferedMutatorBuilder setRequestAttribute(String key, byte[] value) {
tableBuilder.setRequestAttribute(key, value);
return this;
}

@Override
public AsyncBufferedMutatorBuilder setRequestAttributes(Map<String, byte[]> requestAttributes) {
for (Map.Entry<String, byte[]> requestAttribute : requestAttributes.entrySet()) {
tableBuilder.setRequestAttribute(requestAttribute.getKey(), requestAttribute.getValue());
}
return this;
}

@Override
public AsyncBufferedMutatorBuilder setWriteBufferSize(long writeBufferSize) {
Preconditions.checkArgument(writeBufferSize > 0, "writeBufferSize %d must be > 0",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -130,7 +131,7 @@ Stream.<CompletableFuture<Void>> generate(CompletableFuture::new).limit(mutation
periodicFlushTask = periodicalFlushTimer.newTimeout(timeout -> {
synchronized (AsyncBufferedMutatorImpl.this) {
// confirm that we are still valid, if there is already an internalFlush call before us,
// then we should not execute any more. And in internalFlush we will set periodicFlush
// then we should not execute anymore. And in internalFlush we will set periodicFlush
// to null, and since we may schedule a new one, so here we check whether the references
// are equal.
if (timeout == periodicFlushTask) {
Expand Down Expand Up @@ -170,4 +171,9 @@ public long getWriteBufferSize() {
public long getPeriodicalFlushTimeout(TimeUnit unit) {
return unit.convert(periodicFlushTimeoutNs, TimeUnit.NANOSECONDS);
}

@Override
public Map<String, byte[]> getRequestAttributes() {
return table.getRequestAttributes();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,12 @@
import static org.apache.hadoop.hbase.client.ConnectionUtils.toCheckExistenceOnly;
import static org.apache.hadoop.hbase.util.FutureUtils.allOf;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.apache.commons.lang3.NotImplementedException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hadoop.hbase.TableName;
Expand Down Expand Up @@ -117,7 +117,7 @@ public interface AsyncTable<C extends ScanResultConsumerBase> {
* @return a map of request attributes supplied by the client
*/
default Map<String, byte[]> getRequestAttributes() {
throw new NotImplementedException("Add an implementation!");
return Collections.emptyMap();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,10 +129,10 @@ public AsyncTableBuilderBase<C> setStartLogErrorsCnt(int startLogErrorsCnt) {

@Override
public AsyncTableBuilder<C> setRequestAttribute(String key, byte[] value) {
if (this.requestAttributes.isEmpty()) {
this.requestAttributes = new HashMap<>();
if (requestAttributes.isEmpty()) {
requestAttributes = new HashMap<>();
}
this.requestAttributes.put(key, value);
requestAttributes.put(key, value);
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@

import java.io.Closeable;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.TableName;
import org.apache.yetus.audience.InterfaceAudience;
Expand Down Expand Up @@ -204,6 +206,13 @@ default void setOperationTimeout(int timeout) {
"The BufferedMutator::setOperationTimeout has not been implemented");
}

/**
* Returns the rpc request attributes.
*/
default Map<String, byte[]> getRequestAttributes() {
return Collections.emptyMap();
}

/**
* Listens for asynchronous exceptions on a {@link BufferedMutator}.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
Expand Down Expand Up @@ -186,4 +187,9 @@ public void setRpcTimeout(int timeout) {
public void setOperationTimeout(int timeout) {
// no effect
}

@Override
public Map<String, byte[]> getRequestAttributes() {
return mutator.getRequestAttributes();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
*/
package org.apache.hadoop.hbase.client;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import org.apache.hadoop.hbase.TableName;
import org.apache.yetus.audience.InterfaceAudience;
Expand All @@ -38,6 +41,7 @@ public class BufferedMutatorParams implements Cloneable {
private String implementationClassName = null;
private int rpcTimeout = UNSET;
private int operationTimeout = UNSET;
protected Map<String, byte[]> requestAttributes = Collections.emptyMap();
private BufferedMutator.ExceptionListener listener = new BufferedMutator.ExceptionListener() {
@Override
public void onException(RetriesExhaustedWithDetailsException exception,
Expand Down Expand Up @@ -85,6 +89,18 @@ public int getOperationTimeout() {
return operationTimeout;
}

public BufferedMutatorParams setRequestAttribute(String key, byte[] value) {
if (requestAttributes.isEmpty()) {
requestAttributes = new HashMap<>();
}
requestAttributes.put(key, value);
return this;
}

public Map<String, byte[]> getRequestAttributes() {
return requestAttributes;
}

/**
* Override the write buffer size specified by the provided {@link Connection}'s
* {@link org.apache.hadoop.conf.Configuration} instance, via the configuration key
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,10 @@ public BufferedMutator getBufferedMutator(BufferedMutatorParams params) throws I
if (params.getMaxKeyValueSize() != BufferedMutatorParams.UNSET) {
builder.setMaxKeyValueSize(params.getMaxKeyValueSize());
}
if (!params.getRequestAttributes().isEmpty()) {

builder.setRequestAttributes(params.getRequestAttributes());
}
return new BufferedMutatorOverAsyncBufferedMutator(builder.build(), params.getListener());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,8 +214,8 @@ private <T> SingleRequestCallerBuilder<T> newCaller(byte[] row, int priority, lo
.operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
.pause(pauseNs, TimeUnit.NANOSECONDS)
.pauseForServerOverloaded(pauseNsForServerOverloaded, TimeUnit.NANOSECONDS)
.maxAttempts(maxAttempts).setRequestAttributes(requestAttributes)
.startLogErrorsCnt(startLogErrorsCnt).setRequestAttributes(requestAttributes);
.maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt)
.setRequestAttributes(requestAttributes);
}

private <T, R extends OperationWithAttributes & Row> SingleRequestCallerBuilder<T>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -757,6 +757,6 @@ default long getOperationTimeout(TimeUnit unit) {
* @return map of request attributes
*/
default Map<String, byte[]> getRequestAttributes() {
throw new NotImplementedException("Add an implementation!");
return Collections.emptyMap();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,19 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.AuthUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.BufferedMutatorParams;
Expand Down Expand Up @@ -67,12 +71,19 @@ public void onException(RetriesExhaustedWithDetailsException e, BufferedMutator
}
}
};
BufferedMutatorParams params = new BufferedMutatorParams(TABLE).listener(listener);

BufferedMutatorParams params = new BufferedMutatorParams(TABLE).listener(listener)
.setRequestAttribute("requestInfo", Bytes.toBytes("bar"));

//
// step 1: create a single Connection and a BufferedMutator, shared by all worker threads.
//
try (final Connection conn = ConnectionFactory.createConnection(getConf());
Map<String, byte[]> connectionAttributes = new HashMap<>();
connectionAttributes.put("clientId", Bytes.toBytes("foo"));
Configuration conf = getConf();
try (
final Connection conn = ConnectionFactory.createConnection(conf, null,
AuthUtil.loginClient(conf), connectionAttributes);
final BufferedMutator mutator = conn.getBufferedMutator(params)) {

/** worker pool that operates on BufferedTable instances */
Expand Down Expand Up @@ -104,6 +115,7 @@ public Void call() throws Exception {
f.get(5, TimeUnit.MINUTES);
}
workerPool.shutdown();
mutator.flush();
} catch (IOException e) {
// exception while creating/destroying Connection or BufferedMutator
LOG.info("exception while creating/destroying Connection or BufferedMutator", e);
Expand Down
Loading