Skip to content

Commit 902f8db

Browse files
author
Ray Mattingly
committed
reuse PauseManager#remainingTimeNs
1 parent fc8eb20 commit 902f8db

File tree

5 files changed

+45
-48
lines changed

5 files changed

+45
-48
lines changed

hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -178,8 +178,8 @@ public AsyncBatchRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn,
178178
}
179179
this.action2Errors = new IdentityHashMap<>();
180180
this.startNs = System.nanoTime();
181-
this.pauseManager = new HBaseServerExceptionPauseManager(pauseNs, pauseNsForServerOverloaded,
182-
operationTimeoutNs, startNs);
181+
this.pauseManager =
182+
new HBaseServerExceptionPauseManager(pauseNs, pauseNsForServerOverloaded, operationTimeoutNs);
183183
}
184184

185185
private static boolean hasIncrementOrAppend(Row action) {
@@ -202,10 +202,6 @@ private static boolean hasIncrementOrAppend(RowMutations mutations) {
202202
return false;
203203
}
204204

205-
private long remainingTimeNs() {
206-
return operationTimeoutNs - (System.nanoTime() - startNs);
207-
}
208-
209205
private List<ThrowableWithExtraContext> removeErrors(Action action) {
210206
synchronized (action2Errors) {
211207
return action2Errors.remove(action);
@@ -365,7 +361,7 @@ private void onComplete(Map<byte[], RegionRequest> actionsByRegion, int tries,
365361
private void sendToServer(ServerName serverName, ServerRequest serverReq, int tries) {
366362
long remainingNs;
367363
if (operationTimeoutNs > 0) {
368-
remainingNs = remainingTimeNs();
364+
remainingNs = pauseManager.remainingTimeNs(startNs);
369365
if (remainingNs <= 0) {
370366
failAll(serverReq.actionsByRegion.values().stream().flatMap(r -> r.actions.stream()),
371367
tries);
@@ -473,7 +469,7 @@ private void tryResubmit(Stream<Action> actions, int tries, boolean immediately,
473469
return;
474470
}
475471

476-
OptionalLong maybePauseNsToUse = pauseManager.getPauseNsFromException(error, tries);
472+
OptionalLong maybePauseNsToUse = pauseManager.getPauseNsFromException(error, tries, startNs);
477473
if (!maybePauseNsToUse.isPresent()) {
478474
failAll(actions, tries);
479475
return;
@@ -489,7 +485,7 @@ private void tryResubmit(Stream<Action> actions, int tries, boolean immediately,
489485
private void groupAndSend(Stream<Action> actions, int tries) {
490486
long locateTimeoutNs;
491487
if (operationTimeoutNs > 0) {
492-
locateTimeoutNs = remainingTimeNs();
488+
locateTimeoutNs = pauseManager.remainingTimeNs(startNs);
493489
if (locateTimeoutNs <= 0) {
494490
failAll(actions, tries);
495491
return;

hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -91,16 +91,16 @@ public AsyncRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn, int pr
9191
this.controller.setPriority(priority);
9292
this.exceptions = new ArrayList<>();
9393
this.startNs = System.nanoTime();
94-
this.pauseManager = new HBaseServerExceptionPauseManager(pauseNs, pauseNsForServerOverloaded,
95-
operationTimeoutNs, startNs);
94+
this.pauseManager =
95+
new HBaseServerExceptionPauseManager(pauseNs, pauseNsForServerOverloaded, operationTimeoutNs);
9696
}
9797

9898
private long elapsedMs() {
9999
return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNs);
100100
}
101101

102102
protected final long remainingTimeNs() {
103-
return operationTimeoutNs - (System.nanoTime() - startNs);
103+
return pauseManager.remainingTimeNs(startNs);
104104
}
105105

106106
protected final void completeExceptionally() {
@@ -123,7 +123,7 @@ protected final void resetCallTimeout() {
123123
}
124124

125125
private void tryScheduleRetry(Throwable error) {
126-
OptionalLong maybePauseNsToUse = pauseManager.getPauseNsFromException(error, tries);
126+
OptionalLong maybePauseNsToUse = pauseManager.getPauseNsFromException(error, tries, startNs);
127127
if (!maybePauseNsToUse.isPresent()) {
128128
completeExceptionally();
129129
return;

hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -342,18 +342,14 @@ public AsyncScanSingleRegionRpcRetryingCaller(Timer retryTimer, AsyncConnectionI
342342
this.controller = conn.rpcControllerFactory.newController();
343343
this.controller.setPriority(priority);
344344
this.exceptions = new ArrayList<>();
345-
this.pauseManager = new HBaseServerExceptionPauseManager(pauseNs, pauseNsForServerOverloaded,
346-
scanTimeoutNs, nextCallStartNs);
345+
this.pauseManager =
346+
new HBaseServerExceptionPauseManager(pauseNs, pauseNsForServerOverloaded, scanTimeoutNs);
347347
}
348348

349349
private long elapsedMs() {
350350
return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - nextCallStartNs);
351351
}
352352

353-
private long remainingTimeNs() {
354-
return scanTimeoutNs - (System.nanoTime() - nextCallStartNs);
355-
}
356-
357353
private void closeScanner() {
358354
incRPCCallsMetrics(scanMetrics, regionServerRemote);
359355
resetController(controller, rpcTimeoutNs, HConstants.HIGH_QOS);
@@ -417,7 +413,8 @@ private void onError(Throwable error) {
417413
return;
418414
}
419415

420-
OptionalLong maybePauseNsToUse = pauseManager.getPauseNsFromException(error, tries);
416+
OptionalLong maybePauseNsToUse =
417+
pauseManager.getPauseNsFromException(error, tries, nextCallStartNs);
421418
if (!maybePauseNsToUse.isPresent()) {
422419
completeExceptionally(!scannerClosed);
423420
return;

hbase-client/src/main/java/org/apache/hadoop/hbase/client/backoff/HBaseServerExceptionPauseManager.java

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -35,14 +35,12 @@ public class HBaseServerExceptionPauseManager {
3535
private final long pauseNs;
3636
private final long pauseNsForServerOverloaded;
3737
private final long timeoutNs;
38-
private final long startNs;
3938

4039
public HBaseServerExceptionPauseManager(long pauseNs, long pauseNsForServerOverloaded,
41-
long timeoutNs, long startNs) {
40+
long timeoutNs) {
4241
this.pauseNs = pauseNs;
4342
this.pauseNsForServerOverloaded = pauseNsForServerOverloaded;
4443
this.timeoutNs = timeoutNs;
45-
this.startNs = startNs;
4644
}
4745

4846
/**
@@ -52,12 +50,12 @@ public HBaseServerExceptionPauseManager(long pauseNs, long pauseNsForServerOverl
5250
* @return The time, in nanos, to pause. If empty then pausing would exceed our timeout, so we
5351
* should throw now
5452
*/
55-
public OptionalLong getPauseNsFromException(Throwable error, int tries) {
53+
public OptionalLong getPauseNsFromException(Throwable error, int tries, long startNs) {
5654
long expectedSleepNs;
55+
long remainingTimeNs = remainingTimeNs(startNs) - SLEEP_DELTA_NS;
5756
if (error instanceof RpcThrottlingException) {
5857
RpcThrottlingException rpcThrottlingException = (RpcThrottlingException) error;
5958
expectedSleepNs = TimeUnit.MILLISECONDS.toNanos(rpcThrottlingException.getWaitInterval());
60-
long remainingTimeNs = remainingTimeNs();
6159
if (expectedSleepNs > remainingTimeNs && remainingTimeNs > 0) {
6260
if (LOG.isDebugEnabled()) {
6361
LOG.debug("RpcThrottlingException suggested pause of {}ns which would exceed "
@@ -78,7 +76,6 @@ public OptionalLong getPauseNsFromException(Throwable error, int tries) {
7876
}
7977

8078
if (timeoutNs > 0) {
81-
long remainingTimeNs = remainingTimeNs();
8279
if (remainingTimeNs <= 0) {
8380
return OptionalLong.empty();
8481
}
@@ -88,8 +85,8 @@ public OptionalLong getPauseNsFromException(Throwable error, int tries) {
8885
return OptionalLong.of(expectedSleepNs);
8986
}
9087

91-
private long remainingTimeNs() {
92-
return timeoutNs - (System.nanoTime() - startNs) - SLEEP_DELTA_NS;
88+
public long remainingTimeNs(long startNs) {
89+
return timeoutNs - (System.nanoTime() - startNs);
9390
}
9491

9592
}

hbase-client/src/test/java/org/apache/hadoop/hbase/client/backoff/TestHBaseServerExceptionPauseManager.java

Lines changed: 27 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -53,10 +53,11 @@ public class TestHBaseServerExceptionPauseManager {
5353

5454
@Test
5555
public void itSupportsRpcThrottlingNanosNoTimeout() {
56-
HBaseServerExceptionPauseManager pauseManager = new HBaseServerExceptionPauseManager(
57-
PAUSE_NANOS, PAUSE_NANOS_FOR_SERVER_OVERLOADED, 0, System.nanoTime());
56+
HBaseServerExceptionPauseManager pauseManager =
57+
new HBaseServerExceptionPauseManager(PAUSE_NANOS, PAUSE_NANOS_FOR_SERVER_OVERLOADED, 0);
5858

59-
OptionalLong pauseNanos = pauseManager.getPauseNsFromException(RPC_THROTTLING_EXCEPTION, 1);
59+
OptionalLong pauseNanos =
60+
pauseManager.getPauseNsFromException(RPC_THROTTLING_EXCEPTION, 1, System.nanoTime());
6061

6162
assertTrue(pauseNanos.isPresent());
6263
assertEquals(pauseNanos.getAsLong(), WAIT_INTERVAL_NANOS);
@@ -65,20 +66,22 @@ public void itSupportsRpcThrottlingNanosNoTimeout() {
6566
@Test
6667
public void itSupportsRpcThrottlingNanosLenientTimeout() {
6768
HBaseServerExceptionPauseManager pauseManager = new HBaseServerExceptionPauseManager(
68-
PAUSE_NANOS, PAUSE_NANOS_FOR_SERVER_OVERLOADED, System.nanoTime() * 2, System.nanoTime());
69+
PAUSE_NANOS, PAUSE_NANOS_FOR_SERVER_OVERLOADED, System.nanoTime() * 2);
6970

70-
OptionalLong pauseNanos = pauseManager.getPauseNsFromException(RPC_THROTTLING_EXCEPTION, 1);
71+
OptionalLong pauseNanos =
72+
pauseManager.getPauseNsFromException(RPC_THROTTLING_EXCEPTION, 1, System.nanoTime());
7173

7274
assertTrue(pauseNanos.isPresent());
7375
assertEquals(pauseNanos.getAsLong(), WAIT_INTERVAL_NANOS);
7476
}
7577

7678
@Test
7779
public void itSupportsServerOverloadedExceptionNanos() {
78-
HBaseServerExceptionPauseManager pauseManager = new HBaseServerExceptionPauseManager(
79-
PAUSE_NANOS, PAUSE_NANOS_FOR_SERVER_OVERLOADED, 0, System.nanoTime());
80+
HBaseServerExceptionPauseManager pauseManager =
81+
new HBaseServerExceptionPauseManager(PAUSE_NANOS, PAUSE_NANOS_FOR_SERVER_OVERLOADED, 0);
8082

81-
OptionalLong pauseNanos = pauseManager.getPauseNsFromException(SERVER_OVERLOADED_EXCEPTION, 1);
83+
OptionalLong pauseNanos =
84+
pauseManager.getPauseNsFromException(SERVER_OVERLOADED_EXCEPTION, 1, System.nanoTime());
8285

8386
assertTrue(pauseNanos.isPresent());
8487
// account for 1% jitter in pause time
@@ -88,10 +91,11 @@ public void itSupportsServerOverloadedExceptionNanos() {
8891

8992
@Test
9093
public void itSupportsOtherExceptionNanos() {
91-
HBaseServerExceptionPauseManager pauseManager = new HBaseServerExceptionPauseManager(
92-
PAUSE_NANOS, PAUSE_NANOS_FOR_SERVER_OVERLOADED, 0, System.nanoTime());
94+
HBaseServerExceptionPauseManager pauseManager =
95+
new HBaseServerExceptionPauseManager(PAUSE_NANOS, PAUSE_NANOS_FOR_SERVER_OVERLOADED, 0);
9396

94-
OptionalLong pauseNanos = pauseManager.getPauseNsFromException(OTHER_EXCEPTION, 1);
97+
OptionalLong pauseNanos =
98+
pauseManager.getPauseNsFromException(OTHER_EXCEPTION, 1, System.nanoTime());
9599

96100
assertTrue(pauseNanos.isPresent());
97101
// account for 1% jitter in pause time
@@ -101,30 +105,33 @@ public void itSupportsOtherExceptionNanos() {
101105

102106
@Test
103107
public void itTimesOutRpcThrottlingException() {
104-
HBaseServerExceptionPauseManager pauseManager = new HBaseServerExceptionPauseManager(
105-
PAUSE_NANOS, PAUSE_NANOS_FOR_SERVER_OVERLOADED, 1, System.nanoTime());
108+
HBaseServerExceptionPauseManager pauseManager =
109+
new HBaseServerExceptionPauseManager(PAUSE_NANOS, PAUSE_NANOS_FOR_SERVER_OVERLOADED, 1);
106110

107-
OptionalLong pauseNanos = pauseManager.getPauseNsFromException(RPC_THROTTLING_EXCEPTION, 1);
111+
OptionalLong pauseNanos =
112+
pauseManager.getPauseNsFromException(RPC_THROTTLING_EXCEPTION, 1, System.nanoTime());
108113

109114
assertFalse(pauseNanos.isPresent());
110115
}
111116

112117
@Test
113118
public void itTimesOutRpcOtherException() {
114-
HBaseServerExceptionPauseManager pauseManager = new HBaseServerExceptionPauseManager(
115-
PAUSE_NANOS, PAUSE_NANOS_FOR_SERVER_OVERLOADED, 1, System.nanoTime());
119+
HBaseServerExceptionPauseManager pauseManager =
120+
new HBaseServerExceptionPauseManager(PAUSE_NANOS, PAUSE_NANOS_FOR_SERVER_OVERLOADED, 1);
116121

117-
OptionalLong pauseNanos = pauseManager.getPauseNsFromException(OTHER_EXCEPTION, 1);
122+
OptionalLong pauseNanos =
123+
pauseManager.getPauseNsFromException(OTHER_EXCEPTION, 1, System.nanoTime());
118124

119125
assertFalse(pauseNanos.isPresent());
120126
}
121127

122128
@Test
123129
public void itDoesNotTimeOutIfDisabled() {
124-
HBaseServerExceptionPauseManager pauseManager = new HBaseServerExceptionPauseManager(
125-
PAUSE_NANOS, PAUSE_NANOS_FOR_SERVER_OVERLOADED, 0, System.nanoTime());
130+
HBaseServerExceptionPauseManager pauseManager =
131+
new HBaseServerExceptionPauseManager(PAUSE_NANOS, PAUSE_NANOS_FOR_SERVER_OVERLOADED, 0);
126132

127-
OptionalLong pauseNanos = pauseManager.getPauseNsFromException(OTHER_EXCEPTION, 1);
133+
OptionalLong pauseNanos =
134+
pauseManager.getPauseNsFromException(OTHER_EXCEPTION, 1, System.nanoTime());
128135

129136
assertTrue(pauseNanos.isPresent());
130137
}

0 commit comments

Comments
 (0)