2929import java .util .concurrent .ExecutionException ;
3030import java .util .concurrent .TimeUnit ;
3131import java .util .concurrent .atomic .AtomicBoolean ;
32+ import java .util .concurrent .atomic .AtomicInteger ;
3233import org .apache .hadoop .conf .Configuration ;
3334import org .apache .hadoop .hbase .HBaseClassTestRule ;
3435import org .apache .hadoop .hbase .HBaseTestingUtil ;
3536import org .apache .hadoop .hbase .HConstants ;
37+ import org .apache .hadoop .hbase .RegionTooBusyException ;
3638import org .apache .hadoop .hbase .TableName ;
3739import org .apache .hadoop .hbase .quotas .RpcThrottlingException ;
3840import org .apache .hadoop .hbase .regionserver .HRegionServer ;
@@ -69,7 +71,10 @@ public class TestAsyncClientPauseForRpcThrottling {
6971
7072 private static AsyncConnection CONN ;
7173 private static final AtomicBoolean THROTTLE = new AtomicBoolean (false );
74+ private static final AtomicInteger FORCE_RETRIES = new AtomicInteger (0 );
7275 private static final long WAIT_INTERVAL_NANOS = TimeUnit .SECONDS .toNanos (1 );
76+ private static final int RETRY_COUNT = 3 ;
77+ private static final int MAX_MULTIPLIER_EXPECTATION = 2 ;
7378
7479 public static final class ThrottlingRSRpcServicesForTest extends RSRpcServices {
7580
@@ -80,24 +85,34 @@ public ThrottlingRSRpcServicesForTest(HRegionServer rs) throws IOException {
8085 @ Override
8186 public ClientProtos .GetResponse get (RpcController controller , ClientProtos .GetRequest request )
8287 throws ServiceException {
88+ maybeForceRetry ();
8389 maybeThrottle ();
8490 return super .get (controller , request );
8591 }
8692
8793 @ Override
8894 public ClientProtos .MultiResponse multi (RpcController rpcc , ClientProtos .MultiRequest request )
8995 throws ServiceException {
96+ maybeForceRetry ();
9097 maybeThrottle ();
9198 return super .multi (rpcc , request );
9299 }
93100
94101 @ Override
95102 public ClientProtos .ScanResponse scan (RpcController controller ,
96103 ClientProtos .ScanRequest request ) throws ServiceException {
104+ maybeForceRetry ();
97105 maybeThrottle ();
98106 return super .scan (controller , request );
99107 }
100108
109+ private void maybeForceRetry () throws ServiceException {
110+ if (FORCE_RETRIES .get () > 0 ) {
111+ FORCE_RETRIES .addAndGet (-1 );
112+ throw new ServiceException (new RegionTooBusyException ("Retry" ));
113+ }
114+ }
115+
101116 private void maybeThrottle () throws ServiceException {
102117 if (THROTTLE .get ()) {
103118 THROTTLE .set (false );
@@ -121,6 +136,12 @@ protected RSRpcServices createRpcServices() throws IOException {
121136
122137 @ BeforeClass
123138 public static void setUp () throws Exception {
139+ assertTrue (
140+ "The MAX_MULTIPLIER_EXPECTATION must be less than HConstants.RETRY_BACKOFF[RETRY_COUNT] "
141+ + "in order for our tests to adequately verify that we aren't "
142+ + "multiplying throttled pauses based on the retry count." ,
143+ MAX_MULTIPLIER_EXPECTATION < HConstants .RETRY_BACKOFF [RETRY_COUNT ]);
144+
124145 UTIL .getConfiguration ().setLong (HConstants .HBASE_CLIENT_RETRIES_NUMBER , 1 );
125146 UTIL .startMiniCluster (1 );
126147 UTIL .getMiniHBaseCluster ().getConfiguration ().setClass (HConstants .REGION_SERVER_IMPL ,
@@ -149,16 +170,26 @@ public static void tearDown() throws Exception {
149170 }
150171
151172 private void assertTime (Callable <Void > callable , long time , boolean isGreater ) throws Exception {
152- long startNs = System .nanoTime ();
153- callable .call ();
154- long costNs = System .nanoTime () - startNs ;
173+ long costNs = getCostNs (callable );
155174 if (isGreater ) {
156175 assertTrue (costNs > time );
157176 } else {
158177 assertTrue (costNs <= time );
159178 }
160179 }
161180
181+ private void assertTimeBetween (Callable <Void > callable , long minNs , long maxNs ) throws Exception {
182+ long costNs = getCostNs (callable );
183+ assertTrue (costNs > minNs );
184+ assertTrue (costNs < maxNs );
185+ }
186+
187+ private long getCostNs (Callable <Void > callable ) throws Exception {
188+ long startNs = System .nanoTime ();
189+ callable .call ();
190+ return System .nanoTime () - startNs ;
191+ }
192+
162193 @ Test
163194 public void itWaitsForThrottledGet () throws Exception {
164195 boolean isThrottled = true ;
@@ -193,6 +224,21 @@ public void itDoesNotWaitForThrottledGetExceedingTimeout() throws Exception {
193224 }, WAIT_INTERVAL_NANOS , false );
194225 }
195226
227+ @ Test
228+ public void itDoesNotMultiplyThrottledGetWait () throws Exception {
229+ THROTTLE .set (true );
230+ FORCE_RETRIES .set (RETRY_COUNT );
231+
232+ AsyncTable <AdvancedScanResultConsumer > table =
233+ CONN .getTableBuilder (TABLE_NAME ).setOperationTimeout (1 , TimeUnit .MINUTES )
234+ .setMaxRetries (RETRY_COUNT + 1 ).setRetryPause (1 , TimeUnit .NANOSECONDS ).build ();
235+
236+ assertTimeBetween (() -> {
237+ table .get (new Get (Bytes .toBytes (0 ))).get ();
238+ return null ;
239+ }, WAIT_INTERVAL_NANOS , MAX_MULTIPLIER_EXPECTATION * WAIT_INTERVAL_NANOS );
240+ }
241+
196242 @ Test
197243 public void itWaitsForThrottledBatch () throws Exception {
198244 boolean isThrottled = true ;
@@ -244,6 +290,26 @@ public void itDoesNotWaitForThrottledBatchExceedingTimeout() throws Exception {
244290 }, WAIT_INTERVAL_NANOS , false );
245291 }
246292
293+ @ Test
294+ public void itDoesNotMultiplyThrottledBatchWait () throws Exception {
295+ THROTTLE .set (true );
296+ FORCE_RETRIES .set (RETRY_COUNT );
297+
298+ assertTimeBetween (() -> {
299+ List <CompletableFuture <?>> futures = new ArrayList <>();
300+ try (AsyncBufferedMutator mutator =
301+ CONN .getBufferedMutatorBuilder (TABLE_NAME ).setOperationTimeout (1 , TimeUnit .MINUTES )
302+ .setMaxRetries (RETRY_COUNT + 1 ).setRetryPause (1 , TimeUnit .NANOSECONDS ).build ()) {
303+ for (int i = 100 ; i < 110 ; i ++) {
304+ futures .add (mutator
305+ .mutate (new Put (Bytes .toBytes (i )).addColumn (FAMILY , QUALIFIER , Bytes .toBytes (i ))));
306+ }
307+ }
308+ CompletableFuture .allOf (futures .toArray (new CompletableFuture [0 ])).get ();
309+ return null ;
310+ }, WAIT_INTERVAL_NANOS , MAX_MULTIPLIER_EXPECTATION * WAIT_INTERVAL_NANOS );
311+ }
312+
247313 @ Test
248314 public void itWaitsForThrottledScan () throws Exception {
249315 boolean isThrottled = true ;
@@ -291,4 +357,24 @@ public void itDoesNotWaitForThrottledScanExceedingTimeout() throws Exception {
291357 return null ;
292358 }, WAIT_INTERVAL_NANOS , false );
293359 }
360+
361+ @ Test
362+ public void itDoesNotMultiplyThrottledScanWait () throws Exception {
363+ THROTTLE .set (true );
364+ FORCE_RETRIES .set (RETRY_COUNT );
365+
366+ AsyncTable <AdvancedScanResultConsumer > table =
367+ CONN .getTableBuilder (TABLE_NAME ).setOperationTimeout (1 , TimeUnit .MINUTES )
368+ .setMaxRetries (RETRY_COUNT + 1 ).setRetryPause (1 , TimeUnit .NANOSECONDS ).build ();
369+
370+ assertTimeBetween (() -> {
371+ try (ResultScanner scanner = table .getScanner (new Scan ().setCaching (80 ))) {
372+ for (int i = 0 ; i < 100 ; i ++) {
373+ Result result = scanner .next ();
374+ assertArrayEquals (Bytes .toBytes (i ), result .getValue (FAMILY , QUALIFIER ));
375+ }
376+ }
377+ return null ;
378+ }, WAIT_INTERVAL_NANOS , MAX_MULTIPLIER_EXPECTATION * WAIT_INTERVAL_NANOS );
379+ }
294380}
0 commit comments