Skip to content

Commit 0456e2b

Browse files
committed
HBASE-22322 Use special pause for CallQueueTooBigException
1 parent 962585d commit 0456e2b

17 files changed

+383
-80
lines changed

hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdminBuilder.java

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -36,34 +36,40 @@ public interface AsyncAdminBuilder {
3636
* Set timeout for a whole admin operation. Operation timeout and max attempt times(or max retry
3737
* times) are both limitations for retrying, we will stop retrying when we reach any of the
3838
* limitations.
39-
* @param timeout
40-
* @param unit
4139
* @return this for invocation chaining
4240
*/
4341
AsyncAdminBuilder setOperationTimeout(long timeout, TimeUnit unit);
4442

4543
/**
4644
* Set timeout for each rpc request.
47-
* @param timeout
48-
* @param unit
4945
* @return this for invocation chaining
5046
*/
5147
AsyncAdminBuilder setRpcTimeout(long timeout, TimeUnit unit);
5248

5349
/**
5450
* Set the base pause time for retrying. We use an exponential policy to generate sleep time when
5551
* retrying.
56-
* @param timeout
57-
* @param unit
5852
* @return this for invocation chaining
53+
* @see #setRetryPauseForCQTBE(long, TimeUnit)
5954
*/
6055
AsyncAdminBuilder setRetryPause(long timeout, TimeUnit unit);
6156

57+
/**
58+
* Set the base pause time for retrying when we hit {@code CallQueueTooBigException}. We use an
59+
* exponential policy to generate sleep time when retrying.
60+
* <p/>
61+
* This value should be greater than the normal pause value which could be set with the above
62+
* {@link #setRetryPause(long, TimeUnit)} method, as usually {@code CallQueueTooBigException}
63+
* means the server is overloaded. We just use the normal pause value for
64+
* {@code CallQueueTooBigException} if here you specify a smaller value.
65+
* @see #setRetryPause(long, TimeUnit)
66+
*/
67+
AsyncAdminBuilder setRetryPauseForCQTBE(long pause, TimeUnit unit);
68+
6269
/**
6370
* Set the max retry times for an admin operation. Usually it is the max attempt times minus 1.
6471
* Operation timeout and max attempt times(or max retry times) are both limitations for retrying,
6572
* we will stop retrying when we reach any of the limitations.
66-
* @param maxRetries
6773
* @return this for invocation chaining
6874
*/
6975
default AsyncAdminBuilder setMaxRetries(int maxRetries) {
@@ -74,14 +80,12 @@ default AsyncAdminBuilder setMaxRetries(int maxRetries) {
7480
* Set the max attempt times for an admin operation. Usually it is the max retry times plus 1.
7581
* Operation timeout and max attempt times(or max retry times) are both limitations for retrying,
7682
* we will stop retrying when we reach any of the limitations.
77-
* @param maxAttempts
7883
* @return this for invocation chaining
7984
*/
8085
AsyncAdminBuilder setMaxAttempts(int maxAttempts);
8186

8287
/**
8388
* Set the number of retries that are allowed before we start to log.
84-
* @param startLogErrorsCnt
8589
* @return this for invocation chaining
8690
*/
8791
AsyncAdminBuilder setStartLogErrorsCnt(int startLogErrorsCnt);

hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdminBuilderBase.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@ abstract class AsyncAdminBuilderBase implements AsyncAdminBuilder {
3333

3434
protected long pauseNs;
3535

36+
protected long pauseForCQTBENs;
37+
3638
protected int maxAttempts;
3739

3840
protected int startLogErrorsCnt;
@@ -41,6 +43,7 @@ abstract class AsyncAdminBuilderBase implements AsyncAdminBuilder {
4143
this.rpcTimeoutNs = connConf.getRpcTimeoutNs();
4244
this.operationTimeoutNs = connConf.getOperationTimeoutNs();
4345
this.pauseNs = connConf.getPauseNs();
46+
this.pauseForCQTBENs = connConf.getPauseForCQTBENs();
4447
this.maxAttempts = connConf.getMaxRetries();
4548
this.startLogErrorsCnt = connConf.getStartLogErrorsCnt();
4649
}
@@ -63,6 +66,12 @@ public AsyncAdminBuilder setRetryPause(long timeout, TimeUnit unit) {
6366
return this;
6467
}
6568

69+
@Override
70+
public AsyncAdminBuilder setRetryPauseForCQTBE(long timeout, TimeUnit unit) {
71+
this.pauseForCQTBENs = unit.toNanos(timeout);
72+
return this;
73+
}
74+
6675
@Override
6776
public AsyncAdminBuilder setMaxAttempts(int maxAttempts) {
6877
this.maxAttempts = maxAttempts;

hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdminRequestRetryingCaller.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -44,10 +44,10 @@ public interface Callable<T> {
4444
private ServerName serverName;
4545

4646
public AsyncAdminRequestRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn, int priority,
47-
long pauseNs, int maxAttempts, long operationTimeoutNs, long rpcTimeoutNs,
48-
int startLogErrorsCnt, ServerName serverName, Callable<T> callable) {
49-
super(retryTimer, conn, priority, pauseNs, maxAttempts, operationTimeoutNs, rpcTimeoutNs,
50-
startLogErrorsCnt);
47+
long pauseNs, long pauseForCQTBENs, int maxAttempts, long operationTimeoutNs,
48+
long rpcTimeoutNs, int startLogErrorsCnt, ServerName serverName, Callable<T> callable) {
49+
super(retryTimer, conn, priority, pauseNs, pauseForCQTBENs, maxAttempts, operationTimeoutNs,
50+
rpcTimeoutNs, startLogErrorsCnt);
5151
this.serverName = serverName;
5252
this.callable = callable;
5353
}

hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
import java.util.stream.Collectors;
4646
import java.util.stream.Stream;
4747
import org.apache.commons.lang3.mutable.MutableBoolean;
48+
import org.apache.hadoop.hbase.CallQueueTooBigException;
4849
import org.apache.hadoop.hbase.CellScannable;
4950
import org.apache.hadoop.hbase.DoNotRetryIOException;
5051
import org.apache.hadoop.hbase.HConstants;
@@ -103,6 +104,8 @@ class AsyncBatchRpcRetryingCaller<T> {
103104

104105
private final long pauseNs;
105106

107+
private final long pauseForCQTBENs;
108+
106109
private final int maxAttempts;
107110

108111
private final long operationTimeoutNs;
@@ -147,17 +150,17 @@ public int getPriority() {
147150
}
148151

149152
public AsyncBatchRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn,
150-
TableName tableName, List<? extends Row> actions, long pauseNs, int maxAttempts,
151-
long operationTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) {
153+
TableName tableName, List<? extends Row> actions, long pauseNs, long pauseForCQTBENs,
154+
int maxAttempts, long operationTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) {
152155
this.retryTimer = retryTimer;
153156
this.conn = conn;
154157
this.tableName = tableName;
155158
this.pauseNs = pauseNs;
159+
this.pauseForCQTBENs = pauseForCQTBENs;
156160
this.maxAttempts = maxAttempts;
157161
this.operationTimeoutNs = operationTimeoutNs;
158162
this.rpcTimeoutNs = rpcTimeoutNs;
159163
this.startLogErrorsCnt = startLogErrorsCnt;
160-
161164
this.actions = new ArrayList<>(actions.size());
162165
this.futures = new ArrayList<>(actions.size());
163166
this.action2Future = new IdentityHashMap<>(actions.size());
@@ -337,7 +340,7 @@ private void onComplete(Map<byte[], RegionRequest> actionsByRegion, int tries,
337340
}
338341
});
339342
if (!failedActions.isEmpty()) {
340-
tryResubmit(failedActions.stream(), tries, retryImmediately.booleanValue());
343+
tryResubmit(failedActions.stream(), tries, retryImmediately.booleanValue(), false);
341344
}
342345
}
343346

@@ -442,24 +445,27 @@ private void onError(Map<byte[], RegionRequest> actionsByRegion, int tries, Thro
442445
List<Action> copiedActions = actionsByRegion.values().stream().flatMap(r -> r.actions.stream())
443446
.collect(Collectors.toList());
444447
addError(copiedActions, error, serverName);
445-
tryResubmit(copiedActions.stream(), tries, error instanceof RetryImmediatelyException);
448+
tryResubmit(copiedActions.stream(), tries, error instanceof RetryImmediatelyException,
449+
error instanceof CallQueueTooBigException);
446450
}
447451

448-
private void tryResubmit(Stream<Action> actions, int tries, boolean immediately) {
452+
private void tryResubmit(Stream<Action> actions, int tries, boolean immediately,
453+
boolean isCallQueueTooBig) {
449454
if (immediately) {
450455
groupAndSend(actions, tries);
451456
return;
452457
}
453458
long delayNs;
459+
long pauseNsToUse = isCallQueueTooBig ? pauseForCQTBENs : pauseNs;
454460
if (operationTimeoutNs > 0) {
455461
long maxDelayNs = remainingTimeNs() - SLEEP_DELTA_NS;
456462
if (maxDelayNs <= 0) {
457463
failAll(actions, tries);
458464
return;
459465
}
460-
delayNs = Math.min(maxDelayNs, getPauseTime(pauseNs, tries - 1));
466+
delayNs = Math.min(maxDelayNs, getPauseTime(pauseNsToUse, tries - 1));
461467
} else {
462-
delayNs = getPauseTime(pauseNs, tries - 1);
468+
delayNs = getPauseTime(pauseNsToUse, tries - 1);
463469
}
464470
retryTimer.newTimeout(t -> groupAndSend(actions, tries + 1), delayNs, TimeUnit.NANOSECONDS);
465471
}
@@ -498,7 +504,7 @@ private void groupAndSend(Stream<Action> actions, int tries) {
498504
sendOrDelay(actionsByServer, tries);
499505
}
500506
if (!locateFailed.isEmpty()) {
501-
tryResubmit(locateFailed.stream(), tries, false);
507+
tryResubmit(locateFailed.stream(), tries, false, false);
502508
}
503509
});
504510
}

hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java

Lines changed: 17 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,8 @@ class AsyncClientScanner {
7373

7474
private final long pauseNs;
7575

76+
private final long pauseForCQTBENs;
77+
7678
private final int maxAttempts;
7779

7880
private final long scanTimeoutNs;
@@ -84,8 +86,8 @@ class AsyncClientScanner {
8486
private final ScanResultCache resultCache;
8587

8688
public AsyncClientScanner(Scan scan, AdvancedScanResultConsumer consumer, TableName tableName,
87-
AsyncConnectionImpl conn, Timer retryTimer, long pauseNs, int maxAttempts, long scanTimeoutNs,
88-
long rpcTimeoutNs, int startLogErrorsCnt) {
89+
AsyncConnectionImpl conn, Timer retryTimer, long pauseNs, long pauseForCQTBENs,
90+
int maxAttempts, long scanTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) {
8991
if (scan.getStartRow() == null) {
9092
scan.withStartRow(EMPTY_START_ROW, scan.includeStartRow());
9193
}
@@ -98,6 +100,7 @@ public AsyncClientScanner(Scan scan, AdvancedScanResultConsumer consumer, TableN
98100
this.conn = conn;
99101
this.retryTimer = retryTimer;
100102
this.pauseNs = pauseNs;
103+
this.pauseForCQTBENs = pauseForCQTBENs;
101104
this.maxAttempts = maxAttempts;
102105
this.scanTimeoutNs = scanTimeoutNs;
103106
this.rpcTimeoutNs = rpcTimeoutNs;
@@ -160,14 +163,16 @@ private CompletableFuture<OpenScannerResponse> callOpenScanner(HBaseRpcControlle
160163
}
161164

162165
private void startScan(OpenScannerResponse resp) {
163-
addListener(conn.callerFactory.scanSingleRegion().id(resp.resp.getScannerId())
164-
.location(resp.loc).remote(resp.isRegionServerRemote)
165-
.scannerLeaseTimeoutPeriod(resp.resp.getTtl(), TimeUnit.MILLISECONDS).stub(resp.stub)
166-
.setScan(scan).metrics(scanMetrics).consumer(consumer).resultCache(resultCache)
167-
.rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
168-
.scanTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS)
169-
.maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt)
170-
.start(resp.controller, resp.resp), (hasMore, error) -> {
166+
addListener(
167+
conn.callerFactory.scanSingleRegion().id(resp.resp.getScannerId()).location(resp.loc)
168+
.remote(resp.isRegionServerRemote)
169+
.scannerLeaseTimeoutPeriod(resp.resp.getTtl(), TimeUnit.MILLISECONDS).stub(resp.stub)
170+
.setScan(scan).metrics(scanMetrics).consumer(consumer).resultCache(resultCache)
171+
.rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
172+
.scanTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS)
173+
.pauseForCQTBE(pauseForCQTBENs, TimeUnit.NANOSECONDS).maxAttempts(maxAttempts)
174+
.startLogErrorsCnt(startLogErrorsCnt).start(resp.controller, resp.resp),
175+
(hasMore, error) -> {
171176
if (error != null) {
172177
consumer.onError(error);
173178
return;
@@ -185,8 +190,8 @@ private CompletableFuture<OpenScannerResponse> openScanner(int replicaId) {
185190
.row(scan.getStartRow()).replicaId(replicaId).locateType(getLocateType(scan))
186191
.rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
187192
.operationTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS)
188-
.maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt).action(this::callOpenScanner)
189-
.call();
193+
.pauseForCQTBE(pauseForCQTBENs, TimeUnit.NANOSECONDS).maxAttempts(maxAttempts)
194+
.startLogErrorsCnt(startLogErrorsCnt).action(this::callOpenScanner).call();
190195
}
191196

192197
private long getPrimaryTimeoutNs() {

hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_META_REPLICA_SCAN_TIMEOUT_DEFAULT;
3131
import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_OPERATION_TIMEOUT;
3232
import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_PAUSE;
33+
import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE;
3334
import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_RETRIES_NUMBER;
3435
import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_SCANNER_CACHING;
3536
import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY;
@@ -54,13 +55,17 @@
5455
import java.util.concurrent.TimeUnit;
5556
import org.apache.hadoop.conf.Configuration;
5657
import org.apache.yetus.audience.InterfaceAudience;
58+
import org.slf4j.Logger;
59+
import org.slf4j.LoggerFactory;
5760

5861
/**
5962
* Timeout configs.
6063
*/
6164
@InterfaceAudience.Private
6265
class AsyncConnectionConfiguration {
6366

67+
private static final Logger LOG = LoggerFactory.getLogger(AsyncConnectionConfiguration.class);
68+
6469
private final long metaOperationTimeoutNs;
6570

6671
// timeout for a whole operation such as get, put or delete. Notice that scan will not be effected
@@ -79,6 +84,8 @@ class AsyncConnectionConfiguration {
7984

8085
private final long pauseNs;
8186

87+
private final long pauseForCQTBENs;
88+
8289
private final int maxRetries;
8390

8491
/** How many retries are allowed before we start to log */
@@ -121,8 +128,16 @@ class AsyncConnectionConfiguration {
121128
TimeUnit.MILLISECONDS.toNanos(conf.getLong(HBASE_RPC_READ_TIMEOUT_KEY, rpcTimeoutNs));
122129
this.writeRpcTimeoutNs =
123130
TimeUnit.MILLISECONDS.toNanos(conf.getLong(HBASE_RPC_WRITE_TIMEOUT_KEY, rpcTimeoutNs));
124-
this.pauseNs =
125-
TimeUnit.MILLISECONDS.toNanos(conf.getLong(HBASE_CLIENT_PAUSE, DEFAULT_HBASE_CLIENT_PAUSE));
131+
long pauseMs = conf.getLong(HBASE_CLIENT_PAUSE, DEFAULT_HBASE_CLIENT_PAUSE);
132+
long pauseForCQTBEMs = conf.getLong(HBASE_CLIENT_PAUSE_FOR_CQTBE, pauseMs);
133+
if (pauseForCQTBEMs < pauseMs) {
134+
LOG.warn(
135+
"The {} setting: {} ms is less than the {} setting: {} ms, use the greater one instead",
136+
HBASE_CLIENT_PAUSE_FOR_CQTBE, pauseForCQTBEMs, HBASE_CLIENT_PAUSE, pauseMs);
137+
pauseForCQTBEMs = pauseMs;
138+
}
139+
this.pauseNs = TimeUnit.MILLISECONDS.toNanos(pauseMs);
140+
this.pauseForCQTBENs = TimeUnit.MILLISECONDS.toNanos(pauseForCQTBEMs);
126141
this.maxRetries = conf.getInt(HBASE_CLIENT_RETRIES_NUMBER, DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
127142
this.startLogErrorsCnt =
128143
conf.getInt(START_LOG_ERRORS_AFTER_COUNT_KEY, DEFAULT_START_LOG_ERRORS_AFTER_COUNT);
@@ -173,6 +188,10 @@ long getPauseNs() {
173188
return pauseNs;
174189
}
175190

191+
long getPauseForCQTBENs() {
192+
return pauseForCQTBENs;
193+
}
194+
176195
int getMaxRetries() {
177196
return maxRetries;
178197
}

hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMasterRequestRpcRetryingCaller.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -44,10 +44,10 @@ public interface Callable<T> {
4444
private final Callable<T> callable;
4545

4646
public AsyncMasterRequestRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn,
47-
Callable<T> callable, int priority, long pauseNs, int maxRetries, long operationTimeoutNs,
48-
long rpcTimeoutNs, int startLogErrorsCnt) {
49-
super(retryTimer, conn, priority, pauseNs, maxRetries, operationTimeoutNs, rpcTimeoutNs,
50-
startLogErrorsCnt);
47+
Callable<T> callable, int priority, long pauseNs, long pauseForCQTBENs, int maxRetries,
48+
long operationTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) {
49+
super(retryTimer, conn, priority, pauseNs, pauseForCQTBENs, maxRetries, operationTimeoutNs,
50+
rpcTimeoutNs, startLogErrorsCnt);
5151
this.callable = callable;
5252
}
5353

hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import java.util.concurrent.TimeUnit;
3232
import java.util.function.Consumer;
3333
import java.util.function.Supplier;
34+
import org.apache.hadoop.hbase.CallQueueTooBigException;
3435
import org.apache.hadoop.hbase.DoNotRetryIOException;
3536
import org.apache.hadoop.hbase.NotServingRegionException;
3637
import org.apache.hadoop.hbase.TableName;
@@ -59,6 +60,8 @@ public abstract class AsyncRpcRetryingCaller<T> {
5960

6061
private final long pauseNs;
6162

63+
private final long pauseForCQTBENs;
64+
6265
private int tries = 1;
6366

6467
private final int maxAttempts;
@@ -78,12 +81,13 @@ public abstract class AsyncRpcRetryingCaller<T> {
7881
protected final HBaseRpcController controller;
7982

8083
public AsyncRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn, int priority,
81-
long pauseNs, int maxAttempts, long operationTimeoutNs, long rpcTimeoutNs,
82-
int startLogErrorsCnt) {
84+
long pauseNs, long pauseForCQTBENs, int maxAttempts, long operationTimeoutNs,
85+
long rpcTimeoutNs, int startLogErrorsCnt) {
8386
this.retryTimer = retryTimer;
8487
this.conn = conn;
8588
this.priority = priority;
8689
this.pauseNs = pauseNs;
90+
this.pauseForCQTBENs = pauseForCQTBENs;
8791
this.maxAttempts = maxAttempts;
8892
this.operationTimeoutNs = operationTimeoutNs;
8993
this.rpcTimeoutNs = rpcTimeoutNs;
@@ -123,16 +127,17 @@ protected final void resetCallTimeout() {
123127
}
124128

125129
private void tryScheduleRetry(Throwable error, Consumer<Throwable> updateCachedLocation) {
130+
long pauseNsToUse = error instanceof CallQueueTooBigException ? pauseForCQTBENs : pauseNs;
126131
long delayNs;
127132
if (operationTimeoutNs > 0) {
128133
long maxDelayNs = remainingTimeNs() - SLEEP_DELTA_NS;
129134
if (maxDelayNs <= 0) {
130135
completeExceptionally();
131136
return;
132137
}
133-
delayNs = Math.min(maxDelayNs, getPauseTime(pauseNs, tries - 1));
138+
delayNs = Math.min(maxDelayNs, getPauseTime(pauseNsToUse, tries - 1));
134139
} else {
135-
delayNs = getPauseTime(pauseNs, tries - 1);
140+
delayNs = getPauseTime(pauseNsToUse, tries - 1);
136141
}
137142
tries++;
138143
retryTimer.newTimeout(t -> doCall(), delayNs, TimeUnit.NANOSECONDS);

0 commit comments

Comments
 (0)