From d05966dbe05615c19d848926be17b11190f8c896 Mon Sep 17 00:00:00 2001
From: Charles Connell
Date: Wed, 17 Sep 2025 16:00:12 -0400
Subject: [PATCH 1/2] HBASE-29677: Thread safety in QuotaRefresherChore

---
 .../hadoop/hbase/quotas/QuotaCache.java       | 79 ++++++++++---------
 .../hadoop/hbase/quotas/TestQuotaCache2.java  | 46 +++++++++++
 2 files changed, 87 insertions(+), 38 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaCache.java
index b0e76663455b..e6144de2c777 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaCache.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaCache.java
@@ -20,7 +20,6 @@
 import java.io.IOException;
 import java.time.Duration;
 import java.util.EnumSet;
-import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
@@ -70,10 +69,10 @@ public class QuotaCache implements Stoppable {
   private final Object initializerLock = new Object();
   private volatile boolean initialized = false;
 
-  private volatile Map<String, QuotaState> namespaceQuotaCache = new HashMap<>();
-  private volatile Map<TableName, QuotaState> tableQuotaCache = new HashMap<>();
-  private volatile Map<String, UserQuotaState> userQuotaCache = new HashMap<>();
-  private volatile Map<String, QuotaState> regionServerQuotaCache = new HashMap<>();
+  private volatile Map<String, QuotaState> namespaceQuotaCache = new ConcurrentHashMap<>();
+  private volatile Map<TableName, QuotaState> tableQuotaCache = new ConcurrentHashMap<>();
+  private volatile Map<String, UserQuotaState> userQuotaCache = new ConcurrentHashMap<>();
+  private volatile Map<String, QuotaState> regionServerQuotaCache = new ConcurrentHashMap<>();
   private volatile boolean exceedThrottleQuotaEnabled = false;
   // factors used to divide cluster scope quota into machine scope quota
@@ -310,44 +309,48 @@ public synchronized boolean triggerNow() {
 
   @Override
   protected void chore() {
-    updateQuotaFactors();
+    synchronized (this) {
+      LOG.info("Reloading quota cache from hbase:quota table");
+      updateQuotaFactors();
+
+      try {
+        Map<String, UserQuotaState> newUserQuotaCache =
+          new ConcurrentHashMap<>(fetchUserQuotaStateEntries());
+        updateNewCacheFromOld(userQuotaCache, newUserQuotaCache);
+        userQuotaCache = newUserQuotaCache;
+      } catch (IOException e) {
+        LOG.error("Error while fetching user quotas", e);
+      }
 
-    try {
-      Map<String, UserQuotaState> newUserQuotaCache = new HashMap<>(fetchUserQuotaStateEntries());
-      updateNewCacheFromOld(userQuotaCache, newUserQuotaCache);
-      userQuotaCache = newUserQuotaCache;
-    } catch (IOException e) {
-      LOG.error("Error while fetching user quotas", e);
-    }
+      try {
+        Map<String, QuotaState> newRegionServerQuotaCache =
+          new ConcurrentHashMap<>(fetchRegionServerQuotaStateEntries());
+        updateNewCacheFromOld(regionServerQuotaCache, newRegionServerQuotaCache);
+        regionServerQuotaCache = newRegionServerQuotaCache;
+      } catch (IOException e) {
+        LOG.error("Error while fetching region server quotas", e);
+      }
 
-    try {
-      Map<String, QuotaState> newRegionServerQuotaCache =
-        new HashMap<>(fetchRegionServerQuotaStateEntries());
-      updateNewCacheFromOld(regionServerQuotaCache, newRegionServerQuotaCache);
-      regionServerQuotaCache = newRegionServerQuotaCache;
-    } catch (IOException e) {
-      LOG.error("Error while fetching region server quotas", e);
-    }
+      try {
+        Map<TableName, QuotaState> newTableQuotaCache =
+          new ConcurrentHashMap<>(fetchTableQuotaStateEntries());
+        updateNewCacheFromOld(tableQuotaCache, newTableQuotaCache);
+        tableQuotaCache = newTableQuotaCache;
+      } catch (IOException e) {
+        LOG.error("Error while refreshing table quotas", e);
+      }
 
-    try {
-      Map<TableName, QuotaState> newTableQuotaCache =
-        new HashMap<>(fetchTableQuotaStateEntries());
-      updateNewCacheFromOld(tableQuotaCache, newTableQuotaCache);
-      tableQuotaCache = newTableQuotaCache;
-    } catch (IOException e) {
-      LOG.error("Error while refreshing table quotas", e);
-    }
+      try {
+        Map<String, QuotaState> newNamespaceQuotaCache =
+          new ConcurrentHashMap<>(fetchNamespaceQuotaStateEntries());
+        updateNewCacheFromOld(namespaceQuotaCache, newNamespaceQuotaCache);
+        namespaceQuotaCache = newNamespaceQuotaCache;
+      } catch (IOException e) {
+        LOG.error("Error while refreshing namespace quotas", e);
+      }
 
-    try {
-      Map<String, QuotaState> newNamespaceQuotaCache =
-        new HashMap<>(fetchNamespaceQuotaStateEntries());
-      updateNewCacheFromOld(namespaceQuotaCache, newNamespaceQuotaCache);
-      namespaceQuotaCache = newNamespaceQuotaCache;
-    } catch (IOException e) {
-      LOG.error("Error while refreshing namespace quotas", e);
+      fetchExceedThrottleQuota();
     }
-
-    fetchExceedThrottleQuota();
   }
 
   private void fetchExceedThrottleQuota() {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaCache2.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaCache2.java
index 8f8ac4991ca6..3f4c88a5853d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaCache2.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaCache2.java
@@ -131,4 +131,50 @@ public void testForgetsDeletedQuota() {
     assertTrue(newCache.containsKey("my_table2"));
     assertFalse(newCache.containsKey("my_table1"));
   }
+
+  @Test
+  public void testLearnsNewQuota() {
+    Map<String, QuotaState> oldCache = new HashMap<>();
+
+    QuotaState newState = new QuotaState();
+    Map<String, QuotaState> newCache = new HashMap<>();
+    newCache.put("my_table1", newState);
+
+    QuotaCache.updateNewCacheFromOld(oldCache, newCache);
+
+    assertTrue(newCache.containsKey("my_table1"));
+  }
+
+  @Test
+  public void testUserSpecificOverridesDefaultNewQuota() {
+    // establish old cache with a limiter for 100 read bytes per second
+    QuotaState oldState = new QuotaState();
+    Map<String, QuotaState> oldCache = new HashMap<>();
+    oldCache.put("my_table", oldState);
+    QuotaProtos.Throttle throttle1 = QuotaProtos.Throttle.newBuilder()
+      .setReadSize(QuotaProtos.TimedQuota.newBuilder().setTimeUnit(HBaseProtos.TimeUnit.SECONDS)
+        .setSoftLimit(100).setScope(QuotaProtos.QuotaScope.MACHINE).build())
+      .build();
+    QuotaLimiter limiter1 = TimeBasedLimiter.fromThrottle(throttle1);
+    oldState.setGlobalLimiter(limiter1);
+
+    // establish new cache, with a limiter for 999 read bytes per second
+    QuotaState newState = new QuotaState();
+    Map<String, QuotaState> newCache = new HashMap<>();
+    newCache.put("my_table", newState);
+    QuotaProtos.Throttle throttle2 = QuotaProtos.Throttle.newBuilder()
+      .setReadSize(QuotaProtos.TimedQuota.newBuilder().setTimeUnit(HBaseProtos.TimeUnit.SECONDS)
+        .setSoftLimit(999).setScope(QuotaProtos.QuotaScope.MACHINE).build())
+      .build();
+    QuotaLimiter limiter2 = TimeBasedLimiter.fromThrottle(throttle2);
+    newState.setGlobalLimiter(limiter2);
+
+    // update new cache from old cache
+    QuotaCache.updateNewCacheFromOld(oldCache, newCache);
+
+    // verify that the 999 available bytes from the limiter was carried over
+    TimeBasedLimiter updatedLimiter =
+      (TimeBasedLimiter) newCache.get("my_table").getGlobalLimiter();
+    assertEquals(999, updatedLimiter.getReadAvailable());
+  }
 }

From 6f3b60efbab1cad3b9daaedcdf8ac25f32c33472 Mon Sep 17 00:00:00 2001
From: Charles Connell
Date: Mon, 20 Oct 2025 20:39:53 -0400
Subject: [PATCH 2/2] merge conflict

---
 .../java/org/apache/hadoop/hbase/quotas/TestQuotaCache2.java | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaCache2.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaCache2.java
index 3f4c88a5853d..3e829b5c08af 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaCache2.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaCache2.java
@@ -155,7 +155,7 @@ public void testUserSpecificOverridesDefaultNewQuota() {
       .setReadSize(QuotaProtos.TimedQuota.newBuilder().setTimeUnit(HBaseProtos.TimeUnit.SECONDS)
         .setSoftLimit(100).setScope(QuotaProtos.QuotaScope.MACHINE).build())
       .build();
-    QuotaLimiter limiter1 = TimeBasedLimiter.fromThrottle(throttle1);
+    QuotaLimiter limiter1 = TimeBasedLimiter.fromThrottle(conf, throttle1);
     oldState.setGlobalLimiter(limiter1);
 
     // establish new cache, with a limiter for 999 read bytes per second
@@ -166,7 +166,7 @@ public void testUserSpecificOverridesDefaultNewQuota() {
       .setReadSize(QuotaProtos.TimedQuota.newBuilder().setTimeUnit(HBaseProtos.TimeUnit.SECONDS)
         .setSoftLimit(999).setScope(QuotaProtos.QuotaScope.MACHINE).build())
       .build();
-    QuotaLimiter limiter2 = TimeBasedLimiter.fromThrottle(throttle2);
+    QuotaLimiter limiter2 = TimeBasedLimiter.fromThrottle(conf, throttle2);
     newState.setGlobalLimiter(limiter2);
 
     // update new cache from old cache