Skip to content

Commit 6c63929

Browse files
author
Ray Mattingly
committed
ignore retry multiplier when calculating throttled pause
1 parent aa0c3f1 commit 6c63929

File tree

5 files changed

+133
-11
lines changed

5 files changed

+133
-11
lines changed

hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@
6060
import org.apache.hadoop.hbase.client.backoff.HBaseServerExceptionPauseManager;
6161
import org.apache.hadoop.hbase.client.backoff.ServerStatistics;
6262
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
63+
import org.apache.hadoop.hbase.quotas.RpcThrottlingException;
6364
import org.apache.hadoop.hbase.util.Bytes;
6465
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
6566
import org.apache.yetus.audience.InterfaceAudience;
@@ -473,7 +474,7 @@ private void tryResubmit(Stream<Action> actions, int tries, boolean immediately,
473474
groupAndSend(actions, tries);
474475
return;
475476
}
476-
long delayNs;
477+
477478
boolean isServerOverloaded = HBaseServerException.isServerOverloaded(error);
478479
OptionalLong maybePauseNsToUse =
479480
pauseManager.getPauseNsFromException(error, remainingTimeNs() - SLEEP_DELTA_NS);
@@ -482,16 +483,22 @@ private void tryResubmit(Stream<Action> actions, int tries, boolean immediately,
482483
return;
483484
}
484485
long pauseNsToUse = maybePauseNsToUse.getAsLong();
486+
if (!(error instanceof RpcThrottlingException)) {
487+
// RpcThrottlingException tells us exactly how long the client should wait for,
488+
// so we should not factor in the retry count for said exception
489+
pauseNsToUse = getPauseTime(pauseNsToUse, tries - 1);
490+
}
485491

492+
long delayNs;
486493
if (operationTimeoutNs > 0) {
487494
long maxDelayNs = remainingTimeNs() - SLEEP_DELTA_NS;
488495
if (maxDelayNs <= 0) {
489496
failAll(actions, tries);
490497
return;
491498
}
492-
delayNs = Math.min(maxDelayNs, getPauseTime(pauseNsToUse, tries - 1));
499+
delayNs = Math.min(maxDelayNs, pauseNsToUse);
493500
} else {
494-
delayNs = getPauseTime(pauseNsToUse, tries - 1);
501+
delayNs = pauseNsToUse;
495502
}
496503

497504
if (isServerOverloaded) {

hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import org.apache.hadoop.hbase.client.backoff.HBaseServerExceptionPauseManager;
4040
import org.apache.hadoop.hbase.exceptions.ScannerResetException;
4141
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
42+
import org.apache.hadoop.hbase.quotas.RpcThrottlingException;
4243
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
4344
import org.apache.hadoop.hbase.util.FutureUtils;
4445
import org.apache.yetus.audience.InterfaceAudience;
@@ -132,17 +133,24 @@ private void tryScheduleRetry(Throwable error) {
132133
}
133134
long pauseNsToUse = maybePauseNsToUse.getAsLong();
134135

136+
if (!(error instanceof RpcThrottlingException)) {
137+
// RpcThrottlingException tells us exactly how long the client should wait for,
138+
// so we should not factor in the retry count for said exception
139+
pauseNsToUse = getPauseTime(pauseNsToUse, tries - 1);
140+
}
141+
135142
long delayNs;
136143
if (operationTimeoutNs > 0) {
137144
long maxDelayNs = remainingTimeNs() - SLEEP_DELTA_NS;
138145
if (maxDelayNs <= 0) {
139146
completeExceptionally();
140147
return;
141148
}
142-
delayNs = Math.min(maxDelayNs, getPauseTime(pauseNsToUse, tries - 1));
149+
delayNs = Math.min(maxDelayNs, pauseNsToUse);
143150
} else {
144-
delayNs = getPauseTime(pauseNsToUse, tries - 1);
151+
delayNs = pauseNsToUse;
145152
}
153+
146154
tries++;
147155
if (HBaseServerException.isServerOverloaded(error)) {
148156
Optional<MetricsConnection> metrics = conn.getConnectionMetrics();

hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
5050
import org.apache.hadoop.hbase.exceptions.ScannerResetException;
5151
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
52+
import org.apache.hadoop.hbase.quotas.RpcThrottlingException;
5253
import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
5354
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
5455
import org.apache.yetus.audience.InterfaceAudience;
@@ -417,7 +418,7 @@ private void onError(Throwable error) {
417418
completeExceptionally(!scannerClosed);
418419
return;
419420
}
420-
long delayNs;
421+
421422
OptionalLong maybePauseNsToUse =
422423
pauseManager.getPauseNsFromException(error, remainingTimeNs() - SLEEP_DELTA_NS);
423424
if (!maybePauseNsToUse.isPresent()) {
@@ -426,16 +427,24 @@ private void onError(Throwable error) {
426427
}
427428
long pauseNsToUse = maybePauseNsToUse.getAsLong();
428429

430+
if (!(error instanceof RpcThrottlingException)) {
431+
// RpcThrottlingException tells us exactly how long the client should wait for,
432+
// so we should not factor in the retry count for said exception
433+
pauseNsToUse = getPauseTime(pauseNsToUse, tries - 1);
434+
}
435+
436+
long delayNs;
429437
if (scanTimeoutNs > 0) {
430438
long maxDelayNs = remainingTimeNs() - SLEEP_DELTA_NS;
431439
if (maxDelayNs <= 0) {
432440
completeExceptionally(!scannerClosed);
433441
return;
434442
}
435-
delayNs = Math.min(maxDelayNs, getPauseTime(pauseNsToUse, tries - 1));
443+
delayNs = Math.min(maxDelayNs, pauseNsToUse);
436444
} else {
437-
delayNs = getPauseTime(pauseNsToUse, tries - 1);
445+
delayNs = pauseNsToUse;
438446
}
447+
439448
if (scannerClosed) {
440449
completeWhenError(false);
441450
return;

hbase-client/src/main/java/org/apache/hadoop/hbase/client/backoff/HBaseServerExceptionPauseManager.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,12 +37,24 @@ public HBaseServerExceptionPauseManager(long pauseNs, long pauseNsForServerOverl
3737
this.pauseNsForServerOverloaded = pauseNsForServerOverloaded;
3838
}
3939

40+
/**
41+
* Returns the nanos, if any, for which the client should wait
42+
* @param error The exception from the server
43+
* @param remainingTimeNs The remaining nanos before timeout
44+
* @return The time, in nanos, to pause. If empty then pausing would exceed our timeout, so we
45+
* should throw now
46+
*/
4047
public OptionalLong getPauseNsFromException(Throwable error, long remainingTimeNs) {
4148
long expectedSleepNs;
4249
if (error instanceof RpcThrottlingException) {
4350
RpcThrottlingException rpcThrottlingException = (RpcThrottlingException) error;
4451
expectedSleepNs = TimeUnit.MILLISECONDS.toNanos(rpcThrottlingException.getWaitInterval());
4552
if (expectedSleepNs > remainingTimeNs) {
53+
if (LOG.isDebugEnabled()) {
54+
LOG.debug(
55+
"RpcThrottlingException suggested pause of {}ns which would exceed the timeout. We should throw instead.",
56+
expectedSleepNs, rpcThrottlingException);
57+
}
4658
return OptionalLong.empty();
4759
}
4860
if (LOG.isDebugEnabled()) {

hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncClientPauseForRpcThrottling.java

Lines changed: 89 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,12 @@
2929
import java.util.concurrent.ExecutionException;
3030
import java.util.concurrent.TimeUnit;
3131
import java.util.concurrent.atomic.AtomicBoolean;
32+
import java.util.concurrent.atomic.AtomicInteger;
3233
import org.apache.hadoop.conf.Configuration;
3334
import org.apache.hadoop.hbase.HBaseClassTestRule;
3435
import org.apache.hadoop.hbase.HBaseTestingUtil;
3536
import org.apache.hadoop.hbase.HConstants;
37+
import org.apache.hadoop.hbase.RegionTooBusyException;
3638
import org.apache.hadoop.hbase.TableName;
3739
import org.apache.hadoop.hbase.quotas.RpcThrottlingException;
3840
import org.apache.hadoop.hbase.regionserver.HRegionServer;
@@ -69,7 +71,10 @@ public class TestAsyncClientPauseForRpcThrottling {
6971

7072
private static AsyncConnection CONN;
7173
private static final AtomicBoolean THROTTLE = new AtomicBoolean(false);
74+
private static final AtomicInteger FORCE_RETRIES = new AtomicInteger(0);
7275
private static final long WAIT_INTERVAL_NANOS = TimeUnit.SECONDS.toNanos(1);
76+
private static final int RETRY_COUNT = 3;
77+
private static final int MAX_MULTIPLIER_EXPECTATION = 2;
7378

7479
public static final class ThrottlingRSRpcServicesForTest extends RSRpcServices {
7580

@@ -80,24 +85,34 @@ public ThrottlingRSRpcServicesForTest(HRegionServer rs) throws IOException {
8085
@Override
8186
public ClientProtos.GetResponse get(RpcController controller, ClientProtos.GetRequest request)
8287
throws ServiceException {
88+
maybeForceRetry();
8389
maybeThrottle();
8490
return super.get(controller, request);
8591
}
8692

8793
@Override
8894
public ClientProtos.MultiResponse multi(RpcController rpcc, ClientProtos.MultiRequest request)
8995
throws ServiceException {
96+
maybeForceRetry();
9097
maybeThrottle();
9198
return super.multi(rpcc, request);
9299
}
93100

94101
@Override
95102
public ClientProtos.ScanResponse scan(RpcController controller,
96103
ClientProtos.ScanRequest request) throws ServiceException {
104+
maybeForceRetry();
97105
maybeThrottle();
98106
return super.scan(controller, request);
99107
}
100108

109+
private void maybeForceRetry() throws ServiceException {
110+
if (FORCE_RETRIES.get() > 0) {
111+
FORCE_RETRIES.addAndGet(-1);
112+
throw new ServiceException(new RegionTooBusyException("Retry"));
113+
}
114+
}
115+
101116
private void maybeThrottle() throws ServiceException {
102117
if (THROTTLE.get()) {
103118
THROTTLE.set(false);
@@ -121,6 +136,12 @@ protected RSRpcServices createRpcServices() throws IOException {
121136

122137
@BeforeClass
123138
public static void setUp() throws Exception {
139+
assertTrue(
140+
"The MAX_MULTIPLIER_EXPECTATION must be less than HConstants.RETRY_BACKOFF[RETRY_COUNT] "
141+
+ "in order for our tests to adequately verify that we aren't "
142+
+ "multiplying throttled pauses based on the retry count.",
143+
MAX_MULTIPLIER_EXPECTATION < HConstants.RETRY_BACKOFF[RETRY_COUNT]);
144+
124145
UTIL.getConfiguration().setLong(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
125146
UTIL.startMiniCluster(1);
126147
UTIL.getMiniHBaseCluster().getConfiguration().setClass(HConstants.REGION_SERVER_IMPL,
@@ -149,16 +170,26 @@ public static void tearDown() throws Exception {
149170
}
150171

151172
private void assertTime(Callable<Void> callable, long time, boolean isGreater) throws Exception {
152-
long startNs = System.nanoTime();
153-
callable.call();
154-
long costNs = System.nanoTime() - startNs;
173+
long costNs = getCostNs(callable);
155174
if (isGreater) {
156175
assertTrue(costNs > time);
157176
} else {
158177
assertTrue(costNs <= time);
159178
}
160179
}
161180

181+
private void assertTimeBetween(Callable<Void> callable, long minNs, long maxNs) throws Exception {
182+
long costNs = getCostNs(callable);
183+
assertTrue(costNs > minNs);
184+
assertTrue(costNs < maxNs);
185+
}
186+
187+
private long getCostNs(Callable<Void> callable) throws Exception {
188+
long startNs = System.nanoTime();
189+
callable.call();
190+
return System.nanoTime() - startNs;
191+
}
192+
162193
@Test
163194
public void itWaitsForThrottledGet() throws Exception {
164195
boolean isThrottled = true;
@@ -193,6 +224,21 @@ public void itDoesNotWaitForThrottledGetExceedingTimeout() throws Exception {
193224
}, WAIT_INTERVAL_NANOS, false);
194225
}
195226

227+
@Test
228+
public void itDoesNotMultiplyThrottledGetWait() throws Exception {
229+
THROTTLE.set(true);
230+
FORCE_RETRIES.set(RETRY_COUNT);
231+
232+
AsyncTable<AdvancedScanResultConsumer> table =
233+
CONN.getTableBuilder(TABLE_NAME).setOperationTimeout(1, TimeUnit.MINUTES)
234+
.setMaxRetries(RETRY_COUNT + 1).setRetryPause(1, TimeUnit.NANOSECONDS).build();
235+
236+
assertTimeBetween(() -> {
237+
table.get(new Get(Bytes.toBytes(0))).get();
238+
return null;
239+
}, WAIT_INTERVAL_NANOS, MAX_MULTIPLIER_EXPECTATION * WAIT_INTERVAL_NANOS);
240+
}
241+
196242
@Test
197243
public void itWaitsForThrottledBatch() throws Exception {
198244
boolean isThrottled = true;
@@ -244,6 +290,26 @@ public void itDoesNotWaitForThrottledBatchExceedingTimeout() throws Exception {
244290
}, WAIT_INTERVAL_NANOS, false);
245291
}
246292

293+
@Test
294+
public void itDoesNotMultiplyThrottledBatchWait() throws Exception {
295+
THROTTLE.set(true);
296+
FORCE_RETRIES.set(RETRY_COUNT);
297+
298+
assertTimeBetween(() -> {
299+
List<CompletableFuture<?>> futures = new ArrayList<>();
300+
try (AsyncBufferedMutator mutator =
301+
CONN.getBufferedMutatorBuilder(TABLE_NAME).setOperationTimeout(1, TimeUnit.MINUTES)
302+
.setMaxRetries(RETRY_COUNT + 1).setRetryPause(1, TimeUnit.NANOSECONDS).build()) {
303+
for (int i = 100; i < 110; i++) {
304+
futures.add(mutator
305+
.mutate(new Put(Bytes.toBytes(i)).addColumn(FAMILY, QUALIFIER, Bytes.toBytes(i))));
306+
}
307+
}
308+
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get();
309+
return null;
310+
}, WAIT_INTERVAL_NANOS, MAX_MULTIPLIER_EXPECTATION * WAIT_INTERVAL_NANOS);
311+
}
312+
247313
@Test
248314
public void itWaitsForThrottledScan() throws Exception {
249315
boolean isThrottled = true;
@@ -291,4 +357,24 @@ public void itDoesNotWaitForThrottledScanExceedingTimeout() throws Exception {
291357
return null;
292358
}, WAIT_INTERVAL_NANOS, false);
293359
}
360+
361+
@Test
362+
public void itDoesNotMultiplyThrottledScanWait() throws Exception {
363+
THROTTLE.set(true);
364+
FORCE_RETRIES.set(RETRY_COUNT);
365+
366+
AsyncTable<AdvancedScanResultConsumer> table =
367+
CONN.getTableBuilder(TABLE_NAME).setOperationTimeout(1, TimeUnit.MINUTES)
368+
.setMaxRetries(RETRY_COUNT + 1).setRetryPause(1, TimeUnit.NANOSECONDS).build();
369+
370+
assertTimeBetween(() -> {
371+
try (ResultScanner scanner = table.getScanner(new Scan().setCaching(80))) {
372+
for (int i = 0; i < 100; i++) {
373+
Result result = scanner.next();
374+
assertArrayEquals(Bytes.toBytes(i), result.getValue(FAMILY, QUALIFIER));
375+
}
376+
}
377+
return null;
378+
}, WAIT_INTERVAL_NANOS, MAX_MULTIPLIER_EXPECTATION * WAIT_INTERVAL_NANOS);
379+
}
294380
}

0 commit comments

Comments (0)