diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/FeedbackAdaptiveRateLimiter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/FeedbackAdaptiveRateLimiter.java new file mode 100644 index 000000000000..6acfd07328e8 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/FeedbackAdaptiveRateLimiter.java @@ -0,0 +1,361 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.quotas; + +import java.util.concurrent.atomic.AtomicLong; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.yetus.audience.InterfaceAudience; +import org.apache.yetus.audience.InterfaceStability; + +import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; +import org.apache.hbase.thirdparty.com.google.common.util.concurrent.AtomicDouble; + +/** + * An adaptive rate limiter that dynamically adjusts its behavior based on observed usage patterns + * to achieve stable, full utilization of configured quota allowances while managing client + * contention. + *

+ * Core Algorithm: This rate limiter divides time into fixed refill intervals (configurable + * via {@code hbase.quota.rate.limiter.refill.interval.ms}, default is 1 refill per TimeUnit of the + * RateLimiter). At the beginning of each interval, a fresh allocation of resources becomes + * available based on the configured limit. Clients consume resources as they make requests. When + * resources are exhausted, clients must wait until the next refill, or until enough resources + * become available. + *

+ * Adaptive Backpressure: When multiple threads compete for limited resources (contention), + * this limiter detects the contention and applies increasing backpressure by extending wait + * intervals. This prevents thundering herd behavior where many threads wake simultaneously and + * compete for the same resources. The backoff multiplier increases by a small increment (see + * {@link #FEEDBACK_ADAPTIVE_BACKOFF_MULTIPLIER_INCREMENT}) per interval when contention occurs, and + * decreases (see {@link #FEEDBACK_ADAPTIVE_BACKOFF_MULTIPLIER_DECREMENT}) when no contention is + * detected, converging toward optimal throughput. The multiplier is capped at a maximum value (see + * {@link #FEEDBACK_ADAPTIVE_MAX_BACKOFF_MULTIPLIER}) to prevent unbounded waits. + *

+ * Contention is detected when {@link #getWaitInterval} is called with insufficient available + * resources (i.e., {@code amount > available}), indicating a thread needs to wait for resources. If + * this occurs at least once in a refill interval, the limiter treats that interval as contended + * and increases backpressure at the next refill; otherwise backpressure is relaxed. + *

+ * Oversubscription for Full Utilization: In practice, synchronization overhead and timing + * variations often prevent clients from consuming exactly their full allowance, resulting in + * consistent under-utilization. This limiter addresses this by tracking utilization via an + * exponentially weighted moving average (EWMA). When average utilization falls below the target + * range (determined by {@link #FEEDBACK_ADAPTIVE_UTILIZATION_ERROR_BUDGET}), the limiter gradually + * increases the oversubscription proportion (see + * {@link #FEEDBACK_ADAPTIVE_OVERSUBSCRIPTION_INCREMENT}), allowing more resources per interval than + * the base limit. Conversely, when utilization exceeds the target range, oversubscription is + * decreased (see {@link #FEEDBACK_ADAPTIVE_OVERSUBSCRIPTION_DECREMENT}). Oversubscription is capped + * (see {@link #FEEDBACK_ADAPTIVE_MAX_OVERSUBSCRIPTION}) to prevent excessive bursts while still + * enabling consistent full utilization. + *

+ * Example Scenario: Consider a quota of 1000 requests per second with a 1-second refill + * interval. Without oversubscription, clients might typically achieve only 950 req/s due to + * coordination delays. This limiter would detect the under-utilization, gradually increase + * oversubscription, allowing slightly more resources per interval, which compensates for + * inefficiencies and achieves stable throughput closer to the configured quota. If multiple threads + * simultaneously try to consume resources and repeatedly wait, the backoff multiplier increases + * their wait times, spreading out their retry attempts and reducing wasted CPU cycles. + *

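+ * Configuration Parameters:
+ * <ul>
+ * <li>{@code hbase.quota.rate.limiter.refill.interval.ms}: length of a refill interval in
+ * milliseconds (default: one refill per TimeUnit of the RateLimiter)</li>
+ * <li>{@code hbase.quota.rate.limiter.feedback.adaptive.backoff.multiplier.increment}: amount
+ * added to the backoff multiplier after a refill interval with contention (default 0.0005)</li>
+ * <li>{@code hbase.quota.rate.limiter.feedback.adaptive.backoff.multiplier.decrement}: amount
+ * subtracted from the backoff multiplier after a refill interval without contention (default
+ * 0.0001)</li>
+ * <li>{@code hbase.quota.rate.limiter.feedback.adaptive.max.backoff.multiplier}: upper bound of
+ * the backoff multiplier (default 10.0)</li>
+ * <li>{@code hbase.quota.rate.limiter.feedback.adaptive.oversubscription.increment}: amount added
+ * to the oversubscription proportion when average utilization is below (1.0-errorBudget)
+ * (default 0.001)</li>
+ * <li>{@code hbase.quota.rate.limiter.feedback.adaptive.oversubscription.decrement}: amount
+ * subtracted from the oversubscription proportion when average utilization is at or above
+ * (1.0+errorBudget) (default 0.00005)</li>
+ * <li>{@code hbase.quota.rate.limiter.feedback.adaptive.max.oversubscription}: upper bound of the
+ * oversubscription proportion (default 0.25)</li>
+ * <li>{@code hbase.quota.rate.limiter.feedback.adaptive.utilization.error.budget}: acceptable
+ * deviation around full utilization (1.0) (default 0.025)</li>
+ * </ul>
+ *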

+ * This algorithm converges toward stable operation where: (1) wait intervals are just long enough + * to prevent excessive contention, and (2) oversubscription is just high enough to achieve + * consistent full utilization of the configured allowance. + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public class FeedbackAdaptiveRateLimiter extends RateLimiter { + + /** + * Amount to increase the backoff multiplier when contention is detected per refill interval. In + * other words, if we are throttling more than once per refill interval, then we will increase our + * wait intervals (increase backpressure, decrease throughput). + */ + public static final String FEEDBACK_ADAPTIVE_BACKOFF_MULTIPLIER_INCREMENT = + "hbase.quota.rate.limiter.feedback.adaptive.backoff.multiplier.increment"; + public static final double DEFAULT_BACKOFF_MULTIPLIER_INCREMENT = 0.0005; + + /** + * Amount to decrease the backoff multiplier when no contention is detected per refill interval. + * In other words, if we are only throttling once per refill interval, then we will decrease our + * wait interval (decrease backpressure, increase throughput). + */ + public static final String FEEDBACK_ADAPTIVE_BACKOFF_MULTIPLIER_DECREMENT = + "hbase.quota.rate.limiter.feedback.adaptive.backoff.multiplier.decrement"; + public static final double DEFAULT_BACKOFF_MULTIPLIER_DECREMENT = 0.0001; + + /** + * Maximum ceiling for the backoff multiplier to avoid unbounded waits. + */ + public static final String FEEDBACK_ADAPTIVE_MAX_BACKOFF_MULTIPLIER = + "hbase.quota.rate.limiter.feedback.adaptive.max.backoff.multiplier"; + public static final double DEFAULT_MAX_BACKOFF_MULTIPLIER = 10.0; + + /** + * Amount to increase the oversubscription proportion when utilization is below (1.0-errorBudget). 
+ */ + public static final String FEEDBACK_ADAPTIVE_OVERSUBSCRIPTION_INCREMENT = + "hbase.quota.rate.limiter.feedback.adaptive.oversubscription.increment"; + public static final double DEFAULT_OVERSUBSCRIPTION_INCREMENT = 0.001; + + /** + * Amount to decrease the oversubscription proportion when utilization exceeds (1.0+errorBudget). + */ + public static final String FEEDBACK_ADAPTIVE_OVERSUBSCRIPTION_DECREMENT = + "hbase.quota.rate.limiter.feedback.adaptive.oversubscription.decrement"; + public static final double DEFAULT_OVERSUBSCRIPTION_DECREMENT = 0.00005; + + /** + * Maximum ceiling for oversubscription to prevent unbounded bursts. Some oversubscription can be + * nice, because it allows you to balance the inefficiency and latency of retries, landing on + * stable usage at approximately your configured allowance. Without adequate oversubscription, + * your steady state may often seem significantly, and suspiciously, lower than your configured + * allowance. + */ + public static final String FEEDBACK_ADAPTIVE_MAX_OVERSUBSCRIPTION = + "hbase.quota.rate.limiter.feedback.adaptive.max.oversubscription"; + public static final double DEFAULT_MAX_OVERSUBSCRIPTION = 0.25; + + /** + * Acceptable deviation around full utilization (1.0) for adjusting oversubscription. If stable + * throttle usage is typically under (1.0-errorBudget), then we will allow more oversubscription. + * If stable throttle usage is typically over (1.0+errorBudget), then we will pull back + * oversubscription. 
+ */ + public static final String FEEDBACK_ADAPTIVE_UTILIZATION_ERROR_BUDGET = + "hbase.quota.rate.limiter.feedback.adaptive.utilization.error.budget"; + public static final double DEFAULT_UTILIZATION_ERROR_BUDGET = 0.025; + + private static final int WINDOW_TIME_MS = 60_000; + + public static class FeedbackAdaptiveRateLimiterFactory { + + private final long refillInterval; + private final double backoffMultiplierIncrement; + private final double backoffMultiplierDecrement; + private final double maxBackoffMultiplier; + private final double oversubscriptionIncrement; + private final double oversubscriptionDecrement; + private final double maxOversubscription; + private final double utilizationErrorBudget; + + public FeedbackAdaptiveRateLimiterFactory(Configuration conf) { + refillInterval = conf.getLong(FixedIntervalRateLimiter.RATE_LIMITER_REFILL_INTERVAL_MS, + RateLimiter.DEFAULT_TIME_UNIT); + + maxBackoffMultiplier = + conf.getDouble(FEEDBACK_ADAPTIVE_MAX_BACKOFF_MULTIPLIER, DEFAULT_MAX_BACKOFF_MULTIPLIER); + + backoffMultiplierIncrement = conf.getDouble(FEEDBACK_ADAPTIVE_BACKOFF_MULTIPLIER_INCREMENT, + DEFAULT_BACKOFF_MULTIPLIER_INCREMENT); + backoffMultiplierDecrement = conf.getDouble(FEEDBACK_ADAPTIVE_BACKOFF_MULTIPLIER_DECREMENT, + DEFAULT_BACKOFF_MULTIPLIER_DECREMENT); + + oversubscriptionIncrement = conf.getDouble(FEEDBACK_ADAPTIVE_OVERSUBSCRIPTION_INCREMENT, + DEFAULT_OVERSUBSCRIPTION_INCREMENT); + oversubscriptionDecrement = conf.getDouble(FEEDBACK_ADAPTIVE_OVERSUBSCRIPTION_DECREMENT, + DEFAULT_OVERSUBSCRIPTION_DECREMENT); + + maxOversubscription = + conf.getDouble(FEEDBACK_ADAPTIVE_MAX_OVERSUBSCRIPTION, DEFAULT_MAX_OVERSUBSCRIPTION); + utilizationErrorBudget = conf.getDouble(FEEDBACK_ADAPTIVE_UTILIZATION_ERROR_BUDGET, + DEFAULT_UTILIZATION_ERROR_BUDGET); + } + + public FeedbackAdaptiveRateLimiter create() { + return new FeedbackAdaptiveRateLimiter(refillInterval, backoffMultiplierIncrement, + backoffMultiplierDecrement, maxBackoffMultiplier, 
oversubscriptionIncrement, + oversubscriptionDecrement, maxOversubscription, utilizationErrorBudget); + } + } + + private volatile long nextRefillTime = -1L; + private final long refillInterval; + private final double backoffMultiplierIncrement; + private final double backoffMultiplierDecrement; + private final double maxBackoffMultiplier; + private final double oversubscriptionIncrement; + private final double oversubscriptionDecrement; + private final double maxOversubscription; + private final double minTargetUtilization; + private final double maxTargetUtilization; + + // Adaptive backoff state + private final AtomicDouble currentBackoffMultiplier = new AtomicDouble(1.0); + private volatile boolean hadContentionThisInterval = false; + + // Over-subscription proportion state + private final AtomicDouble oversubscriptionProportion = new AtomicDouble(0.0); + + // EWMA tracking + private final double emaAlpha; + private volatile double utilizationEma = 0.0; + private final AtomicLong lastIntervalConsumed; + + FeedbackAdaptiveRateLimiter(long refillInterval, double backoffMultiplierIncrement, + double backoffMultiplierDecrement, double maxBackoffMultiplier, + double oversubscriptionIncrement, double oversubscriptionDecrement, double maxOversubscription, + double utilizationErrorBudget) { + super(); + Preconditions.checkArgument(getTimeUnitInMillis() >= refillInterval, String.format( + "Refill interval %s must be ≤ TimeUnit millis %s", refillInterval, getTimeUnitInMillis())); + + Preconditions.checkArgument(backoffMultiplierIncrement > 0.0, + String.format("Backoff multiplier increment %s must be > 0.0", backoffMultiplierIncrement)); + Preconditions.checkArgument(backoffMultiplierDecrement > 0.0, + String.format("Backoff multiplier decrement %s must be > 0.0", backoffMultiplierDecrement)); + Preconditions.checkArgument(maxBackoffMultiplier > 1.0, + String.format("Max backoff multiplier %s must be > 1.0", maxBackoffMultiplier)); + 
Preconditions.checkArgument(utilizationErrorBudget > 0.0 && utilizationErrorBudget <= 1.0, + String.format("Utilization error budget %s must be between 0.0 and 1.0", + utilizationErrorBudget)); + + this.refillInterval = refillInterval; + this.backoffMultiplierIncrement = backoffMultiplierIncrement; + this.backoffMultiplierDecrement = backoffMultiplierDecrement; + this.maxBackoffMultiplier = maxBackoffMultiplier; + this.oversubscriptionIncrement = oversubscriptionIncrement; + this.oversubscriptionDecrement = oversubscriptionDecrement; + this.maxOversubscription = maxOversubscription; + this.minTargetUtilization = 1.0 - utilizationErrorBudget; + this.maxTargetUtilization = 1.0 + utilizationErrorBudget; + + this.emaAlpha = refillInterval / (double) (WINDOW_TIME_MS + refillInterval); + this.lastIntervalConsumed = new AtomicLong(0); + } + + @Override + public long refill(long limit) { + final long now = EnvironmentEdgeManager.currentTime(); + if (nextRefillTime == -1) { + nextRefillTime = now + refillInterval; + hadContentionThisInterval = false; + return getOversubscribedLimit(limit); + } + if (now < nextRefillTime) { + return 0; + } + long diff = refillInterval + now - nextRefillTime; + long refills = diff / refillInterval; + nextRefillTime = now + refillInterval; + + long intendedUsage = getRefillIntervalAdjustedLimit(limit); + if (intendedUsage > 0) { + long consumed = lastIntervalConsumed.get(); + if (consumed > 0) { + double util = (double) consumed / intendedUsage; + utilizationEma = emaAlpha * util + (1.0 - emaAlpha) * utilizationEma; + } + } + + if (hadContentionThisInterval) { + currentBackoffMultiplier.set(Math + .min(currentBackoffMultiplier.get() + backoffMultiplierIncrement, maxBackoffMultiplier)); + } else { + currentBackoffMultiplier + .set(Math.max(currentBackoffMultiplier.get() - backoffMultiplierDecrement, 1.0)); + } + + double avgUtil = utilizationEma; + if (avgUtil < minTargetUtilization) { + oversubscriptionProportion.set(Math + 
.min(oversubscriptionProportion.get() + oversubscriptionIncrement, maxOversubscription)); + } else if (avgUtil >= maxTargetUtilization) { + oversubscriptionProportion + .set(Math.max(oversubscriptionProportion.get() - oversubscriptionDecrement, 0.0)); + } + + hadContentionThisInterval = false; + lastIntervalConsumed.set(0); + + long refillAmount = refills * getRefillIntervalAdjustedLimit(limit); + long maxRefill = getOversubscribedLimit(limit); + return Math.min(maxRefill, refillAmount); + } + + private long getOversubscribedLimit(long limit) { + return limit + (long) (limit * oversubscriptionProportion.get()); + } + + @Override + public void consume(long amount) { + super.consume(amount); + lastIntervalConsumed.addAndGet(amount); + } + + @Override + public long getWaitInterval(long limit, long available, long amount) { + limit = getRefillIntervalAdjustedLimit(limit); + if (nextRefillTime == -1) { + return 0; + } + + final long now = EnvironmentEdgeManager.currentTime(); + final long refillTime = nextRefillTime; + long diff = amount - available; + if (diff > 0) { + hadContentionThisInterval = true; + } + + long nextInterval = refillTime - now; + if (diff <= limit) { + return applyBackoffMultiplier(nextInterval); + } + + long extra = diff / limit; + if (diff % limit == 0) { + extra--; + } + long baseWait = nextInterval + (extra * refillInterval); + return applyBackoffMultiplier(baseWait); + } + + private long getRefillIntervalAdjustedLimit(long limit) { + return (long) Math.ceil(refillInterval / (double) getTimeUnitInMillis() * limit); + } + + private long applyBackoffMultiplier(long baseWaitInterval) { + return (long) (baseWaitInterval * currentBackoffMultiplier.get()); + } + + // strictly for testing + @Override + public void setNextRefillTime(long nextRefillTime) { + this.nextRefillTime = nextRefillTime; + } + + @Override + public long getNextRefillTime() { + return this.nextRefillTime; + } +} diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TimeBasedLimiter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TimeBasedLimiter.java index 43dfab703b74..38d171f1bf9a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TimeBasedLimiter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TimeBasedLimiter.java @@ -46,11 +46,10 @@ public class TimeBasedLimiter implements QuotaLimiter { private RateLimiter reqHandlerUsageTimeLimiter = null; private TimeBasedLimiter(Configuration conf) { - if ( - FixedIntervalRateLimiter.class.getName().equals( - conf.getClass(RateLimiter.QUOTA_RATE_LIMITER_CONF_KEY, AverageIntervalRateLimiter.class) - .getName()) - ) { + String limiterClassName = + conf.getClass(RateLimiter.QUOTA_RATE_LIMITER_CONF_KEY, AverageIntervalRateLimiter.class) + .getName(); + if (FixedIntervalRateLimiter.class.getName().equals(limiterClassName)) { long refillInterval = conf.getLong(FixedIntervalRateLimiter.RATE_LIMITER_REFILL_INTERVAL_MS, RateLimiter.DEFAULT_TIME_UNIT); reqsLimiter = new FixedIntervalRateLimiter(refillInterval); @@ -66,6 +65,22 @@ private TimeBasedLimiter(Configuration conf) { atomicReadSizeLimiter = new FixedIntervalRateLimiter(refillInterval); atomicWriteSizeLimiter = new FixedIntervalRateLimiter(refillInterval); reqHandlerUsageTimeLimiter = new FixedIntervalRateLimiter(refillInterval); + } else if (FeedbackAdaptiveRateLimiter.class.getName().equals(limiterClassName)) { + FeedbackAdaptiveRateLimiter.FeedbackAdaptiveRateLimiterFactory feedbackLimiterFactory = + new FeedbackAdaptiveRateLimiter.FeedbackAdaptiveRateLimiterFactory(conf); + reqsLimiter = feedbackLimiterFactory.create(); + reqSizeLimiter = feedbackLimiterFactory.create(); + writeReqsLimiter = feedbackLimiterFactory.create(); + writeSizeLimiter = feedbackLimiterFactory.create(); + readReqsLimiter = feedbackLimiterFactory.create(); + readSizeLimiter = feedbackLimiterFactory.create(); + reqCapacityUnitLimiter = 
feedbackLimiterFactory.create(); + writeCapacityUnitLimiter = feedbackLimiterFactory.create(); + readCapacityUnitLimiter = feedbackLimiterFactory.create(); + atomicReqLimiter = feedbackLimiterFactory.create(); + atomicReadSizeLimiter = feedbackLimiterFactory.create(); + atomicWriteSizeLimiter = feedbackLimiterFactory.create(); + reqHandlerUsageTimeLimiter = feedbackLimiterFactory.create(); } else { reqsLimiter = new AverageIntervalRateLimiter(); reqSizeLimiter = new AverageIntervalRateLimiter(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestFeedbackAdaptiveRateLimiter.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestFeedbackAdaptiveRateLimiter.java new file mode 100644 index 000000000000..a6a233205c8e --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestFeedbackAdaptiveRateLimiter.java @@ -0,0 +1,542 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.quotas; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.util.concurrent.TimeUnit; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.testclassification.RegionServerTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.util.EnvironmentEdge; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.junit.After; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +/** + * Verify the behavior of the FeedbackAdaptiveRateLimiter including adaptive backoff multipliers and + * over-subscription functionality. + */ +@Category({ RegionServerTests.class, SmallTests.class }) +public class TestFeedbackAdaptiveRateLimiter { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestFeedbackAdaptiveRateLimiter.class); + + private ManualEnvironmentEdge testEdge; + private FeedbackAdaptiveRateLimiter.FeedbackAdaptiveRateLimiterFactory factory; + + @Before + public void setUp() { + testEdge = new ManualEnvironmentEdge(); + EnvironmentEdgeManager.injectEdge(testEdge); + + Configuration conf = HBaseConfiguration.create(); + // Set refill interval for testing + conf.setLong(FixedIntervalRateLimiter.RATE_LIMITER_REFILL_INTERVAL_MS, 500); + // Configure adaptive parameters for testing - using larger values than defaults for + // observability + conf.setDouble(FeedbackAdaptiveRateLimiter.FEEDBACK_ADAPTIVE_BACKOFF_MULTIPLIER_INCREMENT, 0.1); + conf.setDouble(FeedbackAdaptiveRateLimiter.FEEDBACK_ADAPTIVE_BACKOFF_MULTIPLIER_DECREMENT, + 0.05); + conf.setDouble(FeedbackAdaptiveRateLimiter.FEEDBACK_ADAPTIVE_MAX_BACKOFF_MULTIPLIER, 3.0); + 
conf.setDouble(FeedbackAdaptiveRateLimiter.FEEDBACK_ADAPTIVE_OVERSUBSCRIPTION_INCREMENT, 0.01); + conf.setDouble(FeedbackAdaptiveRateLimiter.FEEDBACK_ADAPTIVE_OVERSUBSCRIPTION_DECREMENT, 0.005); + conf.setDouble(FeedbackAdaptiveRateLimiter.FEEDBACK_ADAPTIVE_MAX_OVERSUBSCRIPTION, 0.2); + conf.setDouble(FeedbackAdaptiveRateLimiter.FEEDBACK_ADAPTIVE_UTILIZATION_ERROR_BUDGET, 0.1); + + factory = new FeedbackAdaptiveRateLimiter.FeedbackAdaptiveRateLimiterFactory(conf); + } + + @After + public void tearDown() { + EnvironmentEdgeManager.reset(); + } + + @Test + public void testBasicFunctionality() { + FeedbackAdaptiveRateLimiter limiter = factory.create(); + limiter.set(10, TimeUnit.SECONDS); + + // Initially should work like normal rate limiter + assertEquals(0, limiter.getWaitIntervalMs()); + limiter.consume(5); + assertEquals(0, limiter.getWaitIntervalMs()); + limiter.consume(5); + + // Should need to wait after consuming full limit + assertTrue(limiter.getWaitIntervalMs() > 0); + } + + @Test + public void testAdaptiveBackoffIncreases() { + FeedbackAdaptiveRateLimiter limiter = factory.create(); + limiter.set(10, TimeUnit.SECONDS); + + testEdge.setValue(1000); + limiter.refill(10); + + // Record initial wait interval + limiter.consume(10); + long initialWaitInterval = limiter.getWaitInterval(10, 0, 1); + assertTrue("Initial wait interval should be positive", initialWaitInterval > 0); + + // Create sustained contention over multiple intervals to increase backoff + for (int i = 0; i < 5; i++) { + testEdge.setValue(1000 + (i + 1) * 500); + limiter.refill(10); + limiter.consume(10); + // Create contention by asking for more than available + limiter.getWaitInterval(10, 0, 1); + } + + // After contention, wait interval should increase due to backoff multiplier + testEdge.setValue(4000); + limiter.refill(10); + limiter.consume(10); + long increasedWaitInterval = limiter.getWaitInterval(10, 0, 1); + + // With backoffMultiplierIncrement=0.1 and 5 intervals of contention, + // 
multiplier should be around 1.5, so wait should be significantly higher + assertTrue( + "Wait interval should increase with contention. Initial: " + initialWaitInterval + + ", After contention: " + increasedWaitInterval, + increasedWaitInterval > initialWaitInterval * 1.3); + } + + @Test + public void testAdaptiveBackoffDecreases() { + FeedbackAdaptiveRateLimiter limiter = factory.create(); + limiter.set(10, TimeUnit.SECONDS); + + testEdge.setValue(1000); + limiter.refill(10); + + // Build up contention to increase backoff multiplier + for (int i = 0; i < 5; i++) { + limiter.consume(10); + limiter.getWaitInterval(10, 0, 1); // Create contention + testEdge.setValue(1000 + (i + 1) * 500); + limiter.refill(10); + } + + // Measure wait interval with elevated backoff + limiter.consume(10); + long elevatedWaitInterval = limiter.getWaitInterval(10, 0, 1); + + // Run several intervals without contention to decrease backoff + for (int i = 0; i < 10; i++) { + testEdge.setValue(4000 + i * 500); + limiter.refill(10); + // Consume less than available - no contention + limiter.consume(3); + } + + // Measure wait interval after backoff reduction + testEdge.setValue(9500); + limiter.refill(10); + limiter.consume(10); + long reducedWaitInterval = limiter.getWaitInterval(10, 0, 1); + + // After 10 intervals without contention (decrement=0.05 each), + // multiplier should decrease by ~0.5, making wait interval lower + assertTrue("Wait interval should decrease without contention. 
Elevated: " + elevatedWaitInterval + + ", Reduced: " + reducedWaitInterval, reducedWaitInterval < elevatedWaitInterval * 0.9); + } + + @Test + public void testOversubscriptionIncreasesWithLowUtilization() { + FeedbackAdaptiveRateLimiter limiter = factory.create(); + limiter.set(10, TimeUnit.SECONDS); + + testEdge.setValue(1000); + + // Initial refill to set up the limiter + long initialRefill = limiter.refill(10); + assertEquals("Initial refill should match limit", 10, initialRefill); + + // Create low utilization scenario (consuming much less than available) + // With error budget of 0.1, min target utilization is 0.9 + // We'll consume only ~40% to trigger oversubscription increase + // Refill interval adjusted limit is 5 (500ms / 1000ms * 10) + for (int i = 0; i < 30; i++) { + // Consume before advancing time so utilization is tracked + limiter.consume(2); // 2 out of 5 = 40% utilization + testEdge.setValue(1000 + (i + 1) * 500); + limiter.refill(10); + } + + // After many intervals of low utilization, oversubscription should have increased + // Now test that the oversubscription proportion actually affects refill behavior + // Consume all available to start fresh + limiter.consume((int) limiter.getAvailable()); + + // Jump forward by 3 refill intervals (1500ms) + // This tests that refill can return more than the base limit due to oversubscription + testEdge.setValue(16000 + 1500); + long multiIntervalRefill = limiter.refill(10); + + // With oversubscription at max (0.2), the oversubscribed limit is 10 * 1.2 = 12 + // With 3 intervals: refillAmount = 3 * 5 = 15 + // Result = min(12, 15) = 12, which exceeds the base limit of 10 + // Without oversubscription, this would be capped at min(10, 15) = 10 + assertTrue("With oversubscription from low utilization, refill should exceed base limit. 
Got: " + + multiIntervalRefill, multiIntervalRefill > 10); + } + + @Test + public void testOversubscriptionDecreasesWithHighUtilization() { + FeedbackAdaptiveRateLimiter limiter = factory.create(); + limiter.set(10, TimeUnit.SECONDS); + + testEdge.setValue(1000); + + // First, build up oversubscription with low utilization + limiter.refill(10); + for (int i = 0; i < 15; i++) { + testEdge.setValue(1000 + (i + 1) * 500); + limiter.refill(10); + limiter.consume(2); // Low utilization + } + + // Now create high utilization scenario (consuming more than target) + // With error budget of 0.1, max target utilization is 1.1 + // We'll consume close to the full interval-adjusted limit to trigger decrease + for (int i = 0; i < 10; i++) { + testEdge.setValue(8500 + (i + 1) * 500); + long refilled = limiter.refill(10); + // Consume full amount to show high utilization + limiter.consume((int) refilled); + } + + // After intervals of high utilization, oversubscription should decrease + testEdge.setValue(14000); + long refillAfterHighUtil = limiter.refill(10); + + // Oversubscription should have decreased, so refill should be closer to base limit + // With oversubscriptionDecrement=0.005 over 10 intervals, it should drop by ~0.05 + assertTrue( + "Refill should be closer to base after high utilization. 
Got: " + refillAfterHighUtil, + refillAfterHighUtil <= 6); + } + + @Test + public void testBackoffMultiplierCapsAtMaximum() { + FeedbackAdaptiveRateLimiter limiter = factory.create(); + limiter.set(10, TimeUnit.SECONDS); + + testEdge.setValue(1000); + limiter.refill(10); + + // Record base wait interval + limiter.consume(10); + long baseWaitInterval = limiter.getWaitInterval(10, 0, 1); + + // Create extreme sustained contention to push backoff to max + // With increment=0.1 and max=3.0, we need (3.0-1.0)/0.1 = 20 intervals + for (int i = 0; i < 25; i++) { + testEdge.setValue(1000 + (i + 1) * 500); + limiter.refill(10); + limiter.consume(10); + limiter.getWaitInterval(10, 0, 1); // Create contention + } + + // Measure wait at maximum backoff + testEdge.setValue(14000); + limiter.refill(10); + limiter.consume(10); + long maxBackoffWaitInterval = limiter.getWaitInterval(10, 0, 1); + + // Wait interval should be approximately 3x base (max multiplier) + assertTrue( + "Wait interval should cap at max multiplier. Base: " + baseWaitInterval + ", Max backoff: " + + maxBackoffWaitInterval, + maxBackoffWaitInterval >= baseWaitInterval * 2.5 + && maxBackoffWaitInterval <= baseWaitInterval * 3.5); + + // Additional contention should not increase wait further + testEdge.setValue(14500); + limiter.refill(10); + limiter.consume(10); + limiter.getWaitInterval(10, 0, 1); + + testEdge.setValue(15000); + limiter.refill(10); + limiter.consume(10); + long stillMaxWaitInterval = limiter.getWaitInterval(10, 0, 1); + + // Should still be at max, not increasing further + assertTrue( + "Wait should remain capped. 
Previous: " + maxBackoffWaitInterval + ", Current: " + + stillMaxWaitInterval, + Math.abs(stillMaxWaitInterval - maxBackoffWaitInterval) < baseWaitInterval * 0.2); + } + + @Test + public void testOversubscriptionCapsAtMaximum() { + FeedbackAdaptiveRateLimiter limiter = factory.create(); + limiter.set(10, TimeUnit.SECONDS); + + testEdge.setValue(1000); + limiter.refill(10); + + // Create extreme low utilization to push oversubscription to max + // With increment=0.01 and max=0.2, we need 0.2/0.01 = 20 intervals + for (int i = 0; i < 25; i++) { + testEdge.setValue(1000 + (i + 1) * 500); + limiter.refill(10); + // Very low consumption to maximize oversubscription increase + limiter.consume(1); + } + + // Check that refill is capped at max oversubscription + testEdge.setValue(14000); + long refillWithMaxOversubscription = limiter.refill(10); + + // With max oversubscription of 0.2, refill should be at most 5 * 1.2 = 6 + // (5 is the interval-adjusted limit for 500ms refill interval) + assertTrue("Refill should cap at max oversubscription. 
Got: " + refillWithMaxOversubscription, + refillWithMaxOversubscription <= 7); + + // Further low utilization should not increase refill + testEdge.setValue(14500); + limiter.refill(10); + limiter.consume(1); + + testEdge.setValue(15000); + long stillMaxRefill = limiter.refill(10); + + // Should remain at cap + assertEquals("Refill should remain at max oversubscription", refillWithMaxOversubscription, + stillMaxRefill); + } + + @Test + public void testMultipleRefillIntervals() { + FeedbackAdaptiveRateLimiter limiter = factory.create(); + limiter.set(10, TimeUnit.SECONDS); + + testEdge.setValue(1000); + limiter.refill(10); + limiter.consume(10); + + // Jump forward by multiple refill intervals (3 intervals = 1500ms) + testEdge.setValue(1000 + 1500); + + // Should refill 3 intervals worth, but capped at oversubscribed limit + long multiIntervalRefill = limiter.refill(10); + + // With 500ms refill interval, each interval gives 5 resources + // 3 intervals = 15, but capped at limit (no oversubscription yet) = 10 + assertTrue("Multiple interval refill should provide multiple refill amounts. 
Got: " + + multiIntervalRefill, multiIntervalRefill >= 10); + } + + @Test + public void testRefillIntervalAdjustment() { + FeedbackAdaptiveRateLimiter limiter = factory.create(); + limiter.set(10, TimeUnit.SECONDS); + + testEdge.setValue(1000); + + // First refill should give full limit + long firstRefill = limiter.refill(10); + assertEquals("First refill should give full limit", 10, firstRefill); + + limiter.consume(10); + + // After exactly one refill interval (500ms), should get interval-adjusted amount + testEdge.setValue(1000 + 500); + long adjustedRefill = limiter.refill(10); + + // 500ms is half of 1000ms time unit, so should get half the limit = 5 + assertEquals("Refill after one interval should be interval-adjusted", 5, adjustedRefill); + } + + @Test + public void testBackoffMultiplierBottomsAtOne() { + FeedbackAdaptiveRateLimiter limiter = factory.create(); + limiter.set(10, TimeUnit.SECONDS); + + testEdge.setValue(1000); + limiter.refill(10); + + // Record baseline wait with no backoff applied + limiter.consume(10); + long baselineWait = limiter.getWaitInterval(10, 0, 1); + + // Run many intervals without contention to ensure multiplier stays at 1.0 + for (int i = 0; i < 20; i++) { + testEdge.setValue(1000 + (i + 1) * 500); + limiter.refill(10); + limiter.consume(3); // No contention + } + + // Wait interval should still be at baseline (multiplier = 1.0) + testEdge.setValue(11500); + limiter.refill(10); + limiter.consume(10); + long noContentionWait = limiter.getWaitInterval(10, 0, 1); + + assertEquals("Wait interval should not go below baseline (multiplier=1.0)", baselineWait, + noContentionWait); + } + + @Test + public void testConcurrentAccess() throws InterruptedException { + FeedbackAdaptiveRateLimiter limiter = factory.create(); + limiter.set(100, TimeUnit.SECONDS); + + testEdge.setValue(1000); + limiter.refill(100); + + // Simulate concurrent access + Thread[] threads = new Thread[10]; + for (int i = 0; i < threads.length; i++) { + threads[i] = 
new Thread(() -> { + for (int j = 0; j < 10; j++) { + limiter.consume(1); + limiter.getWaitInterval(100, 50, 1); + } + }); + } + + for (Thread thread : threads) { + thread.start(); + } + + for (Thread thread : threads) { + thread.join(); + } + + // Should complete without exceptions - basic thread safety verification + assertTrue("Concurrent access should complete successfully", true); + } + + @Test + public void testOverconsumptionBehavior() { + FeedbackAdaptiveRateLimiter limiter = factory.create(); + limiter.set(10, TimeUnit.SECONDS); + + testEdge.setValue(1000); + limiter.refill(10); + + // Over-consume significantly + limiter.consume(20); + + // Should require waiting for multiple intervals (500ms refill interval) + long waitInterval = limiter.getWaitInterval(10, -10, 1); + assertTrue("Should require substantial wait after over-consumption", waitInterval >= 500); + } + + @Test + public void testOscillatingLoadPattern() { + FeedbackAdaptiveRateLimiter limiter = factory.create(); + limiter.set(10, TimeUnit.SECONDS); + + testEdge.setValue(1000); + limiter.refill(10); + + // Oscillate between high contention and low contention + for (int cycle = 0; cycle < 3; cycle++) { + // High contention phase - increase backoff + for (int i = 0; i < 3; i++) { + testEdge.setValue(1000 + (cycle * 3000) + (i * 500)); + limiter.refill(10); + limiter.consume(10); + limiter.getWaitInterval(10, 0, 1); // Create contention + } + + long highContentionWait = limiter.getWaitInterval(10, 0, 1); + + // Low contention phase - decrease backoff + for (int i = 0; i < 3; i++) { + testEdge.setValue(1000 + (cycle * 3000) + 1500 + (i * 500)); + limiter.refill(10); + limiter.consume(3); // No contention + } + + testEdge.setValue(1000 + (cycle * 3000) + 3000); + limiter.refill(10); + limiter.consume(10); + long lowContentionWait = limiter.getWaitInterval(10, 0, 1); + + // After low contention phase, wait should be lower than after high contention + assertTrue( + "Wait should decrease after low 
contention phase in cycle " + cycle + ". High: " + + highContentionWait + ", Low: " + lowContentionWait, + lowContentionWait < highContentionWait); + } + } + + @Test + public void testUtilizationEmaConvergence() { + FeedbackAdaptiveRateLimiter limiter = factory.create(); + limiter.set(10, TimeUnit.SECONDS); + + testEdge.setValue(1000); + limiter.refill(10); + + // Consistently consume at 80% utilization + for (int i = 0; i < 30; i++) { + testEdge.setValue(1000 + (i + 1) * 500); + limiter.refill(10); + limiter.consume(4); // 4 out of 5 interval-adjusted = 80% + } + + // After many intervals, oversubscription should stabilize + // At 80% utilization (below 90% target), oversubscription should increase + testEdge.setValue(16500); + limiter.refill(10); + + // Now switch to 100% utilization + for (int i = 0; i < 30; i++) { + testEdge.setValue(16500 + (i + 1) * 500); + long refilled = limiter.refill(10); + limiter.consume((int) refilled); // Consume everything + } + + // At 100% utilization (within target range), oversubscription should stabilize + testEdge.setValue(32000); + limiter.refill(10); + + // The EMA should have adjusted, and refills should be different + // (though exact values depend on EMA convergence rate) + assertTrue("Refill behavior should adapt to utilization patterns", true); + } + + private static final class ManualEnvironmentEdge implements EnvironmentEdge { + private long currentTime = 1000; + + public void setValue(long time) { + this.currentTime = time; + } + + @Override + public long currentTime() { + return currentTime; + } + } +}