Skip to content

Commit 8c33f5b

Browse files
committed
HBASE-26807 Unify CallQueueTooBigException special pause with CallDroppedException
1 parent bcd9a9a commit 8c33f5b

24 files changed

+212
-157
lines changed

hbase-client/src/main/java/org/apache/hadoop/hbase/CallDroppedException.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,6 @@
1818

1919
package org.apache.hadoop.hbase;
2020

21-
import java.io.IOException;
22-
2321
import org.apache.yetus.audience.InterfaceAudience;
2422

2523
/**
@@ -28,7 +26,7 @@
2826
*/
2927
@SuppressWarnings("serial")
3028
@InterfaceAudience.Public
31-
public class CallDroppedException extends IOException {
29+
public class CallDroppedException extends ServerOverloadedException {
3230
public CallDroppedException() {
3331
super();
3432
}

hbase-client/src/main/java/org/apache/hadoop/hbase/CallQueueTooBigException.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,15 @@
1818

1919
package org.apache.hadoop.hbase;
2020

21-
import java.io.IOException;
22-
2321
import org.apache.yetus.audience.InterfaceAudience;
2422

23+
/**
24+
* Returned to clients when their request could not be enqueued due to the server being
25+
* overloaded. Clients should retry upon receiving it.
26+
*/
2527
@SuppressWarnings("serial")
2628
@InterfaceAudience.Public
27-
public class CallQueueTooBigException extends IOException {
29+
public class CallQueueTooBigException extends ServerOverloadedException {
2830
public CallQueueTooBigException() {
2931
super();
3032
}
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
package org.apache.hadoop.hbase;
2+
3+
import org.apache.yetus.audience.InterfaceAudience;
4+
5+
/**
6+
* Base class for exceptions thrown when the hbase server is overloaded.
7+
*/
8+
@InterfaceAudience.Public
9+
public class ServerOverloadedException extends HBaseIOException {
10+
public ServerOverloadedException() {
11+
}
12+
13+
public ServerOverloadedException(String message) {
14+
super(message);
15+
}
16+
}

hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdminBuilder.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ public interface AsyncAdminBuilder {
5050
* Set the base pause time for retrying. We use an exponential policy to generate sleep time when
5151
* retrying.
5252
* @return this for invocation chaining
53-
* @see #setRetryPauseForCQTBE(long, TimeUnit)
53+
* @see #setRetryPauseForServerOverloaded(long, TimeUnit)
5454
*/
5555
AsyncAdminBuilder setRetryPause(long timeout, TimeUnit unit);
5656

@@ -64,7 +64,7 @@ public interface AsyncAdminBuilder {
6464
* {@code CallQueueTooBigException} if here you specify a smaller value.
6565
* @see #setRetryPause(long, TimeUnit)
6666
*/
67-
AsyncAdminBuilder setRetryPauseForCQTBE(long pause, TimeUnit unit);
67+
AsyncAdminBuilder setRetryPauseForServerOverloaded(long pause, TimeUnit unit);
6868

6969
/**
7070
* Set the max retry times for an admin operation. Usually it is the max attempt times minus 1.

hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdminBuilderBase.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ abstract class AsyncAdminBuilderBase implements AsyncAdminBuilder {
3333

3434
protected long pauseNs;
3535

36-
protected long pauseForCQTBENs;
36+
protected long pauseForServerOverloaded;
3737

3838
protected int maxAttempts;
3939

@@ -43,7 +43,7 @@ abstract class AsyncAdminBuilderBase implements AsyncAdminBuilder {
4343
this.rpcTimeoutNs = connConf.getRpcTimeoutNs();
4444
this.operationTimeoutNs = connConf.getOperationTimeoutNs();
4545
this.pauseNs = connConf.getPauseNs();
46-
this.pauseForCQTBENs = connConf.getPauseForCQTBENs();
46+
this.pauseForServerOverloaded = connConf.getPauseForServerOverloaded();
4747
this.maxAttempts = connConf.getMaxRetries();
4848
this.startLogErrorsCnt = connConf.getStartLogErrorsCnt();
4949
}
@@ -67,8 +67,8 @@ public AsyncAdminBuilder setRetryPause(long timeout, TimeUnit unit) {
6767
}
6868

6969
@Override
70-
public AsyncAdminBuilder setRetryPauseForCQTBE(long timeout, TimeUnit unit) {
71-
this.pauseForCQTBENs = unit.toNanos(timeout);
70+
public AsyncAdminBuilder setRetryPauseForServerOverloaded(long timeout, TimeUnit unit) {
71+
this.pauseForServerOverloaded = unit.toNanos(timeout);
7272
return this;
7373
}
7474

hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdminRequestRetryingCaller.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,9 +44,9 @@ public interface Callable<T> {
4444
private ServerName serverName;
4545

4646
public AsyncAdminRequestRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn, int priority,
47-
long pauseNs, long pauseForCQTBENs, int maxAttempts, long operationTimeoutNs,
47+
long pauseNs, long pauseForServerOverloaded, int maxAttempts, long operationTimeoutNs,
4848
long rpcTimeoutNs, int startLogErrorsCnt, ServerName serverName, Callable<T> callable) {
49-
super(retryTimer, conn, priority, pauseNs, pauseForCQTBENs, maxAttempts, operationTimeoutNs,
49+
super(retryTimer, conn, priority, pauseNs, pauseForServerOverloaded, maxAttempts, operationTimeoutNs,
5050
rpcTimeoutNs, startLogErrorsCnt);
5151
this.serverName = serverName;
5252
this.callable = callable;

hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -45,13 +45,13 @@
4545
import java.util.stream.Collectors;
4646
import java.util.stream.Stream;
4747
import org.apache.commons.lang3.mutable.MutableBoolean;
48-
import org.apache.hadoop.hbase.CallQueueTooBigException;
4948
import org.apache.hadoop.hbase.CellScannable;
5049
import org.apache.hadoop.hbase.DoNotRetryIOException;
5150
import org.apache.hadoop.hbase.HConstants;
5251
import org.apache.hadoop.hbase.HRegionLocation;
5352
import org.apache.hadoop.hbase.RetryImmediatelyException;
5453
import org.apache.hadoop.hbase.ServerName;
54+
import org.apache.hadoop.hbase.ServerOverloadedException;
5555
import org.apache.hadoop.hbase.TableName;
5656
import org.apache.hadoop.hbase.client.MultiResponse.RegionResult;
5757
import org.apache.hadoop.hbase.client.RetriesExhaustedException.ThrowableWithExtraContext;
@@ -104,7 +104,7 @@ class AsyncBatchRpcRetryingCaller<T> {
104104

105105
private final long pauseNs;
106106

107-
private final long pauseForCQTBENs;
107+
private final long pauseNsForServerOverloaded;
108108

109109
private final int maxAttempts;
110110

@@ -150,13 +150,13 @@ public int getPriority() {
150150
}
151151

152152
public AsyncBatchRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn,
153-
TableName tableName, List<? extends Row> actions, long pauseNs, long pauseForCQTBENs,
153+
TableName tableName, List<? extends Row> actions, long pauseNs, long pauseNsForServerOverloaded,
154154
int maxAttempts, long operationTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) {
155155
this.retryTimer = retryTimer;
156156
this.conn = conn;
157157
this.tableName = tableName;
158158
this.pauseNs = pauseNs;
159-
this.pauseForCQTBENs = pauseForCQTBENs;
159+
this.pauseNsForServerOverloaded = pauseNsForServerOverloaded;
160160
this.maxAttempts = maxAttempts;
161161
this.operationTimeoutNs = operationTimeoutNs;
162162
this.rpcTimeoutNs = rpcTimeoutNs;
@@ -466,17 +466,17 @@ private void onError(Map<byte[], RegionRequest> actionsByRegion, int tries, Thro
466466
.collect(Collectors.toList());
467467
addError(copiedActions, error, serverName);
468468
tryResubmit(copiedActions.stream(), tries, error instanceof RetryImmediatelyException,
469-
error instanceof CallQueueTooBigException);
469+
error instanceof ServerOverloadedException);
470470
}
471471

472472
private void tryResubmit(Stream<Action> actions, int tries, boolean immediately,
473-
boolean isCallQueueTooBig) {
473+
boolean isServerOverloadedException) {
474474
if (immediately) {
475475
groupAndSend(actions, tries);
476476
return;
477477
}
478478
long delayNs;
479-
long pauseNsToUse = isCallQueueTooBig ? pauseForCQTBENs : pauseNs;
479+
long pauseNsToUse = isServerOverloadedException ? pauseNsForServerOverloaded : pauseNs;
480480
if (operationTimeoutNs > 0) {
481481
long maxDelayNs = remainingTimeNs() - SLEEP_DELTA_NS;
482482
if (maxDelayNs <= 0) {

hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ class AsyncClientScanner {
7373

7474
private final long pauseNs;
7575

76-
private final long pauseForCQTBENs;
76+
private final long pauseNsForServerOverloaded;
7777

7878
private final int maxAttempts;
7979

@@ -86,7 +86,7 @@ class AsyncClientScanner {
8686
private final ScanResultCache resultCache;
8787

8888
public AsyncClientScanner(Scan scan, AdvancedScanResultConsumer consumer, TableName tableName,
89-
AsyncConnectionImpl conn, Timer retryTimer, long pauseNs, long pauseForCQTBENs,
89+
AsyncConnectionImpl conn, Timer retryTimer, long pauseNs, long pauseNsForServerOverloaded,
9090
int maxAttempts, long scanTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) {
9191
if (scan.getStartRow() == null) {
9292
scan.withStartRow(EMPTY_START_ROW, scan.includeStartRow());
@@ -100,7 +100,7 @@ public AsyncClientScanner(Scan scan, AdvancedScanResultConsumer consumer, TableN
100100
this.conn = conn;
101101
this.retryTimer = retryTimer;
102102
this.pauseNs = pauseNs;
103-
this.pauseForCQTBENs = pauseForCQTBENs;
103+
this.pauseNsForServerOverloaded = pauseNsForServerOverloaded;
104104
this.maxAttempts = maxAttempts;
105105
this.scanTimeoutNs = scanTimeoutNs;
106106
this.rpcTimeoutNs = rpcTimeoutNs;
@@ -170,7 +170,8 @@ private void startScan(OpenScannerResponse resp) {
170170
.setScan(scan).metrics(scanMetrics).consumer(consumer).resultCache(resultCache)
171171
.rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
172172
.scanTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS)
173-
.pauseForCQTBE(pauseForCQTBENs, TimeUnit.NANOSECONDS).maxAttempts(maxAttempts)
173+
.pauseForServerOverloaded(pauseNsForServerOverloaded, TimeUnit.NANOSECONDS)
174+
.maxAttempts(maxAttempts)
174175
.startLogErrorsCnt(startLogErrorsCnt).start(resp.controller, resp.resp),
175176
(hasMore, error) -> {
176177
if (error != null) {
@@ -190,7 +191,8 @@ private CompletableFuture<OpenScannerResponse> openScanner(int replicaId) {
190191
.row(scan.getStartRow()).replicaId(replicaId).locateType(getLocateType(scan))
191192
.rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
192193
.operationTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS)
193-
.pauseForCQTBE(pauseForCQTBENs, TimeUnit.NANOSECONDS).maxAttempts(maxAttempts)
194+
.pauseForServerOverloaded(pauseNsForServerOverloaded, TimeUnit.NANOSECONDS)
195+
.maxAttempts(maxAttempts)
194196
.startLogErrorsCnt(startLogErrorsCnt).action(this::callOpenScanner).call();
195197
}
196198

hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_OPERATION_TIMEOUT;
3232
import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_PAUSE;
3333
import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE;
34+
import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_PAUSE_FOR_SERVER_OVERLOADED;
3435
import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_RETRIES_NUMBER;
3536
import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_SCANNER_CACHING;
3637
import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY;
@@ -92,7 +93,7 @@ class AsyncConnectionConfiguration {
9293

9394
private final long pauseNs;
9495

95-
private final long pauseForCQTBENs;
96+
private final long pauseForServerOverloaded;
9697

9798
private final int maxRetries;
9899

@@ -137,15 +138,17 @@ class AsyncConnectionConfiguration {
137138
this.writeRpcTimeoutNs =
138139
TimeUnit.MILLISECONDS.toNanos(conf.getLong(HBASE_RPC_WRITE_TIMEOUT_KEY, rpcTimeoutMs));
139140
long pauseMs = conf.getLong(HBASE_CLIENT_PAUSE, DEFAULT_HBASE_CLIENT_PAUSE);
140-
long pauseForCQTBEMs = conf.getLong(HBASE_CLIENT_PAUSE_FOR_CQTBE, pauseMs);
141-
if (pauseForCQTBEMs < pauseMs) {
141+
long pauseForServerOverloaded = conf.getLong(HBASE_CLIENT_PAUSE_FOR_SERVER_OVERLOADED,
142+
conf.getLong(HBASE_CLIENT_PAUSE_FOR_CQTBE, pauseMs));
143+
if (pauseForServerOverloaded < pauseMs) {
142144
LOG.warn(
143145
"The {} setting: {} ms is less than the {} setting: {} ms, use the greater one instead",
144-
HBASE_CLIENT_PAUSE_FOR_CQTBE, pauseForCQTBEMs, HBASE_CLIENT_PAUSE, pauseMs);
145-
pauseForCQTBEMs = pauseMs;
146+
HBASE_CLIENT_PAUSE_FOR_SERVER_OVERLOADED, pauseForServerOverloaded,
147+
HBASE_CLIENT_PAUSE, pauseMs);
148+
pauseForServerOverloaded = pauseMs;
146149
}
147150
this.pauseNs = TimeUnit.MILLISECONDS.toNanos(pauseMs);
148-
this.pauseForCQTBENs = TimeUnit.MILLISECONDS.toNanos(pauseForCQTBEMs);
151+
this.pauseForServerOverloaded = TimeUnit.MILLISECONDS.toNanos(pauseForServerOverloaded);
149152
this.maxRetries = conf.getInt(HBASE_CLIENT_RETRIES_NUMBER, DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
150153
this.startLogErrorsCnt =
151154
conf.getInt(START_LOG_ERRORS_AFTER_COUNT_KEY, DEFAULT_START_LOG_ERRORS_AFTER_COUNT);
@@ -196,8 +199,8 @@ long getPauseNs() {
196199
return pauseNs;
197200
}
198201

199-
long getPauseForCQTBENs() {
200-
return pauseForCQTBENs;
202+
long getPauseForServerOverloaded() {
203+
return pauseForServerOverloaded;
201204
}
202205

203206
int getMaxRetries() {

hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMasterRequestRpcRetryingCaller.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -44,10 +44,10 @@ public interface Callable<T> {
4444
private final Callable<T> callable;
4545

4646
public AsyncMasterRequestRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn,
47-
Callable<T> callable, int priority, long pauseNs, long pauseForCQTBENs, int maxRetries,
48-
long operationTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) {
49-
super(retryTimer, conn, priority, pauseNs, pauseForCQTBENs, maxRetries, operationTimeoutNs,
50-
rpcTimeoutNs, startLogErrorsCnt);
47+
Callable<T> callable, int priority, long pauseNs, long pauseNsForServerOverloaded,
48+
int maxRetries, long operationTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) {
49+
super(retryTimer, conn, priority, pauseNs, pauseNsForServerOverloaded, maxRetries,
50+
operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt);
5151
this.callable = callable;
5252
}
5353

0 commit comments

Comments
 (0)