diff --git a/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limiter/DeadlineLimiter.java b/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limiter/DeadlineLimiter.java new file mode 100644 index 00000000..6323201b --- /dev/null +++ b/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limiter/DeadlineLimiter.java @@ -0,0 +1,114 @@ +/** + * Copyright 2018 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.netflix.concurrency.limits.limiter; + +import com.netflix.concurrency.limits.Limiter; + +import java.time.Duration; +import java.time.Instant; +import java.util.Optional; + +/** + * {@link Limiter} that blocks the caller when the limit has been reached. The caller is + * blocked until the limiter has been released, or a deadline has been passed. + * + * @param + */ +public final class DeadlineLimiter implements Limiter { + + /** + * Wrap a limiter such that acquire will block until a provided deadline if the limit was reached + * instead of returning an empty listener immediately + * + * @param delegate Non-blocking limiter to wrap + * @param deadline The deadline to wait until for the limit to be released. + * @return Wrapped limiter + */ + public static DeadlineLimiter wrap(Limiter delegate, Instant deadline) { + return new DeadlineLimiter<>(delegate, deadline); + } + + private final Limiter delegate; + private final Instant deadline; + + /** + * Lock used to block and unblock callers as the limit is reached + */ + private final Object lock = new Object(); + + private DeadlineLimiter(Limiter limiter, Instant deadline) { + this.delegate = limiter; + this.deadline = deadline; + } + + private Optional tryAcquire(ContextT context) { + synchronized (lock) { + while (true) { + long timeout = Duration.between(Instant.now(), deadline).toMillis(); + if (timeout <= 0) { + return Optional.empty(); + } + // Try to acquire a token and return immediately if successful + final Optional listener = delegate.acquire(context); + if (listener.isPresent()) { + return listener; + } + + // We have reached the limit so block until a token is released + try { + lock.wait(timeout); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return Optional.empty(); + } + } + } + } + + private void unblock() { + synchronized (lock) { + lock.notifyAll(); + } + } + + @Override + public Optional acquire(ContextT context) { + return tryAcquire(context).map(delegate -> new Listener() { + @Override + public void onSuccess() { + delegate.onSuccess(); + unblock(); + } + + @Override + public void onIgnore() { + delegate.onIgnore(); + unblock(); + } + + @Override + public void onDropped() { + delegate.onDropped(); + unblock(); + } + }); + } + + @Override + public String toString() { + return "DeadlineLimiter [" + delegate + "]"; + } +} diff --git a/concurrency-limits-core/src/test/java/com/netflix/concurrency/limits/limiter/BlockingLimiterTest.java b/concurrency-limits-core/src/test/java/com/netflix/concurrency/limits/limiter/BlockingLimiterTest.java index 254952d7..ff3a3acb 100644 --- a/concurrency-limits-core/src/test/java/com/netflix/concurrency/limits/limiter/BlockingLimiterTest.java +++ b/concurrency-limits-core/src/test/java/com/netflix/concurrency/limits/limiter/BlockingLimiterTest.java @@ -64,7 +64,7 @@ public void testTimeout() { SettableLimit limit = SettableLimit.startingAt(1); BlockingLimiter limiter = BlockingLimiter.wrap(SimpleLimiter.newBuilder().limit(limit).build(), timeout); - // Acquire first, will succeeed an not block + // Acquire first, will succeed and not block limiter.acquire(null); // Second acquire should time out after at least 50 millis diff --git a/concurrency-limits-core/src/test/java/com/netflix/concurrency/limits/limiter/DeadlineLimiterTest.java b/concurrency-limits-core/src/test/java/com/netflix/concurrency/limits/limiter/DeadlineLimiterTest.java new file mode 100644 index 00000000..c7d85798 --- /dev/null +++ b/concurrency-limits-core/src/test/java/com/netflix/concurrency/limits/limiter/DeadlineLimiterTest.java @@ -0,0 +1,88 @@ +package com.netflix.concurrency.limits.limiter; + +import com.netflix.concurrency.limits.Limiter; +import com.netflix.concurrency.limits.limit.SettableLimit; +import org.junit.Assert; +import org.junit.Test; + +import java.time.Duration; +import java.time.Instant; +import java.util.LinkedList; +import java.util.Optional; +import java.util.concurrent.*; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static org.junit.Assert.assertTrue; + +public class DeadlineLimiterTest { + @Test + public void test() { + SettableLimit limit = SettableLimit.startingAt(10); + DeadlineLimiter limiter = DeadlineLimiter.wrap( + SimpleLimiter.newBuilder().limit(limit).build(), + Instant.now().plusMillis(10)); + + LinkedList listeners = new LinkedList<>(); + for (int i = 0; i < 10; i++) { + limiter.acquire(null).ifPresent(listeners::add); + } + + limit.setLimit(1); + + while (!listeners.isEmpty()) { + listeners.remove().onSuccess(); + } + + limiter.acquire(null); + } + + @Test + public void testMultipleBlockedThreads() throws InterruptedException, ExecutionException, TimeoutException { + int numThreads = 8; + SettableLimit limit = SettableLimit.startingAt(1); + DeadlineLimiter limiter = DeadlineLimiter.wrap( + SimpleLimiter.newBuilder().limit(limit).build(), + Instant.now().plusSeconds(1)); + ExecutorService executorService = Executors.newFixedThreadPool(numThreads); + try { + for (Future future : IntStream.range(0, numThreads) + .mapToObj(x -> executorService.submit(() -> limiter.acquire(null).get().onSuccess())) + .collect(Collectors.toList())) { + future.get(1, TimeUnit.SECONDS); + } + } finally { + executorService.shutdown(); + } + } + + @Test + public void testExceedDeadline() { + Instant deadline = Instant.now().plusMillis(50); + SettableLimit limit = SettableLimit.startingAt(1); + DeadlineLimiter limiter = DeadlineLimiter.wrap( + SimpleLimiter.newBuilder().limit(limit).build(), + deadline); + + // Acquire first, will succeed and not block + limiter.acquire(null); + + // Second acquire should time out after the deadline has been reached + Assert.assertFalse(limiter.acquire(null).isPresent()); + Instant after = Instant.now(); + + assertTrue(after.isAfter(deadline)); + } + + @Test(expected=TimeoutException.class) + public void testNoTimeout() throws InterruptedException, ExecutionException, TimeoutException { + SettableLimit limit = SettableLimit.startingAt(1); + DeadlineLimiter limiter = DeadlineLimiter.wrap( + SimpleLimiter.newBuilder().limit(limit).build(), + Instant.now().plusSeconds(2)); + limiter.acquire(null); + + CompletableFuture> future = CompletableFuture.supplyAsync(() -> limiter.acquire(null)); + future.get(1, TimeUnit.SECONDS); + } +}