From e5d097e7ce6b9c5db1f1cd2c71c409e6b240feb7 Mon Sep 17 00:00:00 2001 From: Benjamin Duffield Date: Thu, 14 Nov 2019 21:36:44 -0500 Subject: [PATCH 1/2] bonus test comment fix --- .../netflix/concurrency/limits/limiter/BlockingLimiterTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/concurrency-limits-core/src/test/java/com/netflix/concurrency/limits/limiter/BlockingLimiterTest.java b/concurrency-limits-core/src/test/java/com/netflix/concurrency/limits/limiter/BlockingLimiterTest.java index 254952d7..ff3a3acb 100644 --- a/concurrency-limits-core/src/test/java/com/netflix/concurrency/limits/limiter/BlockingLimiterTest.java +++ b/concurrency-limits-core/src/test/java/com/netflix/concurrency/limits/limiter/BlockingLimiterTest.java @@ -64,7 +64,7 @@ public void testTimeout() { SettableLimit limit = SettableLimit.startingAt(1); BlockingLimiter limiter = BlockingLimiter.wrap(SimpleLimiter.newBuilder().limit(limit).build(), timeout); - // Acquire first, will succeeed an not block + // Acquire first, will succeed and not block limiter.acquire(null); // Second acquire should time out after at least 50 millis From 3663bbf68164b73d17385a6578c1bbd99da5cbd6 Mon Sep 17 00:00:00 2001 From: Benjamin Duffield Date: Thu, 14 Nov 2019 21:38:58 -0500 Subject: [PATCH 2/2] Add `DeadlineLimiter` This is essentially a variation on the existing `BlockingLimiter`. This can be useful when a number of tasks have to be performed in order to service some other request, and either these tasks must be performed by some deadline or the whole request should be failed. --- .../limits/limiter/DeadlineLimiter.java | 114 ++++++++++++++++++ .../limits/limiter/DeadlineLimiterTest.java | 88 ++++++++++++++ 2 files changed, 202 insertions(+) create mode 100644 concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limiter/DeadlineLimiter.java create mode 100644 concurrency-limits-core/src/test/java/com/netflix/concurrency/limits/limiter/DeadlineLimiterTest.java diff --git a/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limiter/DeadlineLimiter.java b/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limiter/DeadlineLimiter.java new file mode 100644 index 00000000..6323201b --- /dev/null +++ b/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limiter/DeadlineLimiter.java @@ -0,0 +1,114 @@ +/** + * Copyright 2018 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.netflix.concurrency.limits.limiter; + +import com.netflix.concurrency.limits.Limiter; + +import java.time.Duration; +import java.time.Instant; +import java.util.Optional; + +/** + * {@link Limiter} that blocks the caller when the limit has been reached. The caller is + * blocked until the limiter has been released, or a deadline has been passed. + * + * @param + */ +public final class DeadlineLimiter implements Limiter { + + /** + * Wrap a limiter such that acquire will block until a provided deadline if the limit was reached + * instead of returning an empty listener immediately + * + * @param delegate Non-blocking limiter to wrap + * @param deadline The deadline to wait until for the limit to be released. + * @return Wrapped limiter + */ + public static DeadlineLimiter wrap(Limiter delegate, Instant deadline) { + return new DeadlineLimiter<>(delegate, deadline); + } + + private final Limiter delegate; + private final Instant deadline; + + /** + * Lock used to block and unblock callers as the limit is reached + */ + private final Object lock = new Object(); + + private DeadlineLimiter(Limiter limiter, Instant deadline) { + this.delegate = limiter; + this.deadline = deadline; + } + + private Optional tryAcquire(ContextT context) { + synchronized (lock) { + while (true) { + long timeout = Duration.between(Instant.now(), deadline).toMillis(); + if (timeout <= 0) { + return Optional.empty(); + } + // Try to acquire a token and return immediately if successful + final Optional listener = delegate.acquire(context); + if (listener.isPresent()) { + return listener; + } + + // We have reached the limit so block until a token is released + try { + lock.wait(timeout); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return Optional.empty(); + } + } + } + } + + private void unblock() { + synchronized (lock) { + lock.notifyAll(); + } + } + + @Override + public Optional acquire(ContextT context) { + return tryAcquire(context).map(delegate -> new Listener() { + @Override + public void onSuccess() { + delegate.onSuccess(); + unblock(); + } + + @Override + public void onIgnore() { + delegate.onIgnore(); + unblock(); + } + + @Override + public void onDropped() { + delegate.onDropped(); + unblock(); + } + }); + } + + @Override + public String toString() { + return "DeadlineLimiter [" + delegate + "]"; + } +} diff --git a/concurrency-limits-core/src/test/java/com/netflix/concurrency/limits/limiter/DeadlineLimiterTest.java b/concurrency-limits-core/src/test/java/com/netflix/concurrency/limits/limiter/DeadlineLimiterTest.java new file mode 100644 index 00000000..c7d85798 --- /dev/null +++ b/concurrency-limits-core/src/test/java/com/netflix/concurrency/limits/limiter/DeadlineLimiterTest.java @@ -0,0 +1,88 @@ +package com.netflix.concurrency.limits.limiter; + +import com.netflix.concurrency.limits.Limiter; +import com.netflix.concurrency.limits.limit.SettableLimit; +import org.junit.Assert; +import org.junit.Test; + +import java.time.Duration; +import java.time.Instant; +import java.util.LinkedList; +import java.util.Optional; +import java.util.concurrent.*; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static org.junit.Assert.assertTrue; + +public class DeadlineLimiterTest { + @Test + public void test() { + SettableLimit limit = SettableLimit.startingAt(10); + DeadlineLimiter limiter = DeadlineLimiter.wrap( + SimpleLimiter.newBuilder().limit(limit).build(), + Instant.now().plusMillis(10)); + + LinkedList listeners = new LinkedList<>(); + for (int i = 0; i < 10; i++) { + limiter.acquire(null).ifPresent(listeners::add); + } + + limit.setLimit(1); + + while (!listeners.isEmpty()) { + listeners.remove().onSuccess(); + } + + limiter.acquire(null); + } + + @Test + public void testMultipleBlockedThreads() throws InterruptedException, ExecutionException, TimeoutException { + int numThreads = 8; + SettableLimit limit = SettableLimit.startingAt(1); + DeadlineLimiter limiter = DeadlineLimiter.wrap( + SimpleLimiter.newBuilder().limit(limit).build(), + Instant.now().plusSeconds(1)); + ExecutorService executorService = Executors.newFixedThreadPool(numThreads); + try { + for (Future future : IntStream.range(0, numThreads) + .mapToObj(x -> executorService.submit(() -> limiter.acquire(null).get().onSuccess())) + .collect(Collectors.toList())) { + future.get(1, TimeUnit.SECONDS); + } + } finally { + executorService.shutdown(); + } + } + + @Test + public void testExceedDeadline() { + Instant deadline = Instant.now().plusMillis(50); + SettableLimit limit = SettableLimit.startingAt(1); + DeadlineLimiter limiter = DeadlineLimiter.wrap( + SimpleLimiter.newBuilder().limit(limit).build(), + deadline); + + // Acquire first, will succeed and not block + limiter.acquire(null); + + // Second acquire should time out after the deadline has been reached + Assert.assertFalse(limiter.acquire(null).isPresent()); + Instant after = Instant.now(); + + assertTrue(after.isAfter(deadline)); + } + + @Test(expected=TimeoutException.class) + public void testNoTimeout() throws InterruptedException, ExecutionException, TimeoutException { + SettableLimit limit = SettableLimit.startingAt(1); + DeadlineLimiter limiter = DeadlineLimiter.wrap( + SimpleLimiter.newBuilder().limit(limit).build(), + Instant.now().plusSeconds(2)); + limiter.acquire(null); + + CompletableFuture> future = CompletableFuture.supplyAsync(() -> limiter.acquire(null)); + future.get(1, TimeUnit.SECONDS); + } +}