@@ -20,7 +20,6 @@
 import java.io.IOException;
 import java.time.Duration;
 import java.util.EnumSet;
-import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
@@ -70,10 +69,10 @@ public class QuotaCache implements Stoppable {
   private final Object initializerLock = new Object();
   private volatile boolean initialized = false;
 
-  private volatile Map<String, QuotaState> namespaceQuotaCache = new HashMap<>();
-  private volatile Map<TableName, QuotaState> tableQuotaCache = new HashMap<>();
-  private volatile Map<String, UserQuotaState> userQuotaCache = new HashMap<>();
-  private volatile Map<String, QuotaState> regionServerQuotaCache = new HashMap<>();
+  private volatile Map<String, QuotaState> namespaceQuotaCache = new ConcurrentHashMap<>();
+  private volatile Map<TableName, QuotaState> tableQuotaCache = new ConcurrentHashMap<>();
+  private volatile Map<String, UserQuotaState> userQuotaCache = new ConcurrentHashMap<>();
+  private volatile Map<String, QuotaState> regionServerQuotaCache = new ConcurrentHashMap<>();
 
   private volatile boolean exceedThrottleQuotaEnabled = false;
   // factors used to divide cluster scope quota into machine scope quota
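The field changes above are one half of a two-part pattern: each cache keeps its volatile reference (so the refresh chore can publish a rebuilt map in a single atomic write) while the map itself becomes a ConcurrentHashMap (so in-place mutations, such as default entries inserted on a cache miss, stay safe against concurrent readers). A minimal sketch of the pattern, with toy names rather than HBase's QuotaState:

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Toy illustration of the volatile-reference + ConcurrentHashMap pattern;
// the names are made up and not part of HBase.
public class VolatileCacheSketch {
  private volatile Map<String, Integer> cache = new ConcurrentHashMap<>();

  // Reader path: lock-free. The volatile read guarantees we see either the
  // old map or the fully built new one, never a half-published reference.
  int get(String key) {
    return cache.getOrDefault(key, 0);
  }

  // In-place mutation outside the refresh, e.g. inserting a default on a
  // miss. Safe only because the map is a ConcurrentHashMap; with a plain
  // HashMap this could corrupt the table under concurrent access.
  void ensureEntry(String key) {
    cache.putIfAbsent(key, 0);
  }

  // Refresh path: build a replacement off to the side, then swap the
  // reference in one volatile write.
  void refresh(Map<String, Integer> freshlyFetched) {
    cache = new ConcurrentHashMap<>(freshlyFetched);
  }
}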
@@ -310,44 +309,48 @@ public synchronized boolean triggerNow() {

   @Override
   protected void chore() {
-    updateQuotaFactors();
+    synchronized (this) {
+      LOG.info("Reloading quota cache from hbase:quota table");
+      updateQuotaFactors();
 
-    try {
-      Map<String, UserQuotaState> newUserQuotaCache = new HashMap<>(fetchUserQuotaStateEntries());
-      updateNewCacheFromOld(userQuotaCache, newUserQuotaCache);
-      userQuotaCache = newUserQuotaCache;
-    } catch (IOException e) {
-      LOG.error("Error while fetching user quotas", e);
-    }
+      try {
+        Map<String, UserQuotaState> newUserQuotaCache =
+          new ConcurrentHashMap<>(fetchUserQuotaStateEntries());
+        updateNewCacheFromOld(userQuotaCache, newUserQuotaCache);
+        userQuotaCache = newUserQuotaCache;
+      } catch (IOException e) {
+        LOG.error("Error while fetching user quotas", e);
+      }
 
-    try {
-      Map<String, QuotaState> newRegionServerQuotaCache =
-        new HashMap<>(fetchRegionServerQuotaStateEntries());
-      updateNewCacheFromOld(regionServerQuotaCache, newRegionServerQuotaCache);
-      regionServerQuotaCache = newRegionServerQuotaCache;
-    } catch (IOException e) {
-      LOG.error("Error while fetching region server quotas", e);
-    }
+      try {
+        Map<String, QuotaState> newRegionServerQuotaCache =
+          new ConcurrentHashMap<>(fetchRegionServerQuotaStateEntries());
+        updateNewCacheFromOld(regionServerQuotaCache, newRegionServerQuotaCache);
+        regionServerQuotaCache = newRegionServerQuotaCache;
+      } catch (IOException e) {
+        LOG.error("Error while fetching region server quotas", e);
+      }
 
-    try {
-      Map<TableName, QuotaState> newTableQuotaCache =
-        new HashMap<>(fetchTableQuotaStateEntries());
-      updateNewCacheFromOld(tableQuotaCache, newTableQuotaCache);
-      tableQuotaCache = newTableQuotaCache;
-    } catch (IOException e) {
-      LOG.error("Error while refreshing table quotas", e);
-    }
+      try {
+        Map<TableName, QuotaState> newTableQuotaCache =
+          new ConcurrentHashMap<>(fetchTableQuotaStateEntries());
+        updateNewCacheFromOld(tableQuotaCache, newTableQuotaCache);
+        tableQuotaCache = newTableQuotaCache;
+      } catch (IOException e) {
+        LOG.error("Error while refreshing table quotas", e);
+      }
 
-    try {
-      Map<String, QuotaState> newNamespaceQuotaCache =
-        new HashMap<>(fetchNamespaceQuotaStateEntries());
-      updateNewCacheFromOld(namespaceQuotaCache, newNamespaceQuotaCache);
-      namespaceQuotaCache = newNamespaceQuotaCache;
-    } catch (IOException e) {
-      LOG.error("Error while refreshing namespace quotas", e);
-    }
+      try {
+        Map<String, QuotaState> newNamespaceQuotaCache =
+          new ConcurrentHashMap<>(fetchNamespaceQuotaStateEntries());
+        updateNewCacheFromOld(namespaceQuotaCache, newNamespaceQuotaCache);
+        namespaceQuotaCache = newNamespaceQuotaCache;
+      } catch (IOException e) {
+        LOG.error("Error while refreshing namespace quotas", e);
+      }
 
-    fetchExceedThrottleQuota();
+      fetchExceedThrottleQuota();
+    }
   }
 
   private void fetchExceedThrottleQuota() {
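The other half of the change is the synchronized (this) block around the whole refresh. The hunk context above shows triggerNow() is already a synchronized method, so a manually triggered refresh and a scheduled chore run now take the same monitor and can no longer interleave their fetch-merge-swap sequences. A minimal sketch of that pairing, assuming a plain ScheduledExecutorService in place of HBase's ScheduledChore machinery:

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Toy illustration, not HBase code: a scheduled refresh and a manual
// trigger serialized on the same monitor.
public class RefreshSketch {
  private final ScheduledExecutorService pool =
    Executors.newSingleThreadScheduledExecutor();

  public void start() {
    // Scheduled path, standing in for the chore() override above.
    pool.scheduleAtFixedRate(this::refresh, 5, 5, TimeUnit.MINUTES);
  }

  // Manual path, standing in for triggerNow(): synchronized, so it blocks
  // until any in-flight scheduled refresh finishes rather than racing it.
  public synchronized boolean triggerNow() {
    refresh();
    return true;
  }

  private synchronized void refresh() {
    // fetch + merge + swap the caches here; holding the lock keeps
    // overlapping runs strictly sequential.
  }
}

The test hunk below exercises the merge helper those refreshes rely on.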
@@ -131,4 +131,50 @@ public void testForgetsDeletedQuota() {
     assertTrue(newCache.containsKey("my_table2"));
     assertFalse(newCache.containsKey("my_table1"));
   }
+
+  @Test
+  public void testLearnsNewQuota() {
+    Map<String, QuotaState> oldCache = new HashMap<>();
+
+    QuotaState newState = new QuotaState();
+    Map<String, QuotaState> newCache = new HashMap<>();
+    newCache.put("my_table1", newState);
+
+    QuotaCache.updateNewCacheFromOld(oldCache, newCache);
+
+    assertTrue(newCache.containsKey("my_table1"));
+  }

+  @Test
+  public void testUserSpecificOverridesDefaultNewQuota() {
+    // establish old cache with a limiter for 100 read bytes per second
+    QuotaState oldState = new QuotaState();
+    Map<String, QuotaState> oldCache = new HashMap<>();
+    oldCache.put("my_table", oldState);
+    QuotaProtos.Throttle throttle1 = QuotaProtos.Throttle.newBuilder()
+      .setReadSize(QuotaProtos.TimedQuota.newBuilder().setTimeUnit(HBaseProtos.TimeUnit.SECONDS)
+        .setSoftLimit(100).setScope(QuotaProtos.QuotaScope.MACHINE).build())
+      .build();
+    QuotaLimiter limiter1 = TimeBasedLimiter.fromThrottle(conf, throttle1);
+    oldState.setGlobalLimiter(limiter1);
+
+    // establish new cache with a limiter for 999 read bytes per second
+    QuotaState newState = new QuotaState();
+    Map<String, QuotaState> newCache = new HashMap<>();
+    newCache.put("my_table", newState);
+    QuotaProtos.Throttle throttle2 = QuotaProtos.Throttle.newBuilder()
+      .setReadSize(QuotaProtos.TimedQuota.newBuilder().setTimeUnit(HBaseProtos.TimeUnit.SECONDS)
+        .setSoftLimit(999).setScope(QuotaProtos.QuotaScope.MACHINE).build())
+      .build();
+    QuotaLimiter limiter2 = TimeBasedLimiter.fromThrottle(conf, throttle2);
+    newState.setGlobalLimiter(limiter2);
+
+    // update new cache from old cache
+    QuotaCache.updateNewCacheFromOld(oldCache, newCache);
+
+    // verify that the new limiter's 999 available read bytes were kept
+    TimeBasedLimiter updatedLimiter =
+      (TimeBasedLimiter) newCache.get("my_table").getGlobalLimiter();
+    assertEquals(999, updatedLimiter.getReadAvailable());
+  }
 }
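Taken together, the three tests pin down the merge contract of updateNewCacheFromOld: a key only in the new snapshot is learned, a key only in the old cache is forgotten, and a key in both keeps the freshly fetched limiter (999 available read bytes, not the stale 100). A toy sketch of that contract follows; the Limiter record is made up, and the usage carry-over line is an assumption about what the real merge reconciles between refreshes:

import java.util.HashMap;
import java.util.Map;

// Toy illustration of the merge semantics asserted above; not HBase's
// QuotaState/QuotaLimiter types.
public class CacheMergeSketch {

  record Limiter(long limit, long used) {}

  static void updateNewCacheFromOld(Map<String, Limiter> oldCache,
    Map<String, Limiter> newCache) {
    // Keys only in newCache survive untouched ("learns new quota"); keys
    // only in oldCache are never copied over ("forgets deleted quota").
    // For keys in both, the fresh limit wins while accumulated usage is
    // carried across the refresh (assumed behavior of the real merge).
    oldCache.forEach((key, old) -> newCache.computeIfPresent(key,
      (k, fresh) -> new Limiter(fresh.limit(), old.used())));
  }

  public static void main(String[] args) {
    Map<String, Limiter> oldCache = new HashMap<>(Map.of(
      "my_table", new Limiter(100, 40),
      "my_table1", new Limiter(50, 0))); // quota since deleted
    Map<String, Limiter> newCache = new HashMap<>(Map.of(
      "my_table", new Limiter(999, 0),
      "my_table2", new Limiter(10, 0))); // newly learned quota

    updateNewCacheFromOld(oldCache, newCache);
    // Prints my_table with the new 999 limit (plus the old usage),
    // my_table2 as learned, and no my_table1.
    System.out.println(newCache);
  }
}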
}