diff --git a/src/libraries/System.Threading.RateLimiting/src/Properties/InternalsVisibleTo.cs b/src/libraries/System.Threading.RateLimiting/src/Properties/InternalsVisibleTo.cs new file mode 100644 index 00000000000000..78086e4d2350f2 --- /dev/null +++ b/src/libraries/System.Threading.RateLimiting/src/Properties/InternalsVisibleTo.cs @@ -0,0 +1,6 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. + +using System.Runtime.CompilerServices; + +[assembly: InternalsVisibleTo("System.Threading.RateLimiting.Tests, PublicKey=00240000048000009400000006020000002400005253413100040000010001004b86c4cb78549b34bab61a3b1800e23bfeb5b3ec390074041536a7e3cbd97f5f04cf0f857155a8928eaa29ebfd11cfbbad3ba70efea7bda3226c6a8d370a4cd303f714486b6ebc225985a638471e6ef571cc92a4613c00b8fa65d61ccee0cbe5f36330c9a01f4183559f1bef24cc2917c6d913e3a541333a1d05d9bed22b38cb")] diff --git a/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/ConcurrencyLimiter.cs b/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/ConcurrencyLimiter.cs index 3ba07631f74bb3..44dac52ee908c4 100644 --- a/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/ConcurrencyLimiter.cs +++ b/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/ConcurrencyLimiter.cs @@ -132,6 +132,8 @@ protected override ValueTask AcquireAsyncCore(int permitCount, C return new ValueTask(SuccessfulLease); } + using var disposer = default(RequestRegistration.Disposer); + // Perf: Check SemaphoreSlim implementation instead of locking lock (Lock) { @@ -150,18 +152,13 @@ protected override ValueTask AcquireAsyncCore(int permitCount, C do { RequestRegistration oldestRequest = _queue.DequeueHead(); - _queueCount -= oldestRequest.Count; - Debug.Assert(_queueCount >= 0); - if (!oldestRequest.Tcs.TrySetResult(FailedLease)) - { - // Updating queue count is handled by the cancellation code - _queueCount += oldestRequest.Count; - } - else + if (oldestRequest.TrySetResult(FailedLease)) { + // Updating queue count is handled by the cancellation/cleanup code Interlocked.Increment(ref _failedLeasesCount); } - oldestRequest.CancellationTokenRegistration.Dispose(); + disposer.CleanupAndAdd(oldestRequest); + Debug.Assert(_queueCount >= 0); } while (_options.QueueLimit - _queueCount < permitCount); } @@ -173,22 +170,12 @@ protected override ValueTask AcquireAsyncCore(int permitCount, C } } - CancelQueueState tcs = new CancelQueueState(permitCount, this, cancellationToken); - CancellationTokenRegistration ctr = default; - if (cancellationToken.CanBeCanceled) - { - ctr = cancellationToken.Register(static obj => - { - ((CancelQueueState)obj!).TrySetCanceled(); - }, tcs); - } - - RequestRegistration request = new RequestRegistration(permitCount, tcs, ctr); + var request = new RequestRegistration(permitCount, this, cancellationToken); _queue.EnqueueTail(request); _queueCount += permitCount; Debug.Assert(_queueCount <= _options.QueueLimit); - return new ValueTask(request.Tcs.Task); + return new ValueTask(request.Task); } } @@ -224,8 +211,15 @@ private bool TryLeaseUnsynchronized(int permitCount, [NotNullWhen(true)] out Rat return false; } +#if DEBUG + // for unit testing + internal event Action? ReleasePreHook; + internal event Action? ReleasePostHook; +#endif + private void Release(int releaseCount) { + using var disposer = default(RequestRegistration.Disposer); lock (Lock) { if (_disposed) @@ -236,6 +230,10 @@ private void Release(int releaseCount) _permitCount += releaseCount; Debug.Assert(_permitCount <= _options.PermitLimit); +#if DEBUG + ReleasePreHook?.Invoke(); +#endif + while (_queue.Count > 0) { RequestRegistration nextPendingRequest = @@ -245,39 +243,43 @@ private void Release(int releaseCount) // Request was handled already, either via cancellation or being kicked from the queue due to a newer request being queued. // We just need to remove the item and let the next queued item be considered for completion. - if (nextPendingRequest.Tcs.Task.IsCompleted) + if (nextPendingRequest.Task.IsCompleted) { nextPendingRequest = _options.QueueProcessingOrder == QueueProcessingOrder.OldestFirst ? _queue.DequeueHead() : _queue.DequeueTail(); - nextPendingRequest.CancellationTokenRegistration.Dispose(); + disposer.CleanupAndAdd(nextPendingRequest); + continue; } - else if (_permitCount >= nextPendingRequest.Count) + +#if DEBUG + ReleasePostHook?.Invoke(); +#endif + + if (_permitCount >= nextPendingRequest.Count) { nextPendingRequest = _options.QueueProcessingOrder == QueueProcessingOrder.OldestFirst ? _queue.DequeueHead() : _queue.DequeueTail(); + // Updating queue count is handled by the cancellation/cleanup code _permitCount -= nextPendingRequest.Count; - _queueCount -= nextPendingRequest.Count; Debug.Assert(_permitCount >= 0); ConcurrencyLease lease = nextPendingRequest.Count == 0 ? SuccessfulLease : new ConcurrencyLease(true, this, nextPendingRequest.Count); // Check if request was canceled - if (!nextPendingRequest.Tcs.TrySetResult(lease)) + if (!nextPendingRequest.TrySetResult(lease)) { // Queued item was canceled so add count back _permitCount += nextPendingRequest.Count; - // Updating queue count is handled by the cancellation code - _queueCount += nextPendingRequest.Count; } else { Interlocked.Increment(ref _successfulLeasesCount); } - nextPendingRequest.CancellationTokenRegistration.Dispose(); + disposer.CleanupAndAdd(nextPendingRequest); Debug.Assert(_queueCount >= 0); } else @@ -302,6 +304,7 @@ protected override void Dispose(bool disposing) return; } + using var disposer = default(RequestRegistration.Disposer); lock (Lock) { if (_disposed) @@ -314,9 +317,10 @@ protected override void Dispose(bool disposing) RequestRegistration next = _options.QueueProcessingOrder == QueueProcessingOrder.OldestFirst ? _queue.DequeueHead() : _queue.DequeueTail(); - next.CancellationTokenRegistration.Dispose(); - next.Tcs.TrySetResult(FailedLease); + disposer.CleanupAndAdd(next); + next.TrySetResult(FailedLease); } + Debug.Assert(_queueCount == 0); } } @@ -383,49 +387,78 @@ protected override void Dispose(bool disposing) } } - private readonly struct RequestRegistration + private sealed class RequestRegistration : TaskCompletionSource { - public RequestRegistration(int requestedCount, TaskCompletionSource tcs, - CancellationTokenRegistration cancellationTokenRegistration) - { - Count = requestedCount; - // Perf: Use AsyncOperation instead - Tcs = tcs; - CancellationTokenRegistration = cancellationTokenRegistration; - } + private readonly CancellationToken _cancellationToken; + private CancellationTokenRegistration _cancellationTokenRegistration; - public int Count { get; } + // this field is used only by the disposal mechanics and never shared between threads + private RequestRegistration? _next; - public TaskCompletionSource Tcs { get; } + public RequestRegistration(int permitCount, ConcurrencyLimiter limiter, CancellationToken cancellationToken) + : base(limiter, TaskCreationOptions.RunContinuationsAsynchronously) + { + Count = permitCount; + _cancellationToken = cancellationToken; - public CancellationTokenRegistration CancellationTokenRegistration { get; } - } + // RequestRegistration objects are created while the limiter lock is held + // if cancellationToken fires before or while the lock is held, UnsafeRegister + // is going to invoke the callback synchronously, but this does not create + // a deadlock because lock are reentrant + if (cancellationToken.CanBeCanceled) +#if NETCOREAPP || NETSTANDARD2_1_OR_GREATER + _cancellationTokenRegistration = cancellationToken.UnsafeRegister(Cancel, this); +#else + _cancellationTokenRegistration = cancellationToken.Register(Cancel, this); +#endif + } - private sealed class CancelQueueState : TaskCompletionSource - { - private readonly int _permitCount; - private readonly ConcurrencyLimiter _limiter; - private readonly CancellationToken _cancellationToken; + /// + /// This property is only accessed under limiter lock. + /// + public int Count { get; private set; } - public CancelQueueState(int permitCount, ConcurrencyLimiter limiter, CancellationToken cancellationToken) - : base(TaskCreationOptions.RunContinuationsAsynchronously) + private static void Cancel(object? state) { - _permitCount = permitCount; - _limiter = limiter; - _cancellationToken = cancellationToken; + if (state is RequestRegistration registration && registration.TrySetCanceled(registration._cancellationToken)) + { + registration.Cleanup(); + } + } + + private void Cleanup() + { + var limiter = (ConcurrencyLimiter)Task.AsyncState!; + lock (limiter.Lock) + { + limiter._queueCount -= Count; + Count = 0; + } } - public new bool TrySetCanceled() + /// + /// Collects registrations to dispose outside the limiter lock to avoid deadlock. + /// + public struct Disposer : IDisposable { - if (TrySetCanceled(_cancellationToken)) + private RequestRegistration? _next; + + public void CleanupAndAdd(RequestRegistration request) + { + request.Cleanup(); + request._next = _next; + _next = request; + } + + public void Dispose() { - lock (_limiter.Lock) + for (var current = _next; current is not null; current = current._next) { - _limiter._queueCount -= _permitCount; + current._cancellationTokenRegistration.Dispose(); } - return true; + + _next = null; } - return false; } } } diff --git a/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/FixedWindowRateLimiter.cs b/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/FixedWindowRateLimiter.cs index 037fc70dc3ab85..71c55d8527a6ae 100644 --- a/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/FixedWindowRateLimiter.cs +++ b/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/FixedWindowRateLimiter.cs @@ -151,6 +151,7 @@ protected override ValueTask AcquireAsyncCore(int permitCount, C return new ValueTask(SuccessfulLease); } + using var disposer = default(RequestRegistration.Disposer); lock (Lock) { if (TryLeaseUnsynchronized(permitCount, out RateLimitLease? lease)) @@ -168,17 +169,13 @@ protected override ValueTask AcquireAsyncCore(int permitCount, C do { RequestRegistration oldestRequest = _queue.DequeueHead(); - _queueCount -= oldestRequest.Count; - Debug.Assert(_queueCount >= 0); - if (!oldestRequest.Tcs.TrySetResult(FailedLease)) - { - _queueCount += oldestRequest.Count; - } - else + if (oldestRequest.TrySetResult(FailedLease)) { + // Updating queue count is handled by the cancellation/cleanup code Interlocked.Increment(ref _failedLeasesCount); } - oldestRequest.CancellationTokenRegistration.Dispose(); + disposer.CleanupAndAdd(oldestRequest); + Debug.Assert(_queueCount >= 0); } while (_options.QueueLimit - _queueCount < permitCount); } @@ -190,22 +187,12 @@ protected override ValueTask AcquireAsyncCore(int permitCount, C } } - CancelQueueState tcs = new CancelQueueState(permitCount, this, cancellationToken); - CancellationTokenRegistration ctr = default; - if (cancellationToken.CanBeCanceled) - { - ctr = cancellationToken.Register(static obj => - { - ((CancelQueueState)obj!).TrySetCanceled(); - }, tcs); - } - - RequestRegistration registration = new RequestRegistration(permitCount, tcs, ctr); + var registration = new RequestRegistration(permitCount, this, cancellationToken); _queue.EnqueueTail(registration); _queueCount += permitCount; Debug.Assert(_queueCount <= _options.QueueLimit); - return new ValueTask(registration.Tcs.Task); + return new ValueTask(registration.Task); } } @@ -280,6 +267,8 @@ private static void Replenish(object? state) // Used in tests that test behavior with specific time intervals private void ReplenishInternal(long nowTicks) { + using var disposer = default(RequestRegistration.Disposer); + // Method is re-entrant (from Timer), lock to avoid multiple simultaneous replenishes lock (Lock) { @@ -315,13 +304,13 @@ private void ReplenishInternal(long nowTicks) // Request was handled already, either via cancellation or being kicked from the queue due to a newer request being queued. // We just need to remove the item and let the next queued item be considered for completion. - if (nextPendingRequest.Tcs.Task.IsCompleted) + if (nextPendingRequest.Task.IsCompleted) { nextPendingRequest = _options.QueueProcessingOrder == QueueProcessingOrder.OldestFirst ? _queue.DequeueHead() : _queue.DequeueTail(); - nextPendingRequest.CancellationTokenRegistration.Dispose(); + disposer.CleanupAndAdd(nextPendingRequest); } else if (_permitCount >= nextPendingRequest.Count) { @@ -331,22 +320,20 @@ private void ReplenishInternal(long nowTicks) ? _queue.DequeueHead() : _queue.DequeueTail(); - _queueCount -= nextPendingRequest.Count; + // Updating queue count is handled by the cancellation/cleanup code _permitCount -= nextPendingRequest.Count; Debug.Assert(_permitCount >= 0); - if (!nextPendingRequest.Tcs.TrySetResult(SuccessfulLease)) + if (!nextPendingRequest.TrySetResult(SuccessfulLease)) { // Queued item was canceled so add count back _permitCount += nextPendingRequest.Count; - // Updating queue count is handled by the cancellation code - _queueCount += nextPendingRequest.Count; } else { Interlocked.Increment(ref _successfulLeasesCount); } - nextPendingRequest.CancellationTokenRegistration.Dispose(); + disposer.CleanupAndAdd(nextPendingRequest); Debug.Assert(_queueCount >= 0); } else @@ -372,6 +359,7 @@ protected override void Dispose(bool disposing) return; } + using var disposer = default(RequestRegistration.Disposer); lock (Lock) { if (_disposed) @@ -385,9 +373,10 @@ protected override void Dispose(bool disposing) RequestRegistration next = _options.QueueProcessingOrder == QueueProcessingOrder.OldestFirst ? _queue.DequeueHead() : _queue.DequeueTail(); - next.CancellationTokenRegistration.Dispose(); - next.Tcs.TrySetResult(FailedLease); + disposer.CleanupAndAdd(next); + next.TrySetResult(FailedLease); } + Debug.Assert(_queueCount == 0); } } @@ -435,48 +424,78 @@ public override bool TryGetMetadata(string metadataName, out object? metadata) } } - private readonly struct RequestRegistration + private sealed class RequestRegistration : TaskCompletionSource { - public RequestRegistration(int permitCount, TaskCompletionSource tcs, CancellationTokenRegistration cancellationTokenRegistration) + private readonly CancellationToken _cancellationToken; + private CancellationTokenRegistration _cancellationTokenRegistration; + + // this field is used only by the disposal mechanics and never shared between threads + private RequestRegistration? _next; + + public RequestRegistration(int permitCount, FixedWindowRateLimiter limiter, CancellationToken cancellationToken) + : base(limiter, TaskCreationOptions.RunContinuationsAsynchronously) { Count = permitCount; - // Use VoidAsyncOperationWithData instead - Tcs = tcs; - CancellationTokenRegistration = cancellationTokenRegistration; - } + _cancellationToken = cancellationToken; - public int Count { get; } + // RequestRegistration objects are created while the limiter lock is held + // if cancellationToken fires before or while the lock is held, UnsafeRegister + // is going to invoke the callback synchronously, but this does not create + // a deadlock because lock are reentrant + if (cancellationToken.CanBeCanceled) +#if NETCOREAPP || NETSTANDARD2_1_OR_GREATER + _cancellationTokenRegistration = cancellationToken.UnsafeRegister(Cancel, this); +#else + _cancellationTokenRegistration = cancellationToken.Register(Cancel, this); +#endif + } - public TaskCompletionSource Tcs { get; } + /// + /// This property is only accessed under limiter lock. + /// + public int Count { get; private set; } - public CancellationTokenRegistration CancellationTokenRegistration { get; } - } - - private sealed class CancelQueueState : TaskCompletionSource - { - private readonly int _permitCount; - private readonly FixedWindowRateLimiter _limiter; - private readonly CancellationToken _cancellationToken; + private static void Cancel(object? state) + { + if (state is RequestRegistration registration && registration.TrySetCanceled(registration._cancellationToken)) + { + registration.Cleanup(); + } + } - public CancelQueueState(int permitCount, FixedWindowRateLimiter limiter, CancellationToken cancellationToken) - : base(TaskCreationOptions.RunContinuationsAsynchronously) + private void Cleanup() { - _permitCount = permitCount; - _limiter = limiter; - _cancellationToken = cancellationToken; + var limiter = (FixedWindowRateLimiter)Task.AsyncState!; + lock (limiter.Lock) + { + limiter._queueCount -= Count; + Count = 0; + } } - public new bool TrySetCanceled() + /// + /// Collects registrations to dispose outside the limiter lock to avoid deadlock. + /// + public struct Disposer : IDisposable { - if (TrySetCanceled(_cancellationToken)) + private RequestRegistration? _next; + + public void CleanupAndAdd(RequestRegistration request) + { + request.Cleanup(); + request._next = _next; + _next = request; + } + + public void Dispose() { - lock (_limiter.Lock) + for (var current = _next; current is not null; current = current._next) { - _limiter._queueCount -= _permitCount; + current._cancellationTokenRegistration.Dispose(); } - return true; + + _next = null; } - return false; } } } diff --git a/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/SlidingWindowRateLimiter.cs b/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/SlidingWindowRateLimiter.cs index c2927e282c48eb..6e6db5bf058db7 100644 --- a/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/SlidingWindowRateLimiter.cs +++ b/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/SlidingWindowRateLimiter.cs @@ -163,6 +163,7 @@ protected override ValueTask AcquireAsyncCore(int permitCount, C return new ValueTask(SuccessfulLease); } + using var disposer = default(RequestRegistration.Disposer); lock (Lock) { if (TryLeaseUnsynchronized(permitCount, out RateLimitLease? lease)) @@ -180,17 +181,13 @@ protected override ValueTask AcquireAsyncCore(int permitCount, C do { RequestRegistration oldestRequest = _queue.DequeueHead(); - _queueCount -= oldestRequest.Count; - Debug.Assert(_queueCount >= 0); - if (!oldestRequest.Tcs.TrySetResult(FailedLease)) - { - _queueCount += oldestRequest.Count; - } - else + if (oldestRequest.TrySetResult(FailedLease)) { + // Updating queue count is handled by the cancellation/cleanup code Interlocked.Increment(ref _failedLeasesCount); } - oldestRequest.CancellationTokenRegistration.Dispose(); + disposer.CleanupAndAdd(oldestRequest); + Debug.Assert(_queueCount >= 0); } while (_options.QueueLimit - _queueCount < permitCount); } @@ -202,22 +199,12 @@ protected override ValueTask AcquireAsyncCore(int permitCount, C } } - CancelQueueState tcs = new CancelQueueState(permitCount, this, cancellationToken); - CancellationTokenRegistration ctr = default; - if (cancellationToken.CanBeCanceled) - { - ctr = cancellationToken.Register(static obj => - { - ((CancelQueueState)obj!).TrySetCanceled(); - }, tcs); - } - - RequestRegistration registration = new RequestRegistration(permitCount, tcs, ctr); + var registration = new RequestRegistration(permitCount, this, cancellationToken); _queue.EnqueueTail(registration); _queueCount += permitCount; Debug.Assert(_queueCount <= _options.QueueLimit); - return new ValueTask(registration.Tcs.Task); + return new ValueTask(registration.Task); } } @@ -286,6 +273,8 @@ private static void Replenish(object? state) // Used in tests that test behavior with specific time intervals private void ReplenishInternal(long nowTicks) { + using var disposer = default(RequestRegistration.Disposer); + // Method is re-entrant (from Timer), lock to avoid multiple simultaneous replenishes lock (Lock) { @@ -325,13 +314,13 @@ private void ReplenishInternal(long nowTicks) // Request was handled already, either via cancellation or being kicked from the queue due to a newer request being queued. // We just need to remove the item and let the next queued item be considered for completion. - if (nextPendingRequest.Tcs.Task.IsCompleted) + if (nextPendingRequest.Task.IsCompleted) { nextPendingRequest = _options.QueueProcessingOrder == QueueProcessingOrder.OldestFirst ? _queue.DequeueHead() : _queue.DequeueTail(); - nextPendingRequest.CancellationTokenRegistration.Dispose(); + disposer.CleanupAndAdd(nextPendingRequest); } // If we have enough permits after replenishing to serve the queued requests else if (_permitCount >= nextPendingRequest.Count) @@ -342,24 +331,22 @@ private void ReplenishInternal(long nowTicks) ? _queue.DequeueHead() : _queue.DequeueTail(); - _queueCount -= nextPendingRequest.Count; + // Updating queue count is handled by the cancellation/cleanup code _permitCount -= nextPendingRequest.Count; _requestsPerSegment[_currentSegmentIndex] += nextPendingRequest.Count; Debug.Assert(_permitCount >= 0); - if (!nextPendingRequest.Tcs.TrySetResult(SuccessfulLease)) + if (!nextPendingRequest.TrySetResult(SuccessfulLease)) { // Queued item was canceled so add count back _permitCount += nextPendingRequest.Count; _requestsPerSegment[_currentSegmentIndex] -= nextPendingRequest.Count; - // Updating queue count is handled by the cancellation code - _queueCount += nextPendingRequest.Count; } else { Interlocked.Increment(ref _successfulLeasesCount); } - nextPendingRequest.CancellationTokenRegistration.Dispose(); + disposer.CleanupAndAdd(nextPendingRequest); Debug.Assert(_queueCount >= 0); } else @@ -385,6 +372,7 @@ protected override void Dispose(bool disposing) return; } + using var disposer = default(RequestRegistration.Disposer); lock (Lock) { if (_disposed) @@ -398,9 +386,10 @@ protected override void Dispose(bool disposing) RequestRegistration next = _options.QueueProcessingOrder == QueueProcessingOrder.OldestFirst ? _queue.DequeueHead() : _queue.DequeueTail(); - next.CancellationTokenRegistration.Dispose(); - next.Tcs.TrySetResult(FailedLease); + disposer.CleanupAndAdd(next); + next.TrySetResult(FailedLease); } + Debug.Assert(_queueCount == 0); } } @@ -448,48 +437,78 @@ public override bool TryGetMetadata(string metadataName, out object? metadata) } } - private readonly struct RequestRegistration + private sealed class RequestRegistration : TaskCompletionSource { - public RequestRegistration(int permitCount, TaskCompletionSource tcs, CancellationTokenRegistration cancellationTokenRegistration) + private readonly CancellationToken _cancellationToken; + private CancellationTokenRegistration _cancellationTokenRegistration; + + // this field is used only by the disposal mechanics and never shared between threads + private RequestRegistration? _next; + + public RequestRegistration(int permitCount, SlidingWindowRateLimiter limiter, CancellationToken cancellationToken) + : base(limiter, TaskCreationOptions.RunContinuationsAsynchronously) { Count = permitCount; - // Use VoidAsyncOperationWithData instead - Tcs = tcs; - CancellationTokenRegistration = cancellationTokenRegistration; - } + _cancellationToken = cancellationToken; - public int Count { get; } + // RequestRegistration objects are created while the limiter lock is held + // if cancellationToken fires before or while the lock is held, UnsafeRegister + // is going to invoke the callback synchronously, but this does not create + // a deadlock because lock are reentrant + if (cancellationToken.CanBeCanceled) +#if NETCOREAPP || NETSTANDARD2_1_OR_GREATER + _cancellationTokenRegistration = cancellationToken.UnsafeRegister(Cancel, this); +#else + _cancellationTokenRegistration = cancellationToken.Register(Cancel, this); +#endif + } - public TaskCompletionSource Tcs { get; } + /// + /// This property is only accessed under limiter lock. + /// + public int Count { get; private set; } - public CancellationTokenRegistration CancellationTokenRegistration { get; } - } - - private sealed class CancelQueueState : TaskCompletionSource - { - private readonly int _permitCount; - private readonly SlidingWindowRateLimiter _limiter; - private readonly CancellationToken _cancellationToken; + private static void Cancel(object? state) + { + if (state is RequestRegistration registration && registration.TrySetCanceled(registration._cancellationToken)) + { + registration.Cleanup(); + } + } - public CancelQueueState(int permitCount, SlidingWindowRateLimiter limiter, CancellationToken cancellationToken) - : base(TaskCreationOptions.RunContinuationsAsynchronously) + private void Cleanup() { - _permitCount = permitCount; - _limiter = limiter; - _cancellationToken = cancellationToken; + var limiter = (SlidingWindowRateLimiter)Task.AsyncState!; + lock (limiter.Lock) + { + limiter._queueCount -= Count; + Count = 0; + } } - public new bool TrySetCanceled() + /// + /// Collects registrations to dispose outside the limiter lock to avoid deadlock. + /// + public struct Disposer : IDisposable { - if (TrySetCanceled(_cancellationToken)) + private RequestRegistration? _next; + + public void CleanupAndAdd(RequestRegistration request) + { + request.Cleanup(); + request._next = _next; + _next = request; + } + + public void Dispose() { - lock (_limiter.Lock) + for (var current = _next; current is not null; current = current._next) { - _limiter._queueCount -= _permitCount; + current._cancellationTokenRegistration.Dispose(); } - return true; + + _next = null; } - return false; } } } diff --git a/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/TokenBucketRateLimiter.cs b/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/TokenBucketRateLimiter.cs index b94937600f9440..3cea7f39988e9f 100644 --- a/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/TokenBucketRateLimiter.cs +++ b/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/TokenBucketRateLimiter.cs @@ -156,6 +156,7 @@ protected override ValueTask AcquireAsyncCore(int tokenCount, Ca return new ValueTask(SuccessfulLease); } + using var disposer = default(RequestRegistration.Disposer); lock (Lock) { if (TryLeaseUnsynchronized(tokenCount, out RateLimitLease? lease)) @@ -173,18 +174,13 @@ protected override ValueTask AcquireAsyncCore(int tokenCount, Ca do { RequestRegistration oldestRequest = _queue.DequeueHead(); - _queueCount -= oldestRequest.Count; - Debug.Assert(_queueCount >= 0); - if (!oldestRequest.Tcs.TrySetResult(FailedLease)) - { - // Updating queue count is handled by the cancellation code - _queueCount += oldestRequest.Count; - } - else + if (oldestRequest.TrySetResult(FailedLease)) { + // Updating queue count is handled by the cancellation/cleanup code Interlocked.Increment(ref _failedLeasesCount); } - oldestRequest.CancellationTokenRegistration.Dispose(); + disposer.CleanupAndAdd(oldestRequest); + Debug.Assert(_queueCount >= 0); } while (_options.QueueLimit - _queueCount < tokenCount); } @@ -196,22 +192,12 @@ protected override ValueTask AcquireAsyncCore(int tokenCount, Ca } } - CancelQueueState tcs = new CancelQueueState(tokenCount, this, cancellationToken); - CancellationTokenRegistration ctr = default; - if (cancellationToken.CanBeCanceled) - { - ctr = cancellationToken.Register(static obj => - { - ((CancelQueueState)obj!).TrySetCanceled(); - }, tcs); - } - - RequestRegistration registration = new RequestRegistration(tokenCount, tcs, ctr); + var registration = new RequestRegistration(tokenCount, this, cancellationToken); _queue.EnqueueTail(registration); _queueCount += tokenCount; Debug.Assert(_queueCount <= _options.QueueLimit); - return new ValueTask(registration.Tcs.Task); + return new ValueTask(registration.Task); } } @@ -288,6 +274,8 @@ private static void Replenish(object? state) // Used in tests to avoid dealing with real time private void ReplenishInternal(long nowTicks) { + using var disposer = default(RequestRegistration.Disposer); + // method is re-entrant (from Timer), lock to avoid multiple simultaneous replenishes lock (Lock) { @@ -330,13 +318,13 @@ private void ReplenishInternal(long nowTicks) // Request was handled already, either via cancellation or being kicked from the queue due to a newer request being queued. // We just need to remove the item and let the next queued item be considered for completion. - if (nextPendingRequest.Tcs.Task.IsCompleted) + if (nextPendingRequest.Task.IsCompleted) { nextPendingRequest = _options.QueueProcessingOrder == QueueProcessingOrder.OldestFirst ? queue.DequeueHead() : queue.DequeueTail(); - nextPendingRequest.CancellationTokenRegistration.Dispose(); + disposer.CleanupAndAdd(nextPendingRequest); } else if (_tokenCount >= nextPendingRequest.Count) { @@ -346,22 +334,20 @@ private void ReplenishInternal(long nowTicks) ? queue.DequeueHead() : queue.DequeueTail(); - _queueCount -= nextPendingRequest.Count; + // Updating queue count is handled by the cancellation/cleanup code _tokenCount -= nextPendingRequest.Count; Debug.Assert(_tokenCount >= 0); - if (!nextPendingRequest.Tcs.TrySetResult(SuccessfulLease)) + if (!nextPendingRequest.TrySetResult(SuccessfulLease)) { // Queued item was canceled so add count back _tokenCount += nextPendingRequest.Count; - // Updating queue count is handled by the cancellation code - _queueCount += nextPendingRequest.Count; } else { Interlocked.Increment(ref _successfulLeasesCount); } - nextPendingRequest.CancellationTokenRegistration.Dispose(); + disposer.CleanupAndAdd(nextPendingRequest); Debug.Assert(_queueCount >= 0); } else @@ -387,6 +373,7 @@ protected override void Dispose(bool disposing) return; } + using var disposer = default(RequestRegistration.Disposer); lock (Lock) { if (_disposed) @@ -400,9 +387,10 @@ protected override void Dispose(bool disposing) RequestRegistration next = _options.QueueProcessingOrder == QueueProcessingOrder.OldestFirst ? _queue.DequeueHead() : _queue.DequeueTail(); - next.CancellationTokenRegistration.Dispose(); - next.Tcs.TrySetResult(FailedLease); + disposer.CleanupAndAdd(next); + next.TrySetResult(FailedLease); } + Debug.Assert(_queueCount == 0); } } @@ -450,48 +438,78 @@ public override bool TryGetMetadata(string metadataName, out object? metadata) } } - private readonly struct RequestRegistration + private sealed class RequestRegistration : TaskCompletionSource { - public RequestRegistration(int tokenCount, TaskCompletionSource tcs, CancellationTokenRegistration cancellationTokenRegistration) - { - Count = tokenCount; - // Use VoidAsyncOperationWithData instead - Tcs = tcs; - CancellationTokenRegistration = cancellationTokenRegistration; - } + private readonly CancellationToken _cancellationToken; + private CancellationTokenRegistration _cancellationTokenRegistration; - public int Count { get; } + // this field is used only by the disposal mechanics and never shared between threads + private RequestRegistration? _next; - public TaskCompletionSource Tcs { get; } + public RequestRegistration(int permitCount, TokenBucketRateLimiter limiter, CancellationToken cancellationToken) + : base(limiter, TaskCreationOptions.RunContinuationsAsynchronously) + { + Count = permitCount; + _cancellationToken = cancellationToken; - public CancellationTokenRegistration CancellationTokenRegistration { get; } - } + // RequestRegistration objects are created while the limiter lock is held + // if cancellationToken fires before or while the lock is held, UnsafeRegister + // is going to invoke the callback synchronously, but this does not create + // a deadlock because lock are reentrant + if (cancellationToken.CanBeCanceled) +#if NETCOREAPP || NETSTANDARD2_1_OR_GREATER + _cancellationTokenRegistration = cancellationToken.UnsafeRegister(Cancel, this); +#else + _cancellationTokenRegistration = cancellationToken.Register(Cancel, this); +#endif + } - private sealed class CancelQueueState : TaskCompletionSource - { - private readonly int _tokenCount; - private readonly TokenBucketRateLimiter _limiter; - private readonly CancellationToken _cancellationToken; + /// + /// This property is only accessed under limiter lock. + /// + public int Count { get; private set; } - public CancelQueueState(int tokenCount, TokenBucketRateLimiter limiter, CancellationToken cancellationToken) - : base(TaskCreationOptions.RunContinuationsAsynchronously) + private static void Cancel(object? state) { - _tokenCount = tokenCount; - _limiter = limiter; - _cancellationToken = cancellationToken; + if (state is RequestRegistration registration && registration.TrySetCanceled(registration._cancellationToken)) + { + registration.Cleanup(); + } } - public new bool TrySetCanceled() + private void Cleanup() { - if (TrySetCanceled(_cancellationToken)) + var limiter = (TokenBucketRateLimiter)Task.AsyncState!; + lock (limiter.Lock) { - lock (_limiter.Lock) + limiter._queueCount -= Count; + Count = 0; + } + } + + /// + /// Collects registrations to dispose outside the limiter lock to avoid deadlock. + /// + public struct Disposer : IDisposable + { + private RequestRegistration? _next; + + public void CleanupAndAdd(RequestRegistration request) + { + request.Cleanup(); + request._next = _next; + _next = request; + } + + public void Dispose() + { + for (var current = _next; current is not null; current = current._next) { - _limiter._queueCount -= _tokenCount; + current._cancellationTokenRegistration.Dispose(); } - return true; + + _next = null; } - return false; } } } diff --git a/src/libraries/System.Threading.RateLimiting/tests/ConcurrencyLimiterTests.cs b/src/libraries/System.Threading.RateLimiting/tests/ConcurrencyLimiterTests.cs index cc503a050dbc36..94ae6d70d77661 100644 --- a/src/libraries/System.Threading.RateLimiting/tests/ConcurrencyLimiterTests.cs +++ b/src/libraries/System.Threading.RateLimiting/tests/ConcurrencyLimiterTests.cs @@ -123,6 +123,41 @@ public override async Task CanAcquireResourceAsync_QueuesAndGrabsNewest() Assert.True(lease.IsAcquired); } +#if DEBUG + [Fact] + public Task DoesNotDeadlockCleaningUpCanceledRequestedLease_Pre() => + DoesNotDeadlockCleaningUpCanceledRequestedLease((limiter, hook) => limiter.ReleasePreHook += hook); + + [Fact] + public Task DoesNotDeadlockCleaningUpCanceledRequestedLease_Post() => + DoesNotDeadlockCleaningUpCanceledRequestedLease((limiter, hook) => limiter.ReleasePostHook += hook); + + private async Task DoesNotDeadlockCleaningUpCanceledRequestedLease(Action attachHook) + { + using var limiter = new ConcurrencyLimiter(new ConcurrencyLimiterOptions + { + PermitLimit = 1, + QueueProcessingOrder = QueueProcessingOrder.OldestFirst, + QueueLimit = 1 + }); + var lease = limiter.AttemptAcquire(1); + Assert.True(lease.IsAcquired); + + var cts = new CancellationTokenSource(); + _ = limiter.AcquireAsync(1, cts.Token); + attachHook(limiter, () => + { + Task.Run(cts.Cancel); + Thread.Sleep(1); + }); + + var task1 = Task.Delay(1000); + var task2 = Task.Run(lease.Dispose); + Assert.Same(task2, await Task.WhenAny(task1, task2)); + await task2; + } +#endif + [Fact] public override async Task FailsWhenQueuingMoreThanLimit_OldestFirst() {