@@ -36,34 +36,40 @@ public interface AsyncAdminBuilder {
* Set timeout for a whole admin operation. Operation timeout and max attempt times (or max
* retry times) are both limits on retrying; we will stop retrying when we reach either of them.
* @param timeout
* @param unit
Member: Maybe better to add some javadoc to the parameters here instead of removing it? I mean, since we are changing this part anyway, let's improve it.

Contributor (author): We do not have javadoc for these parameters in either AsyncTableBuilder or TableBuilder, and I think the parameter names are good enough to tell users the meaning.

Member: It will generate a checkstyle error by default, although that is not a big deal.

* @return this for invocation chaining
*/
AsyncAdminBuilder setOperationTimeout(long timeout, TimeUnit unit);

/**
* Set timeout for each rpc request.
* @param timeout
* @param unit
Member: Ditto.

* @return this for invocation chaining
*/
AsyncAdminBuilder setRpcTimeout(long timeout, TimeUnit unit);

/**
* Set the base pause time for retrying. We use an exponential policy to generate sleep time when
* retrying.
* @param timeout
* @param unit
Member: Ditto.

* @return this for invocation chaining
* @see #setRetryPauseForCQTBE(long, TimeUnit)
*/
AsyncAdminBuilder setRetryPause(long timeout, TimeUnit unit);

/**
* Set the base pause time for retrying when we hit {@code CallQueueTooBigException}. We use an
* exponential policy to generate sleep time when retrying.
* <p/>
* This value should be greater than the normal pause value, which can be set with the
* {@link #setRetryPause(long, TimeUnit)} method above, as {@code CallQueueTooBigException}
* usually means the server is overloaded. If you specify a smaller value here, the normal
* pause value is used for {@code CallQueueTooBigException} instead.
* @return this for invocation chaining
* @see #setRetryPause(long, TimeUnit)
*/
AsyncAdminBuilder setRetryPauseForCQTBE(long pause, TimeUnit unit);

/**
* Set the max retry times for an admin operation. Usually it is the max attempt times minus 1.
* Operation timeout and max attempt times (or max retry times) are both limits on retrying;
* we will stop retrying when we reach either of them.
* @param maxRetries
Member: Ditto.

* @return this for invocation chaining
*/
default AsyncAdminBuilder setMaxRetries(int maxRetries) {
@@ -74,14 +80,12 @@ default AsyncAdminBuilder setMaxRetries(int maxRetries) {
* Set the max attempt times for an admin operation. Usually it is the max retry times plus 1.
* Operation timeout and max attempt times (or max retry times) are both limits on retrying;
* we will stop retrying when we reach either of them.
* @param maxAttempts
Member: Ditto.

* @return this for invocation chaining
*/
AsyncAdminBuilder setMaxAttempts(int maxAttempts);

/**
* Set the number of retries that are allowed before we start to log.
* @param startLogErrorsCnt
Member: Ditto.

* @return this for invocation chaining
*/
AsyncAdminBuilder setStartLogErrorsCnt(int startLogErrorsCnt);
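
For context on the API shape after this change, a minimal usage sketch, assuming an existing AsyncConnection named conn; the variable names and pause values are illustrative, not part of this patch:

```java
// Configure a longer base pause for CallQueueTooBigException than the normal
// retry pause, since CQTBE usually means the server is overloaded.
AsyncAdmin admin = conn.getAdminBuilder()
    .setRetryPause(100, TimeUnit.MILLISECONDS)          // normal retry pause
    .setRetryPauseForCQTBE(500, TimeUnit.MILLISECONDS)  // pause after CQTBE
    .setMaxRetries(10)
    .build();
```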
@@ -33,6 +33,8 @@ abstract class AsyncAdminBuilderBase implements AsyncAdminBuilder {

protected long pauseNs;

protected long pauseForCQTBENs;

protected int maxAttempts;

protected int startLogErrorsCnt;
@@ -41,6 +43,7 @@ abstract class AsyncAdminBuilderBase implements AsyncAdminBuilder {
this.rpcTimeoutNs = connConf.getRpcTimeoutNs();
this.operationTimeoutNs = connConf.getOperationTimeoutNs();
this.pauseNs = connConf.getPauseNs();
this.pauseForCQTBENs = connConf.getPauseForCQTBENs();
this.maxAttempts = connConf.getMaxRetries();
this.startLogErrorsCnt = connConf.getStartLogErrorsCnt();
}
@@ -63,6 +66,12 @@ public AsyncAdminBuilder setRetryPause(long timeout, TimeUnit unit) {
return this;
}

@Override
public AsyncAdminBuilder setRetryPauseForCQTBE(long timeout, TimeUnit unit) {
this.pauseForCQTBENs = unit.toNanos(timeout);
return this;
}

@Override
public AsyncAdminBuilder setMaxAttempts(int maxAttempts) {
this.maxAttempts = maxAttempts;
@@ -44,10 +44,10 @@ public interface Callable<T> {
private ServerName serverName;

public AsyncAdminRequestRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn, int priority,
long pauseNs, int maxAttempts, long operationTimeoutNs, long rpcTimeoutNs,
int startLogErrorsCnt, ServerName serverName, Callable<T> callable) {
super(retryTimer, conn, priority, pauseNs, maxAttempts, operationTimeoutNs, rpcTimeoutNs,
startLogErrorsCnt);
long pauseNs, long pauseForCQTBENs, int maxAttempts, long operationTimeoutNs,
long rpcTimeoutNs, int startLogErrorsCnt, ServerName serverName, Callable<T> callable) {
super(retryTimer, conn, priority, pauseNs, pauseForCQTBENs, maxAttempts, operationTimeoutNs,
rpcTimeoutNs, startLogErrorsCnt);
this.serverName = serverName;
this.callable = callable;
}
@@ -45,6 +45,7 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.lang3.mutable.MutableBoolean;
import org.apache.hadoop.hbase.CallQueueTooBigException;
import org.apache.hadoop.hbase.CellScannable;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HConstants;
@@ -103,6 +104,8 @@ class AsyncBatchRpcRetryingCaller<T> {

private final long pauseNs;

private final long pauseForCQTBENs;

private final int maxAttempts;

private final long operationTimeoutNs;
@@ -147,17 +150,17 @@ public int getPriority() {
}

public AsyncBatchRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn,
TableName tableName, List<? extends Row> actions, long pauseNs, int maxAttempts,
long operationTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) {
TableName tableName, List<? extends Row> actions, long pauseNs, long pauseForCQTBENs,
int maxAttempts, long operationTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) {
this.retryTimer = retryTimer;
this.conn = conn;
this.tableName = tableName;
this.pauseNs = pauseNs;
this.pauseForCQTBENs = pauseForCQTBENs;
this.maxAttempts = maxAttempts;
this.operationTimeoutNs = operationTimeoutNs;
this.rpcTimeoutNs = rpcTimeoutNs;
this.startLogErrorsCnt = startLogErrorsCnt;

this.actions = new ArrayList<>(actions.size());
this.futures = new ArrayList<>(actions.size());
this.action2Future = new IdentityHashMap<>(actions.size());
@@ -337,7 +340,7 @@ private void onComplete(Map<byte[], RegionRequest> actionsByRegion, int tries,
}
});
if (!failedActions.isEmpty()) {
tryResubmit(failedActions.stream(), tries, retryImmediately.booleanValue());
tryResubmit(failedActions.stream(), tries, retryImmediately.booleanValue(), false);
}
}

@@ -442,24 +445,27 @@ private void onError(Map<byte[], RegionRequest> actionsByRegion, int tries, Thro
List<Action> copiedActions = actionsByRegion.values().stream().flatMap(r -> r.actions.stream())
.collect(Collectors.toList());
addError(copiedActions, error, serverName);
tryResubmit(copiedActions.stream(), tries, error instanceof RetryImmediatelyException);
tryResubmit(copiedActions.stream(), tries, error instanceof RetryImmediatelyException,
error instanceof CallQueueTooBigException);
}

private void tryResubmit(Stream<Action> actions, int tries, boolean immediately) {
private void tryResubmit(Stream<Action> actions, int tries, boolean immediately,
boolean isCallQueueTooBig) {
if (immediately) {
groupAndSend(actions, tries);
return;
}
long delayNs;
long pauseNsToUse = isCallQueueTooBig ? pauseForCQTBENs : pauseNs;
if (operationTimeoutNs > 0) {
long maxDelayNs = remainingTimeNs() - SLEEP_DELTA_NS;
if (maxDelayNs <= 0) {
failAll(actions, tries);
return;
}
delayNs = Math.min(maxDelayNs, getPauseTime(pauseNs, tries - 1));
delayNs = Math.min(maxDelayNs, getPauseTime(pauseNsToUse, tries - 1));
} else {
delayNs = getPauseTime(pauseNs, tries - 1);
delayNs = getPauseTime(pauseNsToUse, tries - 1);
}
retryTimer.newTimeout(t -> groupAndSend(actions, tries + 1), delayNs, TimeUnit.NANOSECONDS);
}
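
Both resubmit paths feed the chosen base pause into the same exponential backoff helper. A simplified sketch of that helper, mirroring org.apache.hadoop.hbase.client.ConnectionUtils.getPauseTime, with the HConstants.RETRY_BACKOFF table inlined here for illustration:

```java
import java.util.concurrent.ThreadLocalRandom;

final class BackoffSketch {
  // Backoff multipliers as defined in HConstants.RETRY_BACKOFF.
  static final int[] RETRY_BACKOFF = { 1, 2, 3, 5, 10, 20, 40, 100, 100, 100, 100, 200, 200 };

  // Simplified mirror of ConnectionUtils.getPauseTime(pause, tries).
  static long getPauseTime(long pauseNs, int tries) {
    // Clamp the attempt index into the table, then scale the base pause.
    int ntries = Math.max(0, Math.min(tries, RETRY_BACKOFF.length - 1));
    long normalPause = pauseNs * RETRY_BACKOFF[ntries];
    // Add up to 1% jitter so retrying clients do not wake up in lockstep.
    long jitter = (long) (normalPause * ThreadLocalRandom.current().nextFloat() * 0.01f);
    return normalPause + jitter;
  }
}
```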
@@ -498,7 +504,7 @@ private void groupAndSend(Stream<Action> actions, int tries) {
sendOrDelay(actionsByServer, tries);
}
if (!locateFailed.isEmpty()) {
tryResubmit(locateFailed.stream(), tries, false);
tryResubmit(locateFailed.stream(), tries, false, false);
}
});
}
@@ -73,6 +73,8 @@ class AsyncClientScanner {

private final long pauseNs;

private final long pauseForCQTBENs;

private final int maxAttempts;

private final long scanTimeoutNs;
@@ -84,8 +86,8 @@ class AsyncClientScanner {
private final ScanResultCache resultCache;

public AsyncClientScanner(Scan scan, AdvancedScanResultConsumer consumer, TableName tableName,
AsyncConnectionImpl conn, Timer retryTimer, long pauseNs, int maxAttempts, long scanTimeoutNs,
long rpcTimeoutNs, int startLogErrorsCnt) {
AsyncConnectionImpl conn, Timer retryTimer, long pauseNs, long pauseForCQTBENs,
int maxAttempts, long scanTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) {
if (scan.getStartRow() == null) {
scan.withStartRow(EMPTY_START_ROW, scan.includeStartRow());
}
@@ -98,6 +100,7 @@ public AsyncClientScanner(Scan scan, AdvancedScanResultConsumer consumer, TableN
this.conn = conn;
this.retryTimer = retryTimer;
this.pauseNs = pauseNs;
this.pauseForCQTBENs = pauseForCQTBENs;
this.maxAttempts = maxAttempts;
this.scanTimeoutNs = scanTimeoutNs;
this.rpcTimeoutNs = rpcTimeoutNs;
@@ -160,14 +163,16 @@ private CompletableFuture<OpenScannerResponse> callOpenScanner(HBaseRpcControlle
}

private void startScan(OpenScannerResponse resp) {
addListener(conn.callerFactory.scanSingleRegion().id(resp.resp.getScannerId())
.location(resp.loc).remote(resp.isRegionServerRemote)
.scannerLeaseTimeoutPeriod(resp.resp.getTtl(), TimeUnit.MILLISECONDS).stub(resp.stub)
.setScan(scan).metrics(scanMetrics).consumer(consumer).resultCache(resultCache)
.rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
.scanTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS)
.maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt)
.start(resp.controller, resp.resp), (hasMore, error) -> {
addListener(
conn.callerFactory.scanSingleRegion().id(resp.resp.getScannerId()).location(resp.loc)
.remote(resp.isRegionServerRemote)
.scannerLeaseTimeoutPeriod(resp.resp.getTtl(), TimeUnit.MILLISECONDS).stub(resp.stub)
.setScan(scan).metrics(scanMetrics).consumer(consumer).resultCache(resultCache)
.rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
.scanTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS)
.pauseForCQTBE(pauseForCQTBENs, TimeUnit.NANOSECONDS).maxAttempts(maxAttempts)
.startLogErrorsCnt(startLogErrorsCnt).start(resp.controller, resp.resp),
(hasMore, error) -> {
if (error != null) {
consumer.onError(error);
return;
@@ -185,8 +190,8 @@ private CompletableFuture<OpenScannerResponse> openScanner(int replicaId) {
.row(scan.getStartRow()).replicaId(replicaId).locateType(getLocateType(scan))
.rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
.operationTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS)
.maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt).action(this::callOpenScanner)
.call();
.pauseForCQTBE(pauseForCQTBENs, TimeUnit.NANOSECONDS).maxAttempts(maxAttempts)
.startLogErrorsCnt(startLogErrorsCnt).action(this::callOpenScanner).call();
}

private long getPrimaryTimeoutNs() {
@@ -30,6 +30,7 @@
import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_META_REPLICA_SCAN_TIMEOUT_DEFAULT;
import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_OPERATION_TIMEOUT;
import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_PAUSE;
import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE;
import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_RETRIES_NUMBER;
import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_SCANNER_CACHING;
import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY;
@@ -54,13 +55,17 @@
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Timeout configs.
*/
@InterfaceAudience.Private
class AsyncConnectionConfiguration {

private static final Logger LOG = LoggerFactory.getLogger(AsyncConnectionConfiguration.class);

private final long metaOperationTimeoutNs;

// timeout for a whole operation such as get, put or delete. Notice that scan will not be affected
Expand All @@ -79,6 +84,8 @@ class AsyncConnectionConfiguration {

private final long pauseNs;

private final long pauseForCQTBENs;

private final int maxRetries;

/** How many retries are allowed before we start to log */
@@ -121,8 +128,16 @@ class AsyncConnectionConfiguration {
TimeUnit.MILLISECONDS.toNanos(conf.getLong(HBASE_RPC_READ_TIMEOUT_KEY, rpcTimeoutNs));
this.writeRpcTimeoutNs =
TimeUnit.MILLISECONDS.toNanos(conf.getLong(HBASE_RPC_WRITE_TIMEOUT_KEY, rpcTimeoutNs));
this.pauseNs =
TimeUnit.MILLISECONDS.toNanos(conf.getLong(HBASE_CLIENT_PAUSE, DEFAULT_HBASE_CLIENT_PAUSE));
long pauseMs = conf.getLong(HBASE_CLIENT_PAUSE, DEFAULT_HBASE_CLIENT_PAUSE);
long pauseForCQTBEMs = conf.getLong(HBASE_CLIENT_PAUSE_FOR_CQTBE, pauseMs);
if (pauseForCQTBEMs < pauseMs) {
LOG.warn(
"The {} setting: {} ms is less than the {} setting: {} ms, use the greater one instead",
HBASE_CLIENT_PAUSE_FOR_CQTBE, pauseForCQTBEMs, HBASE_CLIENT_PAUSE, pauseMs);
pauseForCQTBEMs = pauseMs;
}
this.pauseNs = TimeUnit.MILLISECONDS.toNanos(pauseMs);
this.pauseForCQTBENs = TimeUnit.MILLISECONDS.toNanos(pauseForCQTBEMs);
this.maxRetries = conf.getInt(HBASE_CLIENT_RETRIES_NUMBER, DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
this.startLogErrorsCnt =
conf.getInt(START_LOG_ERRORS_AFTER_COUNT_KEY, DEFAULT_START_LOG_ERRORS_AFTER_COUNT);
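
The clamp above guarantees the CQTBE pause is never shorter than the normal pause. A hedged example of setting the two values programmatically; the keys resolve to "hbase.client.pause" and "hbase.client.pause.cqtbe", and the millisecond values are illustrative only:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;

// Illustrative values: give CallQueueTooBigException a longer base pause.
// If the CQTBE value were smaller than the normal pause, the constructor
// above would log a warning and fall back to the normal pause.
Configuration conf = HBaseConfiguration.create();
conf.setLong(HConstants.HBASE_CLIENT_PAUSE, 100L);            // ms
conf.setLong(HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE, 500L);  // ms
```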
@@ -173,6 +188,10 @@ long getPauseNs() {
return pauseNs;
}

long getPauseForCQTBENs() {
return pauseForCQTBENs;
}

int getMaxRetries() {
return maxRetries;
}
@@ -44,10 +44,10 @@ public interface Callable<T> {
private final Callable<T> callable;

public AsyncMasterRequestRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn,
Callable<T> callable, int priority, long pauseNs, int maxRetries, long operationTimeoutNs,
long rpcTimeoutNs, int startLogErrorsCnt) {
super(retryTimer, conn, priority, pauseNs, maxRetries, operationTimeoutNs, rpcTimeoutNs,
startLogErrorsCnt);
Callable<T> callable, int priority, long pauseNs, long pauseForCQTBENs, int maxRetries,
long operationTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) {
super(retryTimer, conn, priority, pauseNs, pauseForCQTBENs, maxRetries, operationTimeoutNs,
rpcTimeoutNs, startLogErrorsCnt);
this.callable = callable;
}

@@ -31,6 +31,7 @@
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.apache.hadoop.hbase.CallQueueTooBigException;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.TableName;
@@ -59,6 +60,8 @@ public abstract class AsyncRpcRetryingCaller<T> {

private final long pauseNs;

private final long pauseForCQTBENs;

private int tries = 1;

private final int maxAttempts;
@@ -78,12 +81,13 @@ public abstract class AsyncRpcRetryingCaller<T> {
protected final HBaseRpcController controller;

public AsyncRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn, int priority,
long pauseNs, int maxAttempts, long operationTimeoutNs, long rpcTimeoutNs,
int startLogErrorsCnt) {
long pauseNs, long pauseForCQTBENs, int maxAttempts, long operationTimeoutNs,
long rpcTimeoutNs, int startLogErrorsCnt) {
this.retryTimer = retryTimer;
this.conn = conn;
this.priority = priority;
this.pauseNs = pauseNs;
this.pauseForCQTBENs = pauseForCQTBENs;
this.maxAttempts = maxAttempts;
this.operationTimeoutNs = operationTimeoutNs;
this.rpcTimeoutNs = rpcTimeoutNs;
@@ -123,16 +127,17 @@ protected final void resetCallTimeout() {
}

private void tryScheduleRetry(Throwable error, Consumer<Throwable> updateCachedLocation) {
long pauseNsToUse = error instanceof CallQueueTooBigException ? pauseForCQTBENs : pauseNs;
long delayNs;
if (operationTimeoutNs > 0) {
long maxDelayNs = remainingTimeNs() - SLEEP_DELTA_NS;
if (maxDelayNs <= 0) {
completeExceptionally();
return;
}
delayNs = Math.min(maxDelayNs, getPauseTime(pauseNs, tries - 1));
delayNs = Math.min(maxDelayNs, getPauseTime(pauseNsToUse, tries - 1));
} else {
delayNs = getPauseTime(pauseNs, tries - 1);
delayNs = getPauseTime(pauseNsToUse, tries - 1);
}
tries++;
retryTimer.newTimeout(t -> doCall(), delayNs, TimeUnit.NANOSECONDS);