Skip to content

Commit fb715ce

Browse files
rmdmattingly (Ray Mattingly)
authored and committed
HBASE-27798 Client side should back off based on wait interval in RpcThrottlingException (apache#5275)
Signed-off-by: Bryan Beaudreault <[email protected]>
Signed-off-by: Duo Zhang <[email protected]>
1 parent c82cf47 commit fb715ce

File tree

7 files changed

+207
-84
lines changed

7 files changed

+207
-84
lines changed

hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java

Lines changed: 7 additions & 25 deletions
Original file line number | Diff line number | Diff line change
@@ -18,9 +18,7 @@
1818
package org.apache.hadoop.hbase.client;
1919

2020
import static org.apache.hadoop.hbase.CellUtil.createCellScanner;
21-
import static org.apache.hadoop.hbase.client.ConnectionUtils.SLEEP_DELTA_NS;
2221
import static org.apache.hadoop.hbase.client.ConnectionUtils.calcPriority;
23-
import static org.apache.hadoop.hbase.client.ConnectionUtils.getPauseTime;
2422
import static org.apache.hadoop.hbase.client.ConnectionUtils.resetController;
2523
import static org.apache.hadoop.hbase.client.ConnectionUtils.translateException;
2624
import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent;
@@ -181,6 +179,8 @@ public AsyncBatchRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn,
181179
}
182180
this.action2Errors = new IdentityHashMap<>();
183181
this.startNs = System.nanoTime();
182+
this.pauseManager =
183+
new HBaseServerExceptionPauseManager(pauseNs, pauseNsForServerOverloaded, operationTimeoutNs);
184184
}
185185

186186
private static boolean hasIncrementOrAppend(Row action) {
@@ -203,10 +203,6 @@ private static boolean hasIncrementOrAppend(RowMutations mutations) {
203203
return false;
204204
}
205205

206-
private long remainingTimeNs() {
207-
return operationTimeoutNs - (System.nanoTime() - startNs);
208-
}
209-
210206
private List<ThrowableWithExtraContext> removeErrors(Action action) {
211207
synchronized (action2Errors) {
212208
return action2Errors.remove(action);
@@ -366,7 +362,7 @@ private void onComplete(Map<byte[], RegionRequest> actionsByRegion, int tries,
366362
private void sendToServer(ServerName serverName, ServerRequest serverReq, int tries) {
367363
long remainingNs;
368364
if (operationTimeoutNs > 0) {
369-
remainingNs = remainingTimeNs();
365+
remainingNs = pauseManager.remainingTimeNs(startNs);
370366
if (remainingNs <= 0) {
371367
failAll(serverReq.actionsByRegion.values().stream().flatMap(r -> r.actions.stream()),
372368
tries);
@@ -473,27 +469,13 @@ private void tryResubmit(Stream<Action> actions, int tries, boolean immediately,
473469
groupAndSend(actions, tries);
474470
return;
475471
}
476-
long delayNs;
477-
boolean isServerOverloaded = HBaseServerException.isServerOverloaded(error);
478-
OptionalLong maybePauseNsToUse =
479-
pauseManager.getPauseNsFromException(error, remainingTimeNs() - SLEEP_DELTA_NS);
472+
OptionalLong maybePauseNsToUse = pauseManager.getPauseNsFromException(error, tries, startNs);
480473
if (!maybePauseNsToUse.isPresent()) {
481474
failAll(actions, tries);
482475
return;
483476
}
484-
long pauseNsToUse = maybePauseNsToUse.getAsLong();
485-
486-
if (operationTimeoutNs > 0) {
487-
long maxDelayNs = remainingTimeNs() - SLEEP_DELTA_NS;
488-
if (maxDelayNs <= 0) {
489-
failAll(actions, tries);
490-
return;
491-
}
492-
delayNs = Math.min(maxDelayNs, getPauseTime(pauseNsToUse, tries - 1));
493-
} else {
494-
delayNs = getPauseTime(pauseNsToUse, tries - 1);
495-
}
496-
if (isServerOverloaded) {
477+
long delayNs = maybePauseNsToUse.getAsLong();
478+
if (HBaseServerException.isServerOverloaded(error)) {
497479
Optional<MetricsConnection> metrics = conn.getConnectionMetrics();
498480
metrics.ifPresent(m -> m.incrementServerOverloadedBackoffTime(delayNs, TimeUnit.NANOSECONDS));
499481
}
@@ -503,7 +485,7 @@ private void tryResubmit(Stream<Action> actions, int tries, boolean immediately,
503485
private void groupAndSend(Stream<Action> actions, int tries) {
504486
long locateTimeoutNs;
505487
if (operationTimeoutNs > 0) {
506-
locateTimeoutNs = remainingTimeNs();
488+
locateTimeoutNs = pauseManager.remainingTimeNs(startNs);
507489
if (locateTimeoutNs <= 0) {
508490
failAll(actions, tries);
509491
return;

hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java

Lines changed: 5 additions & 19 deletions
Original file line number | Diff line number | Diff line change
@@ -17,8 +17,6 @@
1717
*/
1818
package org.apache.hadoop.hbase.client;
1919

20-
import static org.apache.hadoop.hbase.client.ConnectionUtils.SLEEP_DELTA_NS;
21-
import static org.apache.hadoop.hbase.client.ConnectionUtils.getPauseTime;
2220
import static org.apache.hadoop.hbase.client.ConnectionUtils.resetController;
2321
import static org.apache.hadoop.hbase.client.ConnectionUtils.translateException;
2422

@@ -93,15 +91,16 @@ public AsyncRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn, int pr
9391
this.controller.setPriority(priority);
9492
this.exceptions = new ArrayList<>();
9593
this.startNs = System.nanoTime();
96-
this.pauseManager = new HBaseServerExceptionPauseManager(pauseNs, pauseNsForServerOverloaded);
94+
this.pauseManager =
95+
new HBaseServerExceptionPauseManager(pauseNs, pauseNsForServerOverloaded, operationTimeoutNs);
9796
}
9897

9998
private long elapsedMs() {
10099
return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNs);
101100
}
102101

103102
protected final long remainingTimeNs() {
104-
return operationTimeoutNs - (System.nanoTime() - startNs);
103+
return pauseManager.remainingTimeNs(startNs);
105104
}
106105

107106
protected final void completeExceptionally() {
@@ -124,25 +123,12 @@ protected final void resetCallTimeout() {
124123
}
125124

126125
private void tryScheduleRetry(Throwable error) {
127-
OptionalLong maybePauseNsToUse =
128-
pauseManager.getPauseNsFromException(error, remainingTimeNs() - SLEEP_DELTA_NS);
126+
OptionalLong maybePauseNsToUse = pauseManager.getPauseNsFromException(error, tries, startNs);
129127
if (!maybePauseNsToUse.isPresent()) {
130128
completeExceptionally();
131129
return;
132130
}
133-
long pauseNsToUse = maybePauseNsToUse.getAsLong();
134-
135-
long delayNs;
136-
if (operationTimeoutNs > 0) {
137-
long maxDelayNs = remainingTimeNs() - SLEEP_DELTA_NS;
138-
if (maxDelayNs <= 0) {
139-
completeExceptionally();
140-
return;
141-
}
142-
delayNs = Math.min(maxDelayNs, getPauseTime(pauseNsToUse, tries - 1));
143-
} else {
144-
delayNs = getPauseTime(pauseNsToUse, tries - 1);
145-
}
131+
long delayNs = maybePauseNsToUse.getAsLong();
146132
tries++;
147133

148134
if (HBaseServerException.isServerOverloaded(error)) {

hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java

Lines changed: 4 additions & 21 deletions
Original file line number | Diff line number | Diff line change
@@ -17,8 +17,6 @@
1717
*/
1818
package org.apache.hadoop.hbase.client;
1919

20-
import static org.apache.hadoop.hbase.client.ConnectionUtils.SLEEP_DELTA_NS;
21-
import static org.apache.hadoop.hbase.client.ConnectionUtils.getPauseTime;
2220
import static org.apache.hadoop.hbase.client.ConnectionUtils.incRPCCallsMetrics;
2321
import static org.apache.hadoop.hbase.client.ConnectionUtils.incRPCRetriesMetrics;
2422
import static org.apache.hadoop.hbase.client.ConnectionUtils.noMoreResultsForReverseScan;
@@ -344,17 +342,14 @@ public AsyncScanSingleRegionRpcRetryingCaller(Timer retryTimer, AsyncConnectionI
344342
this.controller = conn.rpcControllerFactory.newController();
345343
this.controller.setPriority(priority);
346344
this.exceptions = new ArrayList<>();
347-
this.pauseManager = new HBaseServerExceptionPauseManager(pauseNs, pauseNsForServerOverloaded);
345+
this.pauseManager =
346+
new HBaseServerExceptionPauseManager(pauseNs, pauseNsForServerOverloaded, scanTimeoutNs);
348347
}
349348

350349
private long elapsedMs() {
351350
return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - nextCallStartNs);
352351
}
353352

354-
private long remainingTimeNs() {
355-
return scanTimeoutNs - (System.nanoTime() - nextCallStartNs);
356-
}
357-
358353
private void closeScanner() {
359354
incRPCCallsMetrics(scanMetrics, regionServerRemote);
360355
resetController(controller, rpcTimeoutNs, HConstants.HIGH_QOS);
@@ -417,26 +412,14 @@ private void onError(Throwable error) {
417412
completeExceptionally(!scannerClosed);
418413
return;
419414
}
420-
long delayNs;
421415

422416
OptionalLong maybePauseNsToUse =
423-
pauseManager.getPauseNsFromException(error, remainingTimeNs() - SLEEP_DELTA_NS);
417+
pauseManager.getPauseNsFromException(error, tries, nextCallStartNs);
424418
if (!maybePauseNsToUse.isPresent()) {
425419
completeExceptionally(!scannerClosed);
426420
return;
427421
}
428-
long pauseNsToUse = maybePauseNsToUse.getAsLong();
429-
430-
if (scanTimeoutNs > 0) {
431-
long maxDelayNs = remainingTimeNs() - SLEEP_DELTA_NS;
432-
if (maxDelayNs <= 0) {
433-
completeExceptionally(!scannerClosed);
434-
return;
435-
}
436-
delayNs = Math.min(maxDelayNs, getPauseTime(pauseNsToUse, tries - 1));
437-
} else {
438-
delayNs = getPauseTime(pauseNsToUse, tries - 1);
439-
}
422+
long delayNs = maybePauseNsToUse.getAsLong();
440423
if (scannerClosed) {
441424
completeWhenError(false);
442425
return;

hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -280,7 +280,7 @@ static Result filterCells(Result result, Cell keepCellsAfter) {
280280
}
281281

282282
// Add a delta to avoid timeout immediately after a retry sleeping.
283-
static final long SLEEP_DELTA_NS = TimeUnit.MILLISECONDS.toNanos(1);
283+
public static final long SLEEP_DELTA_NS = TimeUnit.MILLISECONDS.toNanos(1);
284284

285285
static Get toCheckExistenceOnly(Get get) {
286286
if (get.isCheckExistenceOnly()) {

hbase-client/src/main/java/org/apache/hadoop/hbase/client/backoff/HBaseServerExceptionPauseManager.java

Lines changed: 37 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -17,6 +17,9 @@
1717
*/
1818
package org.apache.hadoop.hbase.client.backoff;
1919

20+
import static org.apache.hadoop.hbase.client.ConnectionUtils.SLEEP_DELTA_NS;
21+
import static org.apache.hadoop.hbase.client.ConnectionUtils.getPauseTime;
22+
2023
import java.util.OptionalLong;
2124
import java.util.concurrent.TimeUnit;
2225
import org.apache.hadoop.hbase.HBaseServerException;
@@ -31,29 +34,59 @@ public class HBaseServerExceptionPauseManager {
3134

3235
private final long pauseNs;
3336
private final long pauseNsForServerOverloaded;
37+
private final long timeoutNs;
3438

35-
public HBaseServerExceptionPauseManager(long pauseNs, long pauseNsForServerOverloaded) {
39+
public HBaseServerExceptionPauseManager(long pauseNs, long pauseNsForServerOverloaded,
40+
long timeoutNs) {
3641
this.pauseNs = pauseNs;
3742
this.pauseNsForServerOverloaded = pauseNsForServerOverloaded;
43+
this.timeoutNs = timeoutNs;
3844
}
3945

40-
public OptionalLong getPauseNsFromException(Throwable error, long remainingTimeNs) {
46+
/**
47+
* Returns the nanos, if any, for which the client should wait
48+
* @param error The exception from the server
49+
* @param tries The current retry count
50+
* @return The time, in nanos, to pause. If empty then pausing would exceed our timeout, so we
51+
* should throw now
52+
*/
53+
public OptionalLong getPauseNsFromException(Throwable error, int tries, long startNs) {
4154
long expectedSleepNs;
55+
long remainingTimeNs = remainingTimeNs(startNs) - SLEEP_DELTA_NS;
4256
if (error instanceof RpcThrottlingException) {
4357
RpcThrottlingException rpcThrottlingException = (RpcThrottlingException) error;
4458
expectedSleepNs = TimeUnit.MILLISECONDS.toNanos(rpcThrottlingException.getWaitInterval());
45-
if (expectedSleepNs > remainingTimeNs) {
59+
if (expectedSleepNs > remainingTimeNs && remainingTimeNs > 0) {
60+
if (LOG.isDebugEnabled()) {
61+
LOG.debug("RpcThrottlingException suggested pause of {}ns which would exceed "
62+
+ "the timeout. We should throw instead.", expectedSleepNs, rpcThrottlingException);
63+
}
4664
return OptionalLong.empty();
4765
}
4866
if (LOG.isDebugEnabled()) {
49-
LOG.debug("Sleeping for {}ms after catching RpcThrottlingException", expectedSleepNs,
67+
LOG.debug("Sleeping for {}ns after catching RpcThrottlingException", expectedSleepNs,
5068
rpcThrottlingException);
5169
}
5270
} else {
5371
expectedSleepNs =
5472
HBaseServerException.isServerOverloaded(error) ? pauseNsForServerOverloaded : pauseNs;
73+
// RpcThrottlingException tells us exactly how long the client should wait for,
74+
// so we should not factor in the retry count for said exception
75+
expectedSleepNs = getPauseTime(expectedSleepNs, tries - 1);
76+
}
77+
78+
if (timeoutNs > 0) {
79+
if (remainingTimeNs <= 0) {
80+
return OptionalLong.empty();
81+
}
82+
expectedSleepNs = Math.min(remainingTimeNs, expectedSleepNs);
5583
}
84+
5685
return OptionalLong.of(expectedSleepNs);
5786
}
5887

88+
public long remainingTimeNs(long startNs) {
89+
return timeoutNs - (System.nanoTime() - startNs);
90+
}
91+
5992
}

hbase-client/src/test/java/org/apache/hadoop/hbase/client/backoff/TestHBaseServerExceptionPauseManager.java

Lines changed: 64 additions & 11 deletions
Original file line number | Diff line number | Diff line change
@@ -47,40 +47,93 @@ public class TestHBaseServerExceptionPauseManager {
4747
private final Throwable OTHER_EXCEPTION = new RuntimeException("");
4848
private final HBaseServerException SERVER_OVERLOADED_EXCEPTION = new HBaseServerException(true);
4949

50-
private final HBaseServerExceptionPauseManager pauseManager =
51-
new HBaseServerExceptionPauseManager(PAUSE_NANOS, PAUSE_NANOS_FOR_SERVER_OVERLOADED);
52-
5350
@ClassRule
5451
public static final HBaseClassTestRule CLASS_RULE =
5552
HBaseClassTestRule.forClass(TestHBaseServerExceptionPauseManager.class);
5653

5754
@Test
58-
public void itSupportsRpcThrottlingNanos() {
55+
public void itSupportsRpcThrottlingNanosNoTimeout() {
56+
HBaseServerExceptionPauseManager pauseManager =
57+
new HBaseServerExceptionPauseManager(PAUSE_NANOS, PAUSE_NANOS_FOR_SERVER_OVERLOADED, 0);
58+
5959
OptionalLong pauseNanos =
60-
pauseManager.getPauseNsFromException(RPC_THROTTLING_EXCEPTION, Long.MAX_VALUE);
60+
pauseManager.getPauseNsFromException(RPC_THROTTLING_EXCEPTION, 1, System.nanoTime());
61+
62+
assertTrue(pauseNanos.isPresent());
63+
assertEquals(pauseNanos.getAsLong(), WAIT_INTERVAL_NANOS);
64+
}
65+
66+
@Test
67+
public void itSupportsRpcThrottlingNanosLenientTimeout() {
68+
HBaseServerExceptionPauseManager pauseManager = new HBaseServerExceptionPauseManager(
69+
PAUSE_NANOS, PAUSE_NANOS_FOR_SERVER_OVERLOADED, System.nanoTime() * 2);
70+
71+
OptionalLong pauseNanos =
72+
pauseManager.getPauseNsFromException(RPC_THROTTLING_EXCEPTION, 1, System.nanoTime());
73+
6174
assertTrue(pauseNanos.isPresent());
6275
assertEquals(pauseNanos.getAsLong(), WAIT_INTERVAL_NANOS);
6376
}
6477

6578
@Test
6679
public void itSupportsServerOverloadedExceptionNanos() {
80+
HBaseServerExceptionPauseManager pauseManager =
81+
new HBaseServerExceptionPauseManager(PAUSE_NANOS, PAUSE_NANOS_FOR_SERVER_OVERLOADED, 0);
82+
6783
OptionalLong pauseNanos =
68-
pauseManager.getPauseNsFromException(SERVER_OVERLOADED_EXCEPTION, Long.MAX_VALUE);
84+
pauseManager.getPauseNsFromException(SERVER_OVERLOADED_EXCEPTION, 1, System.nanoTime());
85+
6986
assertTrue(pauseNanos.isPresent());
70-
assertEquals(pauseNanos.getAsLong(), PAUSE_NANOS_FOR_SERVER_OVERLOADED);
87+
// account for 1% jitter in pause time
88+
assertTrue(pauseNanos.getAsLong() >= PAUSE_NANOS_FOR_SERVER_OVERLOADED * 0.99);
89+
assertTrue(pauseNanos.getAsLong() <= PAUSE_NANOS_FOR_SERVER_OVERLOADED * 1.01);
7190
}
7291

7392
@Test
7493
public void itSupportsOtherExceptionNanos() {
75-
OptionalLong pauseNanos = pauseManager.getPauseNsFromException(OTHER_EXCEPTION, Long.MAX_VALUE);
94+
HBaseServerExceptionPauseManager pauseManager =
95+
new HBaseServerExceptionPauseManager(PAUSE_NANOS, PAUSE_NANOS_FOR_SERVER_OVERLOADED, 0);
96+
97+
OptionalLong pauseNanos =
98+
pauseManager.getPauseNsFromException(OTHER_EXCEPTION, 1, System.nanoTime());
99+
76100
assertTrue(pauseNanos.isPresent());
77-
assertEquals(pauseNanos.getAsLong(), PAUSE_NANOS);
101+
// account for 1% jitter in pause time
102+
assertTrue(pauseNanos.getAsLong() >= PAUSE_NANOS * 0.99);
103+
assertTrue(pauseNanos.getAsLong() <= PAUSE_NANOS * 1.01);
78104
}
79105

80106
@Test
81-
public void itThrottledTimeoutFastFail() {
82-
OptionalLong pauseNanos = pauseManager.getPauseNsFromException(RPC_THROTTLING_EXCEPTION, 0L);
107+
public void itTimesOutRpcThrottlingException() {
108+
HBaseServerExceptionPauseManager pauseManager =
109+
new HBaseServerExceptionPauseManager(PAUSE_NANOS, PAUSE_NANOS_FOR_SERVER_OVERLOADED, 1);
110+
111+
OptionalLong pauseNanos =
112+
pauseManager.getPauseNsFromException(RPC_THROTTLING_EXCEPTION, 1, System.nanoTime());
113+
114+
assertFalse(pauseNanos.isPresent());
115+
}
116+
117+
@Test
118+
public void itTimesOutRpcOtherException() {
119+
HBaseServerExceptionPauseManager pauseManager =
120+
new HBaseServerExceptionPauseManager(PAUSE_NANOS, PAUSE_NANOS_FOR_SERVER_OVERLOADED, 1);
121+
122+
OptionalLong pauseNanos =
123+
pauseManager.getPauseNsFromException(OTHER_EXCEPTION, 1, System.nanoTime());
124+
83125
assertFalse(pauseNanos.isPresent());
84126
}
85127

128+
@Test
129+
public void itDoesNotTimeOutIfDisabled() {
130+
HBaseServerExceptionPauseManager pauseManager =
131+
new HBaseServerExceptionPauseManager(PAUSE_NANOS, PAUSE_NANOS_FOR_SERVER_OVERLOADED, 0);
132+
133+
OptionalLong pauseNanos =
134+
pauseManager.getPauseNsFromException(OTHER_EXCEPTION, 1, System.nanoTime());
135+
136+
assertTrue(pauseNanos.isPresent());
137+
}
138+
86139
}

0 commit comments

Comments (0)