Skip to content

Commit a6edd64

Browse files
rmdmattinglyRay Mattingly
andcommitted
HBASE-29663 TimeBasedLimiters should support dynamic configuration refresh (#7387)
Co-authored-by: Ray Mattingly <[email protected]> Signed-off-by: Charles Connell <[email protected]> Signed-off-by: Nick Dimiduk <[email protected]>
1 parent 3d3b78f commit a6edd64

File tree

11 files changed

+99
-69
lines changed

11 files changed

+99
-69
lines changed

hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/FixedIntervalRateLimiter.java

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,8 @@
2020
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
2121
import org.apache.yetus.audience.InterfaceAudience;
2222
import org.apache.yetus.audience.InterfaceStability;
23-
24-
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
23+
import org.slf4j.Logger;
24+
import org.slf4j.LoggerFactory;
2525

2626
/**
2727
* With this limiter resources will be refilled only after a fixed interval of time.
@@ -43,6 +43,8 @@ public class FixedIntervalRateLimiter extends RateLimiter {
4343
public static final String RATE_LIMITER_REFILL_INTERVAL_MS =
4444
"hbase.quota.rate.limiter.refill.interval.ms";
4545

46+
private static final Logger LOG = LoggerFactory.getLogger(FixedIntervalRateLimiter.class);
47+
4648
private long nextRefillTime = -1L;
4749
private final long refillInterval;
4850

@@ -52,10 +54,14 @@ public FixedIntervalRateLimiter() {
5254

5355
public FixedIntervalRateLimiter(long refillInterval) {
5456
super();
55-
Preconditions.checkArgument(getTimeUnitInMillis() >= refillInterval,
56-
String.format("Refill interval %s must be less than or equal to TimeUnit millis %s",
57-
refillInterval, getTimeUnitInMillis()));
58-
this.refillInterval = refillInterval;
57+
long timeUnit = getTimeUnitInMillis();
58+
if (refillInterval > timeUnit) {
59+
LOG.warn(
60+
"Refill interval {} is larger than time unit {}. This is invalid. "
61+
+ "Instead, we will use the time unit {} as the refill interval",
62+
refillInterval, timeUnit, timeUnit);
63+
}
64+
this.refillInterval = Math.min(timeUnit, refillInterval);
5965
}
6066

6167
@Override

hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaCache.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -130,20 +130,23 @@ private void ensureInitialized() {
130130
}
131131

132132
private Map<String, UserQuotaState> fetchUserQuotaStateEntries() throws IOException {
133-
return QuotaUtil.fetchUserQuotas(rsServices.getConnection(), tableMachineQuotaFactors,
134-
machineQuotaFactor);
133+
return QuotaUtil.fetchUserQuotas(rsServices.getConfiguration(), rsServices.getConnection(),
134+
tableMachineQuotaFactors, machineQuotaFactor);
135135
}
136136

137137
private Map<String, QuotaState> fetchRegionServerQuotaStateEntries() throws IOException {
138-
return QuotaUtil.fetchRegionServerQuotas(rsServices.getConnection());
138+
return QuotaUtil.fetchRegionServerQuotas(rsServices.getConfiguration(),
139+
rsServices.getConnection());
139140
}
140141

141142
private Map<TableName, QuotaState> fetchTableQuotaStateEntries() throws IOException {
142-
return QuotaUtil.fetchTableQuotas(rsServices.getConnection(), tableMachineQuotaFactors);
143+
return QuotaUtil.fetchTableQuotas(rsServices.getConfiguration(), rsServices.getConnection(),
144+
tableMachineQuotaFactors);
143145
}
144146

145147
private Map<String, QuotaState> fetchNamespaceQuotaStateEntries() throws IOException {
146-
return QuotaUtil.fetchNamespaceQuotas(rsServices.getConnection(), machineQuotaFactor);
148+
return QuotaUtil.fetchNamespaceQuotas(rsServices.getConfiguration(), rsServices.getConnection(),
149+
machineQuotaFactor);
147150
}
148151

149152
/**

hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaLimiterFactory.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
*/
1818
package org.apache.hadoop.hbase.quotas;
1919

20+
import org.apache.hadoop.conf.Configuration;
2021
import org.apache.yetus.audience.InterfaceAudience;
2122
import org.apache.yetus.audience.InterfaceStability;
2223

@@ -25,8 +26,8 @@
2526
@InterfaceAudience.Private
2627
@InterfaceStability.Evolving
2728
public class QuotaLimiterFactory {
28-
public static QuotaLimiter fromThrottle(final Throttle throttle) {
29-
return TimeBasedLimiter.fromThrottle(throttle);
29+
public static QuotaLimiter fromThrottle(Configuration conf, final Throttle throttle) {
30+
return TimeBasedLimiter.fromThrottle(conf, throttle);
3031
}
3132

3233
public static QuotaLimiter update(final QuotaLimiter a, final QuotaLimiter b) {

hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaState.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
*/
1818
package org.apache.hadoop.hbase.quotas;
1919

20+
import org.apache.hadoop.conf.Configuration;
2021
import org.apache.yetus.audience.InterfaceAudience;
2122
import org.apache.yetus.audience.InterfaceStability;
2223

@@ -57,9 +58,9 @@ public synchronized boolean isBypass() {
5758
/**
5859
* Setup the global quota information. (This operation is part of the QuotaState setup)
5960
*/
60-
public synchronized void setQuotas(final Quotas quotas) {
61+
public synchronized void setQuotas(Configuration conf, final Quotas quotas) {
6162
if (quotas.hasThrottle()) {
62-
globalLimiter = QuotaLimiterFactory.fromThrottle(quotas.getThrottle());
63+
globalLimiter = QuotaLimiterFactory.fromThrottle(conf, quotas.getThrottle());
6364
} else {
6465
globalLimiter = NoopQuotaLimiter.get();
6566
}

hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaUtil.java

Lines changed: 20 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -329,8 +329,9 @@ private static void deleteQuotas(final Connection connection, final byte[] rowKe
329329
doDelete(connection, delete);
330330
}
331331

332-
public static Map<String, UserQuotaState> fetchUserQuotas(final Connection connection,
333-
Map<TableName, Double> tableMachineQuotaFactors, double factor) throws IOException {
332+
public static Map<String, UserQuotaState> fetchUserQuotas(final Configuration conf,
333+
final Connection connection, Map<TableName, Double> tableMachineQuotaFactors, double factor)
334+
throws IOException {
334335
Map<String, UserQuotaState> userQuotas = new HashMap<>();
335336
try (Table table = connection.getTable(QUOTA_TABLE_NAME)) {
336337
Scan scan = new Scan();
@@ -350,7 +351,7 @@ public static Map<String, UserQuotaState> fetchUserQuotas(final Connection conne
350351
@Override
351352
public void visitUserQuotas(String userName, String namespace, Quotas quotas) {
352353
quotas = updateClusterQuotaToMachineQuota(quotas, factor);
353-
quotaInfo.setQuotas(namespace, quotas);
354+
quotaInfo.setQuotas(conf, namespace, quotas);
354355
}
355356

356357
@Override
@@ -359,13 +360,13 @@ public void visitUserQuotas(String userName, TableName table, Quotas quotas) {
359360
tableMachineQuotaFactors.containsKey(table)
360361
? tableMachineQuotaFactors.get(table)
361362
: 1);
362-
quotaInfo.setQuotas(table, quotas);
363+
quotaInfo.setQuotas(conf, table, quotas);
363364
}
364365

365366
@Override
366367
public void visitUserQuotas(String userName, Quotas quotas) {
367368
quotas = updateClusterQuotaToMachineQuota(quotas, factor);
368-
quotaInfo.setQuotas(quotas);
369+
quotaInfo.setQuotas(conf, quotas);
369370
}
370371
});
371372
} catch (IOException e) {
@@ -406,7 +407,7 @@ protected static UserQuotaState buildDefaultUserQuotaState(Configuration conf) {
406407
UserQuotaState state = new UserQuotaState();
407408
QuotaProtos.Quotas defaultQuotas =
408409
QuotaProtos.Quotas.newBuilder().setThrottle(throttleBuilder.build()).build();
409-
state.setQuotas(defaultQuotas);
410+
state.setQuotas(conf, defaultQuotas);
410411
return state;
411412
}
412413

@@ -419,12 +420,12 @@ private static Optional<TimedQuota> buildDefaultTimedQuota(Configuration conf, S
419420
java.util.concurrent.TimeUnit.SECONDS, org.apache.hadoop.hbase.quotas.QuotaScope.MACHINE));
420421
}
421422

422-
public static Map<TableName, QuotaState> fetchTableQuotas(final Connection connection,
423-
Map<TableName, Double> tableMachineFactors) throws IOException {
423+
public static Map<TableName, QuotaState> fetchTableQuotas(final Configuration conf,
424+
final Connection connection, Map<TableName, Double> tableMachineFactors) throws IOException {
424425
Scan scan = new Scan();
425426
scan.addFamily(QUOTA_FAMILY_INFO);
426427
scan.setStartStopRowForPrefixScan(QUOTA_TABLE_ROW_KEY_PREFIX);
427-
return fetchGlobalQuotas("table", scan, connection, new KeyFromRow<TableName>() {
428+
return fetchGlobalQuotas(conf, "table", scan, connection, new KeyFromRow<TableName>() {
428429
@Override
429430
public TableName getKeyFromRow(final byte[] row) {
430431
assert isTableRowKey(row);
@@ -438,12 +439,12 @@ public double getFactor(TableName tableName) {
438439
});
439440
}
440441

441-
public static Map<String, QuotaState> fetchNamespaceQuotas(final Connection connection,
442-
double factor) throws IOException {
442+
public static Map<String, QuotaState> fetchNamespaceQuotas(final Configuration conf,
443+
final Connection connection, double factor) throws IOException {
443444
Scan scan = new Scan();
444445
scan.addFamily(QUOTA_FAMILY_INFO);
445446
scan.setStartStopRowForPrefixScan(QUOTA_NAMESPACE_ROW_KEY_PREFIX);
446-
return fetchGlobalQuotas("namespace", scan, connection, new KeyFromRow<String>() {
447+
return fetchGlobalQuotas(conf, "namespace", scan, connection, new KeyFromRow<String>() {
447448
@Override
448449
public String getKeyFromRow(final byte[] row) {
449450
assert isNamespaceRowKey(row);
@@ -457,12 +458,12 @@ public double getFactor(String s) {
457458
});
458459
}
459460

460-
public static Map<String, QuotaState> fetchRegionServerQuotas(final Connection connection)
461-
throws IOException {
461+
public static Map<String, QuotaState> fetchRegionServerQuotas(final Configuration conf,
462+
final Connection connection) throws IOException {
462463
Scan scan = new Scan();
463464
scan.addFamily(QUOTA_FAMILY_INFO);
464465
scan.setStartStopRowForPrefixScan(QUOTA_REGION_SERVER_ROW_KEY_PREFIX);
465-
return fetchGlobalQuotas("regionServer", scan, connection, new KeyFromRow<String>() {
466+
return fetchGlobalQuotas(conf, "regionServer", scan, connection, new KeyFromRow<String>() {
466467
@Override
467468
public String getKeyFromRow(final byte[] row) {
468469
assert isRegionServerRowKey(row);
@@ -476,8 +477,9 @@ public double getFactor(String s) {
476477
});
477478
}
478479

479-
public static <K> Map<K, QuotaState> fetchGlobalQuotas(final String type, final Scan scan,
480-
final Connection connection, final KeyFromRow<K> kfr) throws IOException {
480+
public static <K> Map<K, QuotaState> fetchGlobalQuotas(final Configuration conf,
481+
final String type, final Scan scan, final Connection connection, final KeyFromRow<K> kfr)
482+
throws IOException {
481483

482484
Map<K, QuotaState> globalQuotas = new HashMap<>();
483485
try (Table table = connection.getTable(QUOTA_TABLE_NAME)) {
@@ -498,7 +500,7 @@ public static <K> Map<K, QuotaState> fetchGlobalQuotas(final String type, final
498500
try {
499501
Quotas quotas = quotasFromData(data);
500502
quotas = updateClusterQuotaToMachineQuota(quotas, kfr.getFactor(key));
501-
quotaInfo.setQuotas(quotas);
503+
quotaInfo.setQuotas(conf, quotas);
502504
} catch (IOException e) {
503505
LOG.error("Unable to parse {} '{}' quotas", type, key, e);
504506
globalQuotas.remove(key);

hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TimeBasedLimiter.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
package org.apache.hadoop.hbase.quotas;
1919

2020
import org.apache.hadoop.conf.Configuration;
21-
import org.apache.hadoop.hbase.HBaseConfiguration;
2221
import org.apache.yetus.audience.InterfaceAudience;
2322
import org.apache.yetus.audience.InterfaceStability;
2423

@@ -32,7 +31,6 @@
3231
@InterfaceAudience.Private
3332
@InterfaceStability.Evolving
3433
public class TimeBasedLimiter implements QuotaLimiter {
35-
private static final Configuration conf = HBaseConfiguration.create();
3634
private RateLimiter reqsLimiter = null;
3735
private RateLimiter reqSizeLimiter = null;
3836
private RateLimiter writeReqsLimiter = null;
@@ -47,7 +45,7 @@ public class TimeBasedLimiter implements QuotaLimiter {
4745
private RateLimiter atomicWriteSizeLimiter = null;
4846
private RateLimiter reqHandlerUsageTimeLimiter = null;
4947

50-
private TimeBasedLimiter() {
48+
private TimeBasedLimiter(Configuration conf) {
5149
if (
5250
FixedIntervalRateLimiter.class.getName().equals(
5351
conf.getClass(RateLimiter.QUOTA_RATE_LIMITER_CONF_KEY, AverageIntervalRateLimiter.class)
@@ -85,8 +83,8 @@ private TimeBasedLimiter() {
8583
}
8684
}
8785

88-
static QuotaLimiter fromThrottle(final Throttle throttle) {
89-
TimeBasedLimiter limiter = new TimeBasedLimiter();
86+
static QuotaLimiter fromThrottle(Configuration conf, final Throttle throttle) {
87+
TimeBasedLimiter limiter = new TimeBasedLimiter(conf);
9088
boolean isBypass = true;
9189
if (throttle.hasReqNum()) {
9290
setFromTimedQuota(limiter.reqsLimiter, throttle.getReqNum());

hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/UserQuotaState.java

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import java.util.HashSet;
2222
import java.util.Map;
2323
import java.util.Set;
24+
import org.apache.hadoop.conf.Configuration;
2425
import org.apache.hadoop.hbase.TableName;
2526
import org.apache.yetus.audience.InterfaceAudience;
2627
import org.apache.yetus.audience.InterfaceStability;
@@ -89,39 +90,39 @@ public synchronized boolean hasBypassGlobals() {
8990
}
9091

9192
@Override
92-
public synchronized void setQuotas(final Quotas quotas) {
93-
super.setQuotas(quotas);
93+
public synchronized void setQuotas(Configuration conf, final Quotas quotas) {
94+
super.setQuotas(conf, quotas);
9495
bypassGlobals = quotas.getBypassGlobals();
9596
}
9697

9798
/**
9899
* Add the quota information of the specified table. (This operation is part of the QuotaState
99100
* setup)
100101
*/
101-
public synchronized void setQuotas(final TableName table, Quotas quotas) {
102-
tableLimiters = setLimiter(tableLimiters, table, quotas);
102+
public synchronized void setQuotas(Configuration conf, final TableName table, Quotas quotas) {
103+
tableLimiters = setLimiter(conf, tableLimiters, table, quotas);
103104
}
104105

105106
/**
106107
* Add the quota information of the specified namespace. (This operation is part of the QuotaState
107108
* setup)
108109
*/
109-
public void setQuotas(final String namespace, Quotas quotas) {
110-
namespaceLimiters = setLimiter(namespaceLimiters, namespace, quotas);
110+
public void setQuotas(Configuration conf, final String namespace, Quotas quotas) {
111+
namespaceLimiters = setLimiter(conf, namespaceLimiters, namespace, quotas);
111112
}
112113

113114
public boolean hasTableLimiters() {
114115
return tableLimiters != null && !tableLimiters.isEmpty();
115116
}
116117

117-
private <K> Map<K, QuotaLimiter> setLimiter(Map<K, QuotaLimiter> limiters, final K key,
118-
final Quotas quotas) {
118+
private <K> Map<K, QuotaLimiter> setLimiter(Configuration conf, Map<K, QuotaLimiter> limiters,
119+
final K key, final Quotas quotas) {
119120
if (limiters == null) {
120121
limiters = new HashMap<>();
121122
}
122123

123124
QuotaLimiter limiter =
124-
quotas.hasThrottle() ? QuotaLimiterFactory.fromThrottle(quotas.getThrottle()) : null;
125+
quotas.hasThrottle() ? QuotaLimiterFactory.fromThrottle(conf, quotas.getThrottle()) : null;
125126
if (limiter != null && !limiter.isBypass()) {
126127
limiters.put(key, limiter);
127128
} else {

hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionCoprocessorQuotaUsage.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,8 @@
4343
import org.junit.ClassRule;
4444
import org.junit.Test;
4545
import org.junit.experimental.categories.Category;
46+
import org.slf4j.Logger;
47+
import org.slf4j.LoggerFactory;
4648

4749
@Category({ MediumTests.class, CoprocessorTests.class })
4850
public class TestRegionCoprocessorQuotaUsage {
@@ -52,6 +54,7 @@ public class TestRegionCoprocessorQuotaUsage {
5254
HBaseClassTestRule.forClass(TestRegionCoprocessorQuotaUsage.class);
5355

5456
private static HBaseTestingUtility UTIL = new HBaseTestingUtility();
57+
private static final Logger LOG = LoggerFactory.getLogger(TestRegionCoprocessorQuotaUsage.class);
5558
private static TableName TABLE_NAME = TableName.valueOf("TestRegionCoprocessorQuotaUsage");
5659
private static byte[] CF = Bytes.toBytes("CF");
5760
private static byte[] CQ = Bytes.toBytes("CQ");
@@ -66,11 +69,14 @@ public void preGetOp(ObserverContext<RegionCoprocessorEnvironment> c, Get get,
6669

6770
// For the purposes of this test, we only need to catch a throttle happening once, then
6871
// let future requests pass through so we don't make this test take any longer than necessary
72+
LOG.info("Intercepting GetOp");
6973
if (!THROTTLING_OCCURRED.get()) {
7074
try {
7175
c.getEnvironment().checkBatchQuota(c.getEnvironment().getRegion(),
7276
OperationQuota.OperationType.GET);
77+
LOG.info("Request was not throttled");
7378
} catch (RpcThrottlingException e) {
79+
LOG.info("Intercepting was throttled");
7480
THROTTLING_OCCURRED.set(true);
7581
throw e;
7682
}
@@ -91,9 +97,8 @@ public Optional<RegionObserver> getRegionObserver() {
9197
public static void setUp() throws Exception {
9298
Configuration conf = UTIL.getConfiguration();
9399
conf.setBoolean("hbase.quota.enabled", true);
94-
conf.setInt("hbase.quota.default.user.machine.read.num", 2);
100+
conf.setInt("hbase.quota.default.user.machine.read.num", 1);
95101
conf.set("hbase.quota.rate.limiter", "org.apache.hadoop.hbase.quotas.FixedIntervalRateLimiter");
96-
conf.set("hbase.quota.rate.limiter.refill.interval.ms", "300000");
97102
conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, MyCoprocessor.class.getName());
98103
UTIL.startMiniCluster(3);
99104
byte[][] splitKeys = new byte[8][];
@@ -116,6 +121,9 @@ public void testGet() throws InterruptedException, ExecutionException, IOExcepti
116121
// Hit the table 5 times which ought to be enough to make a throttle happen
117122
for (int i = 0; i < 5; i++) {
118123
TABLE.get(new Get(Bytes.toBytes("000")));
124+
if (THROTTLING_OCCURRED.get()) {
125+
break;
126+
}
119127
}
120128
assertTrue("Throttling did not happen as expected", THROTTLING_OCCURRED.get());
121129
}

0 commit comments

Comments
 (0)